#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LSTM Autoencoder model definition.

Used for concept-sector anomaly detection:
- Learn "normal" market patterns
- Timesteps with large reconstruction error = anomalies

Model structure (simple and effective):
┌──────────────────────────────────────┐
│ Input: (batch, seq_len, n_features)  │
│   Feature sequence of the past       │
│   30 minutes                         │
├──────────────────────────────────────┤
│ LSTM Encoder                         │
│   - Bidirectional LSTM               │
│   - Per-timestep hidden states       │
│     feed the bottleneck              │
├──────────────────────────────────────┤
│ Bottleneck (compression layer)       │
│   Reduce to latent_dim (key!)        │
├──────────────────────────────────────┤
│ LSTM Decoder                         │
│   - Unidirectional LSTM              │
│   - Reconstructs the sequence        │
├──────────────────────────────────────┤
│ Output: (batch, seq_len, n_features) │
│   Reconstructed feature sequence     │
└──────────────────────────────────────┘

Why LSTM instead of Transformer:
1. Fewer parameters, less prone to overfitting
2. Sufficient for 6-dimensional features
3. More stable training
4. The bottleneck constraint is easier to control
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple


class LSTMAutoencoder(nn.Module):
    """
    LSTM Autoencoder for Anomaly Detection.

    Design principles:
    - Simple enough to avoid overfitting
    - A tightly constrained bottleneck forces the model to learn only the dominant patterns
    - Anomalies cannot pass through the narrow bottleneck, so their reconstruction error is large
    """

    def __init__(
        self,
        n_features: int = 6,
        hidden_dim: int = 32,         # LSTM hidden dim (small!)
        latent_dim: int = 4,          # bottleneck dim (very small! key parameter)
        num_layers: int = 1,          # number of LSTM layers
        dropout: float = 0.2,
        bidirectional: bool = True,   # bidirectional encoder
    ):
        super().__init__()

        self.n_features = n_features
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # Encoder: bidirectional LSTM
        self.encoder = nn.LSTM(
            input_size=n_features,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional,
        )

        # Bottleneck: compress into a very small latent space
        encoder_output_dim = hidden_dim * self.num_directions
        self.bottleneck_down = nn.Sequential(
            nn.Linear(encoder_output_dim, latent_dim),
            nn.Tanh(),  # bounds the range, adds an extra constraint
        )

        # LeakyReLU instead of ReLU:
        # the Z-score inputs span roughly [-5, +5]; ReLU would clip negative values
        # and lose the downside-move signal. LeakyReLU keeps it (scaled by 0.1).
        self.bottleneck_up = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.LeakyReLU(negative_slope=0.1),
        )

        # Decoder: unidirectional LSTM
        self.decoder = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,  # the decoder is unidirectional
        )

        # Output projection layer
        self.output_layer = nn.Linear(hidden_dim, n_features)

        # Dropout
        self.dropout = nn.Dropout(dropout)

        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Initialize weights."""
        for name, param in self.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)

    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encoder.

        Args:
            x: (batch, seq_len, n_features)

        Returns:
            latent: (batch, seq_len, latent_dim) compressed representation per timestep
            encoder_outputs: (batch, seq_len, hidden_dim * num_directions)
        """
        # LSTM encoding
        encoder_outputs, (h_n, c_n) = self.encoder(x)
        # encoder_outputs: (batch, seq_len, hidden_dim * num_directions)

        encoder_outputs = self.dropout(encoder_outputs)

        # Compress into the latent space (per timestep)
        latent = self.bottleneck_down(encoder_outputs)
        # latent: (batch, seq_len, latent_dim)

        return latent, encoder_outputs

    def decode(self, latent: torch.Tensor, seq_len: int) -> torch.Tensor:
        """
        Decoder.

        Args:
            latent: (batch, seq_len, latent_dim)
            seq_len: sequence length (kept for interface compatibility;
                     the latent already carries one vector per timestep)

        Returns:
            output: (batch, seq_len, n_features)
        """
        # Expand from the latent space
        decoder_input = self.bottleneck_up(latent)
        # decoder_input: (batch, seq_len, hidden_dim)

        # LSTM decoding
        decoder_outputs, _ = self.decoder(decoder_input)
        # decoder_outputs: (batch, seq_len, hidden_dim)

        decoder_outputs = self.dropout(decoder_outputs)

        # Project back to the original feature space
        output = self.output_layer(decoder_outputs)
        # output: (batch, seq_len, n_features)

        return output

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass.

        Args:
            x: (batch, seq_len, n_features)

        Returns:
            output: (batch, seq_len, n_features) reconstruction
            latent: (batch, seq_len, latent_dim) latent vectors
        """
        batch_size, seq_len, _ = x.shape

        # Encode
        latent, _ = self.encode(x)

        # Decode
        output = self.decode(latent, seq_len)

        return output, latent

    def compute_reconstruction_error(
        self,
        x: torch.Tensor,
        reduction: str = 'none'
    ) -> torch.Tensor:
        """
        Compute the reconstruction error.

        Args:
            x: (batch, seq_len, n_features)
            reduction: 'none' | 'mean' | 'sum'

        Returns:
            error: reconstruction error
        """
        output, _ = self.forward(x)

        # MSE per feature per timestep
        error = F.mse_loss(output, x, reduction='none')

        if reduction == 'none':
            # (batch, seq_len, n_features) -> (batch, seq_len)
            return error.mean(dim=-1)
        elif reduction == 'mean':
            return error.mean()
        elif reduction == 'sum':
            return error.sum()
        else:
            raise ValueError(f"Unknown reduction: {reduction}")

    def detect_anomaly(
        self,
        x: torch.Tensor,
        threshold: Optional[float] = None,
        return_scores: bool = True
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]:
        """
        Detect anomalies.

        Args:
            x: (batch, seq_len, n_features)
            threshold: anomaly threshold (if None, only scores are returned)
            return_scores: whether to return anomaly scores

        Returns:
            is_anomaly: (batch, seq_len) bool tensor (if threshold is not None)
            scores: (batch, seq_len) anomaly scores (if return_scores)
        """
        scores = self.compute_reconstruction_error(x, reduction='none')

        is_anomaly = None
        if threshold is not None:
            is_anomaly = scores > threshold

        if return_scores:
            return is_anomaly, scores
        else:
            return is_anomaly, None


# Alias kept for backwards compatibility
TransformerAutoencoder = LSTMAutoencoder


# ==================== Loss function ====================

class AnomalyDetectionLoss(nn.Module):
    """
    Loss for anomaly detection.

    Plain MSE reconstruction loss.
    """

    def __init__(
        self,
        feature_weights: Optional[torch.Tensor] = None,
    ):
        super().__init__()
        self.feature_weights = feature_weights

    def forward(
        self,
        output: torch.Tensor,
        target: torch.Tensor,
        latent: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, dict]:
        """
        Args:
            output: (batch, seq_len, n_features) reconstruction
            target: (batch, seq_len, n_features) original input
            latent: (batch, seq_len, latent_dim) latent vectors (unused)

        Returns:
            loss: total loss
            loss_dict: breakdown of individual loss terms
        """
        # Reconstruction loss (MSE)
        mse = F.mse_loss(output, target, reduction='none')

        # Optional per-feature weighting
        if self.feature_weights is not None:
            weights = self.feature_weights.to(mse.device)
            mse = mse * weights

        reconstruction_loss = mse.mean()

        loss_dict = {
            'total': reconstruction_loss.item(),
            'reconstruction': reconstruction_loss.item(),
        }

        return reconstruction_loss, loss_dict


# ==================== Utility functions ====================

def count_parameters(model: nn.Module) -> int:
    """Count trainable parameters."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


def create_model(config: Optional[dict] = None) -> LSTMAutoencoder:
    """
    Create the model.

    Defaults to a small LSTM configuration suited to anomaly detection.
    """
    default_config = {
        'n_features': 6,
        'hidden_dim': 32,    # small!
        'latent_dim': 4,     # very small! key parameter
        'num_layers': 1,
        'dropout': 0.2,
        'bidirectional': True,
    }

    if config:
        # Map legacy Transformer config keys onto the LSTM config
        if 'd_model' in config:
            config['hidden_dim'] = config.pop('d_model') // 2
        if 'num_encoder_layers' in config:
            config['num_layers'] = config.pop('num_encoder_layers')
        if 'num_decoder_layers' in config:
            config.pop('num_decoder_layers')
        if 'nhead' in config:
            config.pop('nhead')
        if 'dim_feedforward' in config:
            config.pop('dim_feedforward')
        if 'max_seq_len' in config:
            config.pop('max_seq_len')
        if 'use_instance_norm' in config:
            config.pop('use_instance_norm')
        default_config.update(config)

    model = LSTMAutoencoder(**default_config)

    param_count = count_parameters(model)
    print(f"Model parameters: {param_count:,}")
    if param_count > 100000:
        print(f"⚠️ Warning: parameter count is large ({param_count:,}); the model may overfit")
    else:
        print("✓ Parameter count is moderate (LSTM Autoencoder)")

    return model


if __name__ == "__main__":
    # Smoke-test the model
    print("Testing LSTM Autoencoder...")

    # Create the model
    model = create_model()

    # Test input
    batch_size = 32
    seq_len = 30
    n_features = 6
    x = torch.randn(batch_size, seq_len, n_features)

    # Forward pass
    output, latent = model(x)
    print(f"Input shape: {x.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Latent shape: {latent.shape}")

    # Reconstruction error
    error = model.compute_reconstruction_error(x)
    print(f"Reconstruction error shape: {error.shape}")
    print(f"Mean reconstruction error: {error.mean().item():.4f}")

    # Anomaly detection
    is_anomaly, scores = model.detect_anomaly(x, threshold=0.5)
    print(f"Anomaly mask shape: {is_anomaly.shape if is_anomaly is not None else 'None'}")
    print(f"Anomaly score shape: {scores.shape}")

    # Loss function
    criterion = AnomalyDetectionLoss()
    loss, loss_dict = criterion(output, x, latent)
    print(f"Loss: {loss.item():.4f}")

    print("\nAll tests passed!")
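

# --- Usage sketch: choosing the detect_anomaly() threshold ---
# The module leaves `threshold` entirely to the caller (the smoke test above
# uses an arbitrary 0.5). A common approach, shown here only as an assumption
# and not something this file prescribes, is to fit on "normal" data and take
# a high quantile of the validation reconstruction errors. `val_x` and
# `test_x` are placeholder tensors of shape (batch, seq_len, n_features), and
# the 0.995 quantile is an illustrative choice.
#
#     model.eval()
#     with torch.no_grad():
#         val_scores = model.compute_reconstruction_error(val_x)  # (batch, seq_len)
#     threshold = torch.quantile(val_scores.flatten(), 0.995).item()
#     is_anomaly, scores = model.detect_anomaly(test_x, threshold=threshold)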