#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LSTM Autoencoder model definition.

Used for concept anomaly detection:
- Learn "normal" market patterns.
- Timesteps with large reconstruction error = anomalies.

Model structure (simple and effective):

┌───────────────────────────────────────┐
│ Input: (batch, seq_len, n_features)   │
│ Feature sequence of the past 30 min   │
├───────────────────────────────────────┤
│ LSTM Encoder                          │
│ - bidirectional LSTM                  │
│ - hidden states at every timestep     │
├───────────────────────────────────────┤
│ Bottleneck (compression layer)        │
│ reduce to latent_dim (critical!)      │
├───────────────────────────────────────┤
│ LSTM Decoder                          │
│ - unidirectional LSTM                 │
│ - reconstructs the sequence           │
├───────────────────────────────────────┤
│ Output: (batch, seq_len, n_features)  │
│ reconstructed feature sequence        │
└───────────────────────────────────────┘

Why an LSTM rather than a Transformer:
1. Fewer parameters, so less prone to overfitting.
2. Sufficient capacity for 6-dimensional features.
3. More stable training.
4. The bottleneck constraint is easier to control.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple


class LSTMAutoencoder(nn.Module):
    """
    LSTM Autoencoder for Anomaly Detection

    Design principles:
    - Keep the model simple enough to avoid overfitting.
    - Constrain the bottleneck tightly so the model can only learn the dominant patterns.
    - Anomalies do not fit through the narrow bottleneck, so their reconstruction error is large.
    """

    def __init__(
        self,
        n_features: int = 6,
        hidden_dim: int = 32,        # LSTM hidden size (small!)
        latent_dim: int = 4,         # bottleneck size (very small! the key parameter)
        num_layers: int = 1,         # number of LSTM layers
        dropout: float = 0.2,
        bidirectional: bool = True,  # bidirectional encoder
    ):
        super().__init__()
        self.n_features = n_features
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # Encoder: bidirectional LSTM
        self.encoder = nn.LSTM(
            input_size=n_features,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional,
        )

        # Bottleneck: compress into a very small latent space
        encoder_output_dim = hidden_dim * self.num_directions
        self.bottleneck_down = nn.Sequential(
            nn.Linear(encoder_output_dim, latent_dim),
            nn.Tanh(),  # bound the range, tightening the constraint
        )

        # LeakyReLU instead of ReLU on the way back up.
        # Reason: the Z-score inputs span roughly [-5, +5]; ReLU would clip
        # negative values and lose the downside signal. LeakyReLU keeps
        # negative values (scaled by 0.1).
        self.bottleneck_up = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.LeakyReLU(negative_slope=0.1),
        )

        # Decoder: unidirectional LSTM
        self.decoder = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,  # the decoder runs in one direction
        )

        # Output projection back to the feature space
        self.output_layer = nn.Linear(hidden_dim, n_features)

        # Dropout
        self.dropout = nn.Dropout(dropout)

        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Initialize weights."""
        for name, param in self.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param)
            elif 'bias' in name:
                nn.init.zeros_(param)

    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encode the input sequence.

        Args:
            x: (batch, seq_len, n_features)
        Returns:
            latent: (batch, seq_len, latent_dim) compressed representation per timestep
            encoder_outputs: (batch, seq_len, hidden_dim * num_directions)
        """
        # LSTM encoding
        encoder_outputs, (h_n, c_n) = self.encoder(x)
        # encoder_outputs: (batch, seq_len, hidden_dim * num_directions)
        encoder_outputs = self.dropout(encoder_outputs)

        # Compress into the latent space (per timestep)
        latent = self.bottleneck_down(encoder_outputs)
        # latent: (batch, seq_len, latent_dim)
        return latent, encoder_outputs

    def decode(self, latent: torch.Tensor, seq_len: int) -> torch.Tensor:
        """
        Decode the latent sequence.

        Args:
            latent: (batch, seq_len, latent_dim)
            seq_len: sequence length (unused here; the latent already carries it)
        Returns:
            output: (batch, seq_len, n_features)
        """
        # Expand back out of the latent space
        decoder_input = self.bottleneck_up(latent)
        # decoder_input: (batch, seq_len, hidden_dim)

        # LSTM decoding
        decoder_outputs, _ = self.decoder(decoder_input)
        # decoder_outputs: (batch, seq_len, hidden_dim)
        decoder_outputs = self.dropout(decoder_outputs)

        # Project back to the original feature space
        output = self.output_layer(decoder_outputs)
        # output: (batch, seq_len, n_features)
        return output

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass.

        Args:
            x: (batch, seq_len, n_features)
        Returns:
            output: (batch, seq_len, n_features) reconstruction
            latent: (batch, seq_len, latent_dim) latent vectors
        """
        batch_size, seq_len, _ = x.shape
        # Encode
        latent, _ = self.encode(x)
        # Decode
        output = self.decode(latent, seq_len)
        return output, latent

    def compute_reconstruction_error(
        self,
        x: torch.Tensor,
        reduction: str = 'none'
    ) -> torch.Tensor:
        """
        Compute the reconstruction error.

        Args:
            x: (batch, seq_len, n_features)
            reduction: 'none' | 'mean' | 'sum'
        Returns:
            error: the reconstruction error
        """
        output, _ = self.forward(x)
        # MSE per feature per timestep
        error = F.mse_loss(output, x, reduction='none')
        if reduction == 'none':
            # (batch, seq_len, n_features) -> (batch, seq_len)
            return error.mean(dim=-1)
        elif reduction == 'mean':
            return error.mean()
        elif reduction == 'sum':
            return error.sum()
        else:
            raise ValueError(f"Unknown reduction: {reduction}")

    def detect_anomaly(
        self,
        x: torch.Tensor,
        threshold: Optional[float] = None,
        return_scores: bool = True
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]:
        """
        Detect anomalies.

        Args:
            x: (batch, seq_len, n_features)
            threshold: anomaly threshold (if None, only scores are returned)
            return_scores: whether to return the anomaly scores
        Returns:
            is_anomaly: (batch, seq_len) bool tensor (if threshold is not None)
            scores: (batch, seq_len) anomaly scores (if return_scores)
        """
        scores = self.compute_reconstruction_error(x, reduction='none')
        is_anomaly = None
        if threshold is not None:
            is_anomaly = scores > threshold
        if return_scores:
            return is_anomaly, scores
        else:
            return is_anomaly, None


# Alias kept for backward compatibility
TransformerAutoencoder = LSTMAutoencoder
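

# ---- Threshold calibration (a minimal sketch added for illustration; not
# part of the original API). `detect_anomaly` expects a threshold, and a
# common choice is a high quantile of reconstruction errors on held-out
# *normal* data. The 0.995 quantile below is an assumed default, not a
# tuned value.
@torch.no_grad()
def calibrate_threshold(
    model: LSTMAutoencoder,
    x_normal: torch.Tensor,
    q: float = 0.995,
) -> float:
    """Return the q-quantile of per-timestep reconstruction error on x_normal."""
    model.eval()
    # scores: (batch, seq_len) anomaly scores on presumed-normal data
    scores = model.compute_reconstruction_error(x_normal, reduction='none')
    return torch.quantile(scores.flatten(), q).item()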


# ==================== Loss function ====================
class AnomalyDetectionLoss(nn.Module):
    """
    Loss function for anomaly detection.

    A plain MSE reconstruction loss.
    """
    def __init__(
        self,
        feature_weights: Optional[torch.Tensor] = None,
    ):
        super().__init__()
        self.feature_weights = feature_weights

    def forward(
        self,
        output: torch.Tensor,
        target: torch.Tensor,
        latent: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, dict]:
        """
        Args:
            output: (batch, seq_len, n_features) reconstruction
            target: (batch, seq_len, n_features) original input
            latent: (batch, seq_len, latent_dim) latent vectors (unused)
        Returns:
            loss: total loss
            loss_dict: breakdown of the loss terms
        """
        # Reconstruction loss (MSE)
        mse = F.mse_loss(output, target, reduction='none')

        # Optional per-feature weighting
        if self.feature_weights is not None:
            weights = self.feature_weights.to(mse.device)
            mse = mse * weights

        reconstruction_loss = mse.mean()
        loss_dict = {
            'total': reconstruction_loss.item(),
            'reconstruction': reconstruction_loss.item(),
        }
        return reconstruction_loss, loss_dict
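

# ---- Training sketch (illustrative usage, not part of the original file) ----
# A minimal loop showing how AnomalyDetectionLoss pairs with the model. The
# optimizer (e.g. torch.optim.Adam(model.parameters(), lr=1e-3)) and the
# `train_loader` yielding (batch, seq_len, n_features) tensors are assumptions.
def train_one_epoch(model, train_loader, optimizer, criterion) -> float:
    """Run one training epoch and return the mean batch loss."""
    model.train()
    total = 0.0
    for batch in train_loader:
        optimizer.zero_grad()
        output, latent = model(batch)  # reconstruct the input
        loss, _ = criterion(output, batch, latent)
        loss.backward()
        optimizer.step()
        total += loss.item()
    return total / max(len(train_loader), 1)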


# ==================== Utilities ====================
def count_parameters(model: nn.Module) -> int:
    """Count the trainable parameters of a model."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


def create_model(config: Optional[dict] = None) -> LSTMAutoencoder:
    """
    Create the model.

    Defaults to a small LSTM configuration suited to anomaly detection.
    """
    default_config = {
        'n_features': 6,
        'hidden_dim': 32,  # small!
        'latent_dim': 4,   # very small! the key parameter
        'num_layers': 1,
        'dropout': 0.2,
        'bidirectional': True,
    }
    if config:
        # Map legacy Transformer config keys onto LSTM hyperparameters
        if 'd_model' in config:
            config['hidden_dim'] = config.pop('d_model') // 2
        if 'num_encoder_layers' in config:
            config['num_layers'] = config.pop('num_encoder_layers')
        # Drop Transformer-only keys that have no LSTM equivalent
        for key in ('num_decoder_layers', 'nhead', 'dim_feedforward',
                    'max_seq_len', 'use_instance_norm'):
            config.pop(key, None)
        default_config.update(config)

    model = LSTMAutoencoder(**default_config)
    param_count = count_parameters(model)
    print(f"Model parameters: {param_count:,}")
    if param_count > 100_000:
        print(f"⚠️ Warning: large parameter count ({param_count:,}); the model may overfit")
    else:
        print("✓ Parameter count is moderate (LSTM Autoencoder)")
    return model


if __name__ == "__main__":
    # Smoke-test the model
    print("Testing LSTM Autoencoder...")

    # Create the model
    model = create_model()
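
    # Legacy-config path: Transformer-era keys should be remapped onto the
    # LSTM hyperparameters (d_model=64 -> hidden_dim=32). The key values here
    # are illustrative only.
    legacy_model = create_model({'d_model': 64, 'num_encoder_layers': 1, 'nhead': 4})
    assert legacy_model.hidden_dim == 32 and legacy_model.num_layers == 1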

    # Test input
    batch_size = 32
    seq_len = 30
    n_features = 6
    x = torch.randn(batch_size, seq_len, n_features)

    # Forward pass
    output, latent = model(x)
    print(f"Input shape: {x.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Latent shape: {latent.shape}")

    # Reconstruction error
    error = model.compute_reconstruction_error(x)
    print(f"Reconstruction error shape: {error.shape}")
    print(f"Mean reconstruction error: {error.mean().item():.4f}")

    # Anomaly detection
    is_anomaly, scores = model.detect_anomaly(x, threshold=0.5)
    print(f"Anomaly mask shape: {is_anomaly.shape if is_anomaly is not None else 'None'}")
    print(f"Anomaly score shape: {scores.shape}")

    # Loss function
    criterion = AnomalyDetectionLoss()
    loss, loss_dict = criterion(output, x, latent)
    print(f"Loss: {loss.item():.4f}")

    print("\nAll tests passed!")