Files
vf_react/ml/detector.py
2025-12-09 16:27:56 +08:00

636 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
概念异动检测器 - 融合版
结合两种方法的优势:
1. 规则评分系统:可解释、稳定、覆盖已知模式
2. LSTM Autoencoder:发现未知的异常模式
融合策略:
┌─────────────────────────────────────────────────────────┐
│ 输入特征 │
│ (alpha, alpha_delta, amt_ratio, amt_delta, rank_pct, │
│ limit_up_ratio) │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 规则评分系统 │ │ LSTM Autoencoder │ │
│ │ (0-100分) │ │ (重构误差) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ rule_score (0-100) ml_score (标准化后 0-100) │
│ │
├─────────────────────────────────────────────────────────┤
│ 融合策略 │
│ │
│ final_score = w1 * rule_score + w2 * ml_score │
│ │
│ 异动判定: │
│ - rule_score >= 60 → 直接触发(规则强信号) │
│ - ml_score >= 80 → 直接触发(ML强信号) │
│ - final_score >= 50 → 融合触发 │
│ │
└─────────────────────────────────────────────────────────┘
优势:
- 规则系统保证已知模式的检出率
- ML模型捕捉规则未覆盖的异常
- 两者互相验证,减少误报
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import numpy as np
import torch
# 尝试导入模型(可能不存在)
try:
from model import LSTMAutoencoder, create_model
HAS_MODEL = True
except ImportError:
HAS_MODEL = False
@dataclass
class AnomalyResult:
    """Result of one anomaly-detection call (built by HybridAnomalyDetector.detect).

    Scores are on a nominal 0-100 scale; higher means more anomalous.
    """
    is_anomaly: bool
    final_score: float  # Fused score: rule_weight * rule_score + ml_weight * ml_score (0-100)
    rule_score: float  # Rule-based score, clamped to 0-100
    ml_score: float  # ML score (0-100); 0.0 when the ML path was not used
    trigger_reason: str  # Human-readable trigger reason; '' when is_anomaly is False
    rule_details: Dict  # Triggered rules: {rule_name: {'score': ..., 'description': ...}}
    anomaly_type: str  # surge_up / surge_down / volume_spike / unknown; '' when is_anomaly is False
class RuleBasedScorer:
    """
    Rule-based scoring system.

    Design principles:
    - Each rule scores independently.
    - Scores are additive; the total is clamped to 0-100.
    - Rules (and hence thresholds) are configurable.

    A rule is a dict with:
    - ``condition``: callable taking the feature dict, returning a truthy value
    - ``score``: points added when the condition holds
    - ``description``: human-readable explanation
    """

    # Default rule set. Missing features fall back to neutral values
    # (alpha / alpha_delta / amt_delta -> 0, amt_ratio -> 1, rank_pct -> 0.5,
    # limit_up_ratio -> 0), so an absent key never fires a rule.
    DEFAULT_RULES = {
        # Alpha (excess return)
        'alpha_strong': {
            'condition': lambda r: abs(r.get('alpha', 0)) >= 3.0,
            'score': 35,
            'description': 'Alpha强信号(|α|≥3%)'
        },
        'alpha_medium': {
            'condition': lambda r: 2.0 <= abs(r.get('alpha', 0)) < 3.0,
            'score': 25,
            'description': 'Alpha中等(2%≤|α|<3%)'
        },
        'alpha_weak': {
            'condition': lambda r: 1.5 <= abs(r.get('alpha', 0)) < 2.0,
            'score': 15,
            'description': 'Alpha轻微(1.5%≤|α|<2%)'
        },
        # Alpha rate of change (acceleration)
        'alpha_delta_strong': {
            'condition': lambda r: abs(r.get('alpha_delta', 0)) >= 1.0,
            'score': 30,
            'description': 'Alpha加速强(|Δα|≥1%)'
        },
        'alpha_delta_medium': {
            'condition': lambda r: 0.5 <= abs(r.get('alpha_delta', 0)) < 1.0,
            'score': 20,
            'description': 'Alpha加速中(0.5%≤|Δα|<1%)'
        },
        # Turnover ratio (volume spike)
        'volume_spike_strong': {
            'condition': lambda r: r.get('amt_ratio', 1) >= 5.0,
            'score': 30,
            'description': '极度放量(≥5倍)'
        },
        'volume_spike_medium': {
            'condition': lambda r: 3.0 <= r.get('amt_ratio', 1) < 5.0,
            'score': 20,
            'description': '显著放量(3-5倍)'
        },
        'volume_spike_weak': {
            'condition': lambda r: 2.0 <= r.get('amt_ratio', 1) < 3.0,
            'score': 10,
            'description': '轻微放量(2-3倍)'
        },
        # Turnover rate of change
        'amt_delta_strong': {
            'condition': lambda r: abs(r.get('amt_delta', 0)) >= 1.0,
            'score': 15,
            'description': '成交额急变(|Δamt|≥100%)'
        },
        # Rank extremes
        'rank_top': {
            'condition': lambda r: r.get('rank_pct', 0.5) >= 0.95,
            'score': 25,
            'description': '排名前5%'
        },
        'rank_bottom': {
            'condition': lambda r: r.get('rank_pct', 0.5) <= 0.05,
            'score': 25,
            'description': '排名后5%'
        },
        'rank_high': {
            'condition': lambda r: 0.9 <= r.get('rank_pct', 0.5) < 0.95,
            'score': 15,
            'description': '排名前10%'
        },
        # Limit-up ratio
        'limit_up_high': {
            'condition': lambda r: r.get('limit_up_ratio', 0) >= 0.2,
            'score': 25,
            'description': '涨停比例≥20%'
        },
        'limit_up_medium': {
            'condition': lambda r: 0.1 <= r.get('limit_up_ratio', 0) < 0.2,
            'score': 15,
            'description': '涨停比例10-20%'
        },
        # Combined conditions (more reliable signals; bonus points on top)
        'alpha_with_volume': {
            'condition': lambda r: abs(r.get('alpha', 0)) >= 1.5 and r.get('amt_ratio', 1) >= 2.0,
            'score': 20,
            'description': 'Alpha+放量组合'
        },
        'acceleration_with_rank': {
            'condition': lambda r: abs(r.get('alpha_delta', 0)) >= 0.5 and (r.get('rank_pct', 0.5) >= 0.9 or r.get('rank_pct', 0.5) <= 0.1),
            'score': 15,
            'description': '加速+排名异常组合'
        },
    }

    def __init__(self, rules: Optional[Dict] = None):
        """
        Initialize the rule scorer.

        Args:
            rules: custom rules in the same format as DEFAULT_RULES. ``None``
                selects the defaults; an explicit empty dict means "no rules".
        """
        # Copy the defaults so editing one instance's rules never mutates the
        # class-level DEFAULT_RULES shared by every other instance.
        self.rules = dict(self.DEFAULT_RULES) if rules is None else rules

    def score(self, features: Dict) -> Tuple[float, Dict]:
        """
        Compute the rule score.

        Args:
            features: feature dict with keys such as alpha, alpha_delta, amt_ratio

        Returns:
            score: total of all fired rules, clamped to 0-100
            details: {rule_name: {'score': ..., 'description': ...}} for fired rules
        """
        total_score = 0
        triggered_rules = {}
        for rule_name, rule_config in self.rules.items():
            try:
                if not rule_config['condition'](features):
                    continue
                points = rule_config['score']
                description = rule_config['description']
            except Exception:
                # Best-effort: a rule that cannot be evaluated (e.g. a feature
                # value of None, or a malformed custom rule) simply does not
                # fire. Score and detail are committed together below, so a
                # broken rule can never be counted without being reported.
                continue
            total_score += points
            triggered_rules[rule_name] = {
                'score': points,
                'description': description
            }
        # Clamp to 0-100
        total_score = min(100, max(0, total_score))
        return total_score, triggered_rules

    def get_anomaly_type(self, features: Dict) -> str:
        """
        Classify the anomaly type.

        Returns 'surge_up' for alpha >= 1.5, 'surge_down' for alpha <= -1.5,
        otherwise 'volume_spike' for amt_ratio >= 3.0, else 'unknown'.
        """
        alpha = features.get('alpha', 0)
        amt_ratio = features.get('amt_ratio', 1)
        if alpha >= 1.5:
            return 'surge_up'
        if alpha <= -1.5:
            return 'surge_down'
        if amt_ratio >= 3.0:
            return 'volume_spike'
        return 'unknown'
class MLScorer:
    """
    LSTM-Autoencoder based scorer.

    Converts the model's reconstruction error into a 0-100 anomaly score using
    the error percentiles (p95 / p99) loaded from ``thresholds.json``.
    """

    # Fallback reconstruction-error percentiles, used when thresholds.json is
    # missing or its values cannot form an increasing 0 < p95 < p99 pair.
    DEFAULT_P95 = 0.1
    DEFAULT_P99 = 0.2

    def __init__(
        self,
        checkpoint_dir: str = 'ml/checkpoints',
        device: str = 'auto'
    ):
        """
        Args:
            checkpoint_dir: directory holding best_model.pt, thresholds.json, config.json
            device: 'auto' (CUDA if available, else CPU) or any torch device string;
                a CUDA device requested without CUDA available falls back to CPU.
        """
        self.checkpoint_dir = Path(checkpoint_dir)
        # Device selection. str() also accepts torch.device objects; startswith
        # covers indexed devices such as 'cuda:1'.
        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        elif str(device).startswith('cuda') and not torch.cuda.is_available():
            print("警告: CUDA 不可用,使用 CPU")
            self.device = torch.device('cpu')
        else:
            self.device = torch.device(device)
        self.model = None
        self.thresholds = None
        self.config = None
        # Try to load the model (failures leave the scorer "not ready")
        self._load_model()

    def _load_model(self):
        """
        Load weights, model config and error thresholds from checkpoint_dir.

        Reads ``best_model.pt`` (a dict with ``model_state_dict``) plus, if
        present, ``config.json`` (its ``model`` section is passed to
        ``create_model``) and ``thresholds.json``. On any failure a warning is
        printed and ``self.model`` is left as ``None``.
        """
        if not HAS_MODEL:
            print("警告: 无法导入模型模块")
            return
        model_path = self.checkpoint_dir / 'best_model.pt'
        thresholds_path = self.checkpoint_dir / 'thresholds.json'
        config_path = self.checkpoint_dir / 'config.json'
        if not model_path.exists():
            print(f"警告: 模型文件不存在 {model_path}")
            return
        try:
            # Load config
            if config_path.exists():
                with open(config_path, 'r', encoding='utf-8') as f:
                    self.config = json.load(f)
            # Load onto CPU first (works even when CUDA is unavailable), then move
            # to the target device. NOTE: torch.load unpickles - only load
            # checkpoints from trusted sources.
            checkpoint = torch.load(model_path, map_location='cpu')
            model_config = self.config.get('model', {}) if self.config else {}
            self.model = create_model(model_config)
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model.to(self.device)
            self.model.eval()
            # Load thresholds
            if thresholds_path.exists():
                with open(thresholds_path, 'r', encoding='utf-8') as f:
                    self.thresholds = json.load(f)
            print(f"MLScorer 加载成功 (设备: {self.device})")
        except Exception as e:
            print(f"警告: 模型加载失败 - {e}")
            import traceback
            traceback.print_exc()
            self.model = None

    def is_ready(self) -> bool:
        """Return True when a model has been loaded successfully."""
        return self.model is not None

    def _reference_errors(self) -> Tuple[float, float]:
        """Return the (p95, p99) reference errors, falling back to the defaults."""
        if self.thresholds:
            p95 = self.thresholds.get('p95', self.DEFAULT_P95)
            p99 = self.thresholds.get('p99', self.DEFAULT_P99)
        else:
            p95, p99 = self.DEFAULT_P95, self.DEFAULT_P99
        # The piecewise mapping needs strictly increasing knots; anything else
        # would divide by zero or make the interpolation undefined.
        if not 0 < p95 < p99:
            return self.DEFAULT_P95, self.DEFAULT_P99
        return float(p95), float(p99)

    @staticmethod
    def _error_to_score(error: np.ndarray, p95: float, p99: float) -> np.ndarray:
        """
        Piecewise-linearly map reconstruction errors to 0-100.

        0 -> 0, p95 -> 50, p99 -> 80, 1.5 * p99 -> 100; larger errors clamp to 100.
        This ties ml_score >= 80 to "error at or above the p99 threshold".
        """
        return np.interp(error, [0.0, p95, p99, 1.5 * p99], [0.0, 50.0, 80.0, 100.0])

    @torch.no_grad()
    def score(self, sequence: np.ndarray) -> float:
        """
        Compute the ML anomaly score.

        Args:
            sequence: (seq_len, n_features) or (batch, seq_len, n_features)

        Returns:
            0-100 score (higher = more anomalous): a float for a single sequence,
            a list of floats for a batch larger than one; 0.0 if no model is loaded.
        """
        if not self.is_ready():
            return 0.0
        # Ensure a batch dimension
        if sequence.ndim == 2:
            sequence = sequence[np.newaxis, ...]
        x = torch.FloatTensor(sequence).to(self.device)
        # Reconstruction error per time step
        output, _ = self.model(x)
        mse = ((output - x) ** 2).mean(dim=-1)  # (batch, seq_len)
        # Only the most recent time step decides whether "now" is anomalous
        error = mse[:, -1].cpu().numpy()
        p95, p99 = self._reference_errors()
        score = self._error_to_score(error, p95, p99)
        return float(score[0]) if len(score) == 1 else score.tolist()
class HybridAnomalyDetector:
    """
    Hybrid anomaly detector combining the rule system and the ML model.

    Decision order (first match wins):
    1. rule_score  >= config['rule_trigger']   -> strong rule signal
    2. ml_score    >= config['ml_trigger']     -> strong ML signal
    3. final_score >= config['fusion_trigger'] -> fused trigger
    with final_score = rule_weight * rule_score + ml_weight * ml_score; all
    weight goes to the rule score whenever the ML path is not used for a call.
    """

    # Default configuration
    DEFAULT_CONFIG = {
        # Weights
        'rule_weight': 0.6,  # weight of the rule score
        'ml_weight': 0.4,  # weight of the ML score
        # Trigger thresholds
        'rule_trigger': 60,  # rule score that triggers on its own
        'ml_trigger': 80,  # ML score that triggers on its own
        'fusion_trigger': 50,  # fused-score trigger
        # Feature list (not read by this class; presumably the column order of
        # `sequence` - confirm against the training pipeline)
        'features': [
            'alpha', 'alpha_delta', 'amt_ratio',
            'amt_delta', 'rank_pct', 'limit_up_ratio'
        ],
        # Sequence length the ML model expects (not read by this class)
        'seq_len': 30,
    }

    def __init__(
        self,
        config: Optional[Dict] = None,
        checkpoint_dir: str = 'ml/checkpoints',
        device: str = 'auto'
    ):
        """
        Args:
            config: overrides merged on top of DEFAULT_CONFIG
            checkpoint_dir: passed to MLScorer
            device: passed to MLScorer
        """
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}
        # Scorers
        self.rule_scorer = RuleBasedScorer()
        self.ml_scorer = MLScorer(checkpoint_dir, device)
        print("HybridAnomalyDetector 初始化完成")
        print(f" 规则权重: {self.config['rule_weight']}")
        print(f" ML权重: {self.config['ml_weight']}")
        print(f" ML模型: {'就绪' if self.ml_scorer.is_ready() else '未加载'}")

    def detect(
        self,
        features: Dict,
        sequence: Optional[np.ndarray] = None
    ) -> AnomalyResult:
        """
        Detect an anomaly for one time point.

        Args:
            features: feature dict for the current time point
            sequence: history (seq_len, n_features), needed for the ML model

        Returns:
            AnomalyResult: detection result
        """
        # 1. Rule score
        rule_score, rule_details = self.rule_scorer.score(features)

        # 2. ML score - only when both a sequence and a loaded model exist
        use_ml = sequence is not None and self.ml_scorer.is_ready()
        ml_score = self.ml_scorer.score(sequence) if use_ml else 0.0

        # 3. Fusion. If the ML path was not used for this call (no model OR no
        # sequence) the rules get all the weight; otherwise a missing sequence
        # would scale rule_score down by rule_weight and suppress the fusion trigger.
        if use_ml:
            w1, w2 = self.config['rule_weight'], self.config['ml_weight']
        else:
            w1, w2 = 1.0, 0.0
        final_score = w1 * rule_score + w2 * ml_score

        # 4. Decide (first matching trigger wins)
        is_anomaly = False
        trigger_reason = ''
        if rule_score >= self.config['rule_trigger']:
            is_anomaly = True
            trigger_reason = f'规则强信号({rule_score:.0f}分)'
        elif ml_score >= self.config['ml_trigger']:
            is_anomaly = True
            trigger_reason = f'ML强信号({ml_score:.0f}分)'
        elif final_score >= self.config['fusion_trigger']:
            is_anomaly = True
            trigger_reason = f'融合触发({final_score:.0f}分)'

        # 5. Anomaly type (only meaningful when an anomaly fired)
        anomaly_type = self.rule_scorer.get_anomaly_type(features) if is_anomaly else ''

        return AnomalyResult(
            is_anomaly=is_anomaly,
            final_score=final_score,
            rule_score=rule_score,
            ml_score=ml_score,
            trigger_reason=trigger_reason,
            rule_details=rule_details,
            anomaly_type=anomaly_type
        )

    def detect_batch(
        self,
        features_list: List[Dict],
        sequences: Optional[np.ndarray] = None
    ) -> List[AnomalyResult]:
        """
        Detect anomalies for a batch.

        Args:
            features_list: list of feature dicts
            sequences: (batch, seq_len, n_features); sequences[i] pairs with
                features_list[i]

        Returns:
            List[AnomalyResult], one per entry of features_list
        """
        if sequences is None:
            return [self.detect(features) for features in features_list]
        return [
            self.detect(features, sequences[i])
            for i, features in enumerate(features_list)
        ]
# ==================== 便捷函数 ====================
def create_detector(
    checkpoint_dir: str = 'ml/checkpoints',
    config: Optional[Dict] = None,
    device: str = 'auto'
) -> HybridAnomalyDetector:
    """
    Create a hybrid (rule + ML) anomaly detector.

    Args:
        checkpoint_dir: directory with best_model.pt / thresholds.json / config.json
        config: overrides merged on top of HybridAnomalyDetector.DEFAULT_CONFIG
        device: 'auto', 'cpu', 'cuda', ... forwarded to the ML scorer

    Returns:
        A configured HybridAnomalyDetector.
    """
    # Keyword arguments: the detector's positional order (config first) differs
    # from this function's (checkpoint_dir first).
    return HybridAnomalyDetector(
        config=config,
        checkpoint_dir=checkpoint_dir,
        device=device
    )
def quick_detect(features: Dict, threshold: float = 50) -> bool:
    """
    Quick rule-only detection (no ML model required).

    Suitable for:
    - real-time detection
    - when the ML model has not been trained yet

    Args:
        features: current feature dict (alpha, alpha_delta, amt_ratio, ...)
        threshold: minimum rule score (0-100) treated as an anomaly; the default
            50 matches HybridAnomalyDetector's default fusion_trigger.

    Returns:
        True when the rule score reaches ``threshold``.
    """
    score, _ = RuleBasedScorer().score(features)
    return score >= threshold
# ==================== 测试 ====================
def _print_result(title: str, result: "AnomalyResult", show_rules: bool = False) -> None:
    """Print one detection result in the demo's human-readable format."""
    print(f"\n{title}:")
    print(f" 异动: {'是' if result.is_anomaly else '否'}")
    print(f" 最终得分: {result.final_score:.1f}")
    print(f" 规则得分: {result.rule_score:.1f}")
    print(f" ML得分: {result.ml_score:.1f}")
    if result.is_anomaly:
        print(f" 触发原因: {result.trigger_reason}")
        print(f" 异动类型: {result.anomaly_type}")
        if show_rules:
            print(f" 触发规则: {list(result.rule_details.keys())}")


def _features_from_sequence(sequence: np.ndarray, rank_pct: float, limit_up_ratio: float) -> dict:
    """Build a feature dict from the last row of a synthetic sequence.

    Columns 0-3 are read as alpha, alpha_delta, amt_ratio, amt_delta (the
    order used by the synthetic data in _main).
    """
    last = sequence[-1]
    return {
        'alpha': float(last[0]),
        'alpha_delta': float(last[1]),
        'amt_ratio': float(last[2]),
        'amt_delta': float(last[3]),
        'rank_pct': rank_pct,
        'limit_up_ratio': limit_up_ratio
    }


def _main() -> None:
    """Demo: run the hybrid detector on rule-only cases and synthetic sequences."""
    print("=" * 60)
    print("融合异动检测器测试")
    print("=" * 60)

    detector = create_detector()

    test_cases = [
        {
            'name': '正常情况',
            'features': {
                'alpha': 0.5,
                'alpha_delta': 0.1,
                'amt_ratio': 1.2,
                'amt_delta': 0.1,
                'rank_pct': 0.5,
                'limit_up_ratio': 0.02
            }
        },
        {
            'name': 'Alpha异动',
            'features': {
                'alpha': 3.5,
                'alpha_delta': 0.8,
                'amt_ratio': 2.5,
                'amt_delta': 0.5,
                'rank_pct': 0.92,
                'limit_up_ratio': 0.05
            }
        },
        {
            'name': '放量异动',
            'features': {
                'alpha': 1.2,
                'alpha_delta': 0.3,
                'amt_ratio': 6.0,
                'amt_delta': 1.5,
                'rank_pct': 0.85,
                'limit_up_ratio': 0.08
            }
        },
        {
            'name': '涨停潮',
            'features': {
                'alpha': 2.5,
                'alpha_delta': 0.6,
                'amt_ratio': 3.5,
                'amt_delta': 0.8,
                'rank_pct': 0.98,
                'limit_up_ratio': 0.25
            }
        },
    ]

    # Test 1: rules only (no sequence data)
    print("\n" + "-" * 60)
    print("测试1: 只用规则(无序列数据)")
    print("-" * 60)
    for case in test_cases:
        _print_result(case['name'], detector.detect(case['features']), show_rules=True)

    # Test 2: fused detection with sequence data
    print("\n" + "-" * 60)
    print("测试2: 融合检测(规则 + ML)")
    print("-" * 60)

    rng = np.random.default_rng(42)  # fixed seed so demo runs are reproducible
    seq_len = 30
    n_features = 6

    # Normal sequence: small fluctuations, alpha drifting slowly upward
    normal_sequence = rng.standard_normal((seq_len, n_features)) * 0.3
    normal_sequence[:, 0] = np.linspace(0, 0.5, seq_len)
    normal_sequence[:, 2] = np.abs(normal_sequence[:, 2]) + 1  # amt_ratio > 0

    # Anomalous sequence: sudden change in the last 5 steps. Make amt_ratio
    # positive FIRST so the injected 2..6x spike is not shifted by +1 afterwards.
    anomaly_sequence = rng.standard_normal((seq_len, n_features)) * 0.3
    anomaly_sequence[:, 2] = np.abs(anomaly_sequence[:, 2]) + 1
    anomaly_sequence[-5:, 0] = np.linspace(1, 4, 5)  # alpha surges
    anomaly_sequence[-5:, 1] = np.linspace(0.2, 1.5, 5)  # alpha_delta accelerates
    anomaly_sequence[-5:, 2] = np.linspace(2, 6, 5)  # amt_ratio volume spike

    normal_features = _features_from_sequence(normal_sequence, rank_pct=0.5, limit_up_ratio=0.02)
    _print_result("正常序列", detector.detect(normal_features, normal_sequence))

    anomaly_features = _features_from_sequence(anomaly_sequence, rank_pct=0.95, limit_up_ratio=0.15)
    _print_result("异常序列", detector.detect(anomaly_features, anomaly_sequence))

    print("\n" + "=" * 60)
    print("测试完成!")


if __name__ == "__main__":
    _main()