update pay ui

2025-12-09 08:31:18 +08:00
parent e4937c2719
commit 25492caf15
26 changed files with 15577 additions and 1061 deletions

ml/model.py Normal file

@@ -0,0 +1,390 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LSTM Autoencoder model definition.

Used for concept anomaly detection:
- Learns "normal" market patterns
- Timesteps with large reconstruction error = anomalies

Model structure (simple and effective):

┌──────────────────────────────────────┐
│ Input: (batch, seq_len, n_features)  │
│ Feature sequence of the past 30 min  │
├──────────────────────────────────────┤
│ LSTM Encoder                         │
│ - Bidirectional LSTM                 │
│ - Per-timestep hidden states feed    │
│   the bottleneck                     │
├──────────────────────────────────────┤
│ Bottleneck (compression layer)       │
│ Compresses to latent_dim (the key!)  │
├──────────────────────────────────────┤
│ LSTM Decoder                         │
│ - Unidirectional LSTM                │
│ - Reconstructs the sequence          │
├──────────────────────────────────────┤
│ Output: (batch, seq_len, n_features) │
│ Reconstructed feature sequence       │
└──────────────────────────────────────┘

Why LSTM instead of Transformer:
1. Fewer parameters, less prone to overfitting
2. Sufficient for 6-dimensional features
3. More stable training
4. The bottleneck constraint is easier to control
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
class LSTMAutoencoder(nn.Module):
    """
    LSTM Autoencoder for Anomaly Detection

    Design principles:
    - Simple enough to avoid overfitting
    - A strictly limited bottleneck forces the model to learn only the dominant patterns
    - Anomalies cannot squeeze through the narrow bottleneck, so their reconstruction error is large
    """
def __init__(
self,
n_features: int = 6,
        hidden_dim: int = 32,        # LSTM hidden size (small!)
        latent_dim: int = 4,         # bottleneck size (very small! the key parameter)
        num_layers: int = 1,         # number of LSTM layers
        dropout: float = 0.2,
        bidirectional: bool = True,  # bidirectional encoder
):
super().__init__()
self.n_features = n_features
self.hidden_dim = hidden_dim
self.latent_dim = latent_dim
self.num_layers = num_layers
self.bidirectional = bidirectional
self.num_directions = 2 if bidirectional else 1
        # Encoder: bidirectional LSTM
self.encoder = nn.LSTM(
input_size=n_features,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=bidirectional
)
        # Bottleneck: compress into a very small latent space
encoder_output_dim = hidden_dim * self.num_directions
self.bottleneck_down = nn.Sequential(
nn.Linear(encoder_output_dim, latent_dim),
            nn.Tanh(),  # bound the range to add constraint
)
self.bottleneck_up = nn.Sequential(
nn.Linear(latent_dim, hidden_dim),
nn.ReLU(),
)
        # Decoder: unidirectional LSTM
self.decoder = nn.LSTM(
input_size=hidden_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
            bidirectional=False  # the decoder is unidirectional
)
        # Output projection layer
self.output_layer = nn.Linear(hidden_dim, n_features)
# Dropout
self.dropout = nn.Dropout(dropout)
        # Weight initialization
self._init_weights()
    def _init_weights(self):
        """Initialize weights."""
for name, param in self.named_parameters():
if 'weight_ih' in name:
nn.init.xavier_uniform_(param)
elif 'weight_hh' in name:
nn.init.orthogonal_(param)
elif 'bias' in name:
nn.init.zeros_(param)
    def encode(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encoder.

        Args:
            x: (batch, seq_len, n_features)

        Returns:
            latent: (batch, seq_len, latent_dim) compressed representation per timestep
            encoder_outputs: (batch, seq_len, hidden_dim * num_directions)
        """
        # LSTM encoding
encoder_outputs, (h_n, c_n) = self.encoder(x)
# encoder_outputs: (batch, seq_len, hidden_dim * num_directions)
encoder_outputs = self.dropout(encoder_outputs)
        # Compress into the latent space (per timestep)
latent = self.bottleneck_down(encoder_outputs)
# latent: (batch, seq_len, latent_dim)
return latent, encoder_outputs
    def decode(self, latent: torch.Tensor, seq_len: int) -> torch.Tensor:
        """
        Decoder.

        Args:
            latent: (batch, seq_len, latent_dim)
            seq_len: sequence length

        Returns:
            output: (batch, seq_len, n_features)
        """
        # Expand from the latent space
decoder_input = self.bottleneck_up(latent)
# decoder_input: (batch, seq_len, hidden_dim)
        # LSTM decoding
decoder_outputs, _ = self.decoder(decoder_input)
# decoder_outputs: (batch, seq_len, hidden_dim)
decoder_outputs = self.dropout(decoder_outputs)
        # Project back to the original feature space
output = self.output_layer(decoder_outputs)
# output: (batch, seq_len, n_features)
return output
    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass.

        Args:
            x: (batch, seq_len, n_features)

        Returns:
            output: (batch, seq_len, n_features) reconstruction
            latent: (batch, seq_len, latent_dim) latent vectors
        """
batch_size, seq_len, _ = x.shape
        # Encode
latent, _ = self.encode(x)
        # Decode
output = self.decode(latent, seq_len)
return output, latent
    def compute_reconstruction_error(
        self,
        x: torch.Tensor,
        reduction: str = 'none'
    ) -> torch.Tensor:
        """
        Compute the reconstruction error.

        Args:
            x: (batch, seq_len, n_features)
            reduction: 'none' | 'mean' | 'sum'

        Returns:
            error: reconstruction error
        """
output, _ = self.forward(x)
# MSE per feature per timestep
error = F.mse_loss(output, x, reduction='none')
if reduction == 'none':
# (batch, seq_len, n_features) -> (batch, seq_len)
return error.mean(dim=-1)
elif reduction == 'mean':
return error.mean()
elif reduction == 'sum':
return error.sum()
else:
raise ValueError(f"Unknown reduction: {reduction}")
    def detect_anomaly(
        self,
        x: torch.Tensor,
        threshold: Optional[float] = None,
        return_scores: bool = True
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]:
        """
        Detect anomalies.

        Args:
            x: (batch, seq_len, n_features)
            threshold: anomaly threshold (if None, only scores are returned)
            return_scores: whether to return anomaly scores

        Returns:
            is_anomaly: (batch, seq_len) bool tensor (if threshold is not None)
            scores: (batch, seq_len) anomaly scores (if return_scores)
        """
scores = self.compute_reconstruction_error(x, reduction='none')
is_anomaly = None
if threshold is not None:
is_anomaly = scores > threshold
if return_scores:
return is_anomaly, scores
else:
return is_anomaly, None
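
# A minimal sketch of one common way to pick `threshold` for detect_anomaly:
# take a high percentile of reconstruction errors on held-out normal data.
# This helper and the 99.5th-percentile default are illustrative assumptions,
# not part of the original module.
@torch.no_grad()
def estimate_threshold(model: LSTMAutoencoder, x_val: torch.Tensor, q: float = 0.995) -> float:
    """Return the q-quantile of per-timestep reconstruction errors on x_val."""
    model.eval()
    scores = model.compute_reconstruction_error(x_val, reduction='none')  # (batch, seq_len)
    return torch.quantile(scores.flatten(), q).item()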
# Alias kept for backward compatibility
TransformerAutoencoder = LSTMAutoencoder
# ==================== Loss function ====================
class AnomalyDetectionLoss(nn.Module):
    """
    Anomaly detection loss.

    A plain MSE reconstruction loss, with optional per-feature weighting.
    """
def __init__(
self,
        feature_weights: Optional[torch.Tensor] = None,
):
super().__init__()
self.feature_weights = feature_weights
    def forward(
        self,
        output: torch.Tensor,
        target: torch.Tensor,
        latent: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, dict]:
        """
        Args:
            output: (batch, seq_len, n_features) reconstruction
            target: (batch, seq_len, n_features) original input
            latent: (batch, seq_len, latent_dim) latent vectors (unused)

        Returns:
            loss: total loss
            loss_dict: breakdown of the individual loss terms
        """
        # Reconstruction loss (MSE)
mse = F.mse_loss(output, target, reduction='none')
        # Optional per-feature weighting
if self.feature_weights is not None:
weights = self.feature_weights.to(mse.device)
mse = mse * weights
reconstruction_loss = mse.mean()
loss_dict = {
'total': reconstruction_loss.item(),
'reconstruction': reconstruction_loss.item(),
}
return reconstruction_loss, loss_dict
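
# Illustrative usage of the optional per-feature weighting (the weight values
# below are made-up assumptions, e.g. emphasizing the first feature):
#
#     weights = torch.tensor([2.0, 1.0, 1.0, 1.0, 1.0, 1.0])
#     criterion = AnomalyDetectionLoss(feature_weights=weights)
#     loss, loss_dict = criterion(output, target)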
# ==================== Utilities ====================
def count_parameters(model: nn.Module) -> int:
    """Count trainable model parameters."""
return sum(p.numel() for p in model.parameters() if p.requires_grad)
def create_model(config: Optional[dict] = None) -> LSTMAutoencoder:
    """
    Create the model.

    Defaults to a small LSTM configuration suited to anomaly detection.
    """
default_config = {
'n_features': 6,
        'hidden_dim': 32,   # small!
        'latent_dim': 4,    # very small! the key parameter
'num_layers': 1,
'dropout': 0.2,
'bidirectional': True,
}
if config:
        # Map legacy Transformer config keys for compatibility
if 'd_model' in config:
config['hidden_dim'] = config.pop('d_model') // 2
if 'num_encoder_layers' in config:
config['num_layers'] = config.pop('num_encoder_layers')
if 'num_decoder_layers' in config:
config.pop('num_decoder_layers')
if 'nhead' in config:
config.pop('nhead')
if 'dim_feedforward' in config:
config.pop('dim_feedforward')
if 'max_seq_len' in config:
config.pop('max_seq_len')
if 'use_instance_norm' in config:
config.pop('use_instance_norm')
default_config.update(config)
model = LSTMAutoencoder(**default_config)
param_count = count_parameters(model)
    print(f"Model parameter count: {param_count:,}")
    if param_count > 100000:
        print(f"⚠️ Warning: parameter count is large ({param_count:,}); the model may overfit")
    else:
        print("✓ Parameter count is moderate (LSTM Autoencoder)")
return model
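
# Illustrative example of the legacy-key mapping above (the config values are
# assumptions, not taken from a real config file):
#
#     model = create_model({'d_model': 64, 'num_encoder_layers': 1, 'nhead': 4})
#     # -> hidden_dim=32 (d_model // 2), num_layers=1; 'nhead' is dropped.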
if __name__ == "__main__":
    # Smoke-test the model
    print("Testing LSTM Autoencoder...")

    # Create the model
    model = create_model()

    # Test input
    batch_size = 32
    seq_len = 30
    n_features = 6
    x = torch.randn(batch_size, seq_len, n_features)

    # Forward pass
    output, latent = model(x)
    print(f"Input shape: {x.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Latent shape: {latent.shape}")

    # Reconstruction error
    error = model.compute_reconstruction_error(x)
    print(f"Reconstruction error shape: {error.shape}")
    print(f"Mean reconstruction error: {error.mean().item():.4f}")

    # Anomaly detection
    is_anomaly, scores = model.detect_anomaly(x, threshold=0.5)
    print(f"Anomaly mask shape: {is_anomaly.shape if is_anomaly is not None else 'None'}")
    print(f"Anomaly score shape: {scores.shape}")

    # Loss function
    criterion = AnomalyDetectionLoss()
    loss, loss_dict = criterion(output, x, latent)
    print(f"Loss: {loss.item():.4f}")
    print("\nAll tests passed!")
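
    # A minimal training-step sketch (illustrative assumption only; the actual
    # training script is not part of this file, and the optimizer and lr are made up):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    model.train()
    for step in range(3):  # a few dummy steps on the random test batch
        optimizer.zero_grad()
        recon, z = model(x)
        step_loss, _ = criterion(recon, x, z)
        step_loss.backward()
        optimizer.step()
        print(f"step {step}: loss={step_loss.item():.4f}")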