#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Concept anomaly detector - hybrid edition.

Combines the strengths of two approaches:
1. Rule-based scoring: interpretable, stable, covers known patterns.
2. LSTM autoencoder: can surface anomaly patterns the rules do not cover.

Fusion strategy:
+-----------------------------------------------------------+
|                      Input features                       |
|  (alpha, alpha_delta, amt_ratio, amt_delta, rank_pct,     |
|   limit_up_ratio)                                         |
+-----------------------------------------------------------+
|                                                           |
|   +----------------+          +--------------------+      |
|   | Rule scorer    |          | LSTM autoencoder   |      |
|   | (0-100 points) |          | (reconstruction    |      |
|   |                |          |  error)            |      |
|   +-------+--------+          +---------+----------+      |
|           |                             |                 |
|           v                             v                 |
|   rule_score (0-100)        ml_score (normalised 0-100)   |
|                                                           |
+-----------------------------------------------------------+
|                      Fusion                               |
|                                                           |
|   final_score = w1 * rule_score + w2 * ml_score           |
|                                                           |
|   Anomaly decision:                                       |
|   - rule_score  >= 60 -> trigger (strong rule signal)     |
|   - ml_score    >= 80 -> trigger (strong ML signal)       |
|   - final_score >= 50 -> trigger (fused signal)           |
|                                                           |
+-----------------------------------------------------------+

Advantages:
- The rule system guarantees recall on known patterns.
- The ML model catches anomalies the rules do not cover.
- The two validate each other, reducing false positives.
"""

import json
import logging
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import torch

# The model module is optional: without it only the rule scorer is active.
try:
    from model import LSTMAutoencoder, create_model
    HAS_MODEL = True
except ImportError:
    HAS_MODEL = False

logger = logging.getLogger(__name__)


@dataclass
class AnomalyResult:
    """Outcome of a single anomaly detection call."""

    is_anomaly: bool     # True when any of the three triggers fired
    final_score: float   # fused score (0-100 with the default weights)
    rule_score: float    # rule-based score (0-100)
    ml_score: float      # ML score (0-100); 0.0 when ML was not used
    trigger_reason: str  # human-readable trigger reason, '' if no anomaly
    rule_details: Dict   # triggered rules: name -> {'score', 'description'}
    anomaly_type: str    # surge_up / surge_down / volume_spike / unknown; '' if no anomaly


class RuleBasedScorer:
    """
    Rule-based scoring system.

    Design principles:
    - every rule is scored independently;
    - scores are additive (the total is clamped to 0-100);
    - thresholds are configurable by passing a custom rule table.
    """

    # Default rule table. Each entry maps a rule name to a predicate over the
    # feature dict, the points awarded when it holds, and a description.
    DEFAULT_RULES = {
        # Alpha (excess return) level; the three tiers are mutually exclusive.
        'alpha_strong': {
            'condition': lambda f: abs(f.get('alpha', 0)) >= 3.0,
            'score': 35,
            'description': 'Alpha强信号(|α|≥3%)'
        },
        'alpha_medium': {
            'condition': lambda f: 2.0 <= abs(f.get('alpha', 0)) < 3.0,
            'score': 25,
            'description': 'Alpha中等(2%≤|α|<3%)'
        },
        'alpha_weak': {
            'condition': lambda f: 1.5 <= abs(f.get('alpha', 0)) < 2.0,
            'score': 15,
            'description': 'Alpha轻微(1.5%≤|α|<2%)'
        },

        # Alpha rate of change (acceleration).
        'alpha_delta_strong': {
            'condition': lambda f: abs(f.get('alpha_delta', 0)) >= 1.0,
            'score': 30,
            'description': 'Alpha加速强(|Δα|≥1%)'
        },
        'alpha_delta_medium': {
            'condition': lambda f: 0.5 <= abs(f.get('alpha_delta', 0)) < 1.0,
            'score': 20,
            'description': 'Alpha加速中(0.5%≤|Δα|<1%)'
        },

        # Turnover ratio (volume expansion).
        'volume_spike_strong': {
            'condition': lambda f: f.get('amt_ratio', 1) >= 5.0,
            'score': 30,
            'description': '极度放量(≥5倍)'
        },
        'volume_spike_medium': {
            'condition': lambda f: 3.0 <= f.get('amt_ratio', 1) < 5.0,
            'score': 20,
            'description': '显著放量(3-5倍)'
        },
        'volume_spike_weak': {
            'condition': lambda f: 2.0 <= f.get('amt_ratio', 1) < 3.0,
            'score': 10,
            'description': '轻微放量(2-3倍)'
        },

        # Turnover rate of change.
        'amt_delta_strong': {
            'condition': lambda f: abs(f.get('amt_delta', 0)) >= 1.0,
            'score': 15,
            'description': '成交额急变(|Δamt|≥100%)'
        },

        # Rank jumps.
        'rank_top': {
            'condition': lambda f: f.get('rank_pct', 0.5) >= 0.95,
            'score': 25,
            'description': '排名前5%'
        },
        'rank_bottom': {
            'condition': lambda f: f.get('rank_pct', 0.5) <= 0.05,
            'score': 25,
            'description': '排名后5%'
        },
        'rank_high': {
            'condition': lambda f: 0.9 <= f.get('rank_pct', 0.5) < 0.95,
            'score': 15,
            'description': '排名前10%'
        },

        # Limit-up ratio.
        'limit_up_high': {
            'condition': lambda f: f.get('limit_up_ratio', 0) >= 0.2,
            'score': 25,
            'description': '涨停比例≥20%'
        },
        'limit_up_medium': {
            'condition': lambda f: 0.1 <= f.get('limit_up_ratio', 0) < 0.2,
            'score': 15,
            'description': '涨停比例10-20%'
        },

        # Combined conditions (more reliable signals) award bonus points.
        'alpha_with_volume': {
            'condition': lambda f: (
                abs(f.get('alpha', 0)) >= 1.5 and f.get('amt_ratio', 1) >= 2.0
            ),
            'score': 20,
            'description': 'Alpha+放量组合'
        },
        'acceleration_with_rank': {
            'condition': lambda f: (
                abs(f.get('alpha_delta', 0)) >= 0.5
                and (f.get('rank_pct', 0.5) >= 0.9 or f.get('rank_pct', 0.5) <= 0.1)
            ),
            'score': 15,
            'description': '加速+排名异常组合'
        },
    }

    def __init__(self, rules: Optional[Dict] = None):
        """
        Initialise the rule scorer.

        Args:
            rules: custom rule table in the same format as DEFAULT_RULES.
                A missing or empty table falls back to DEFAULT_RULES.
        """
        # Copy the default table so per-instance edits to ``self.rules`` cannot
        # leak into the class-level dict shared by every scorer.
        self.rules = rules if rules else dict(self.DEFAULT_RULES)

    def score(self, features: Dict) -> Tuple[float, Dict]:
        """
        Compute the rule score for one feature snapshot.

        Args:
            features: feature dict with keys such as alpha, alpha_delta,
                amt_ratio; keys a rule does not find fall back to that
                rule's own default.

        Returns:
            score: sum of triggered rule scores, clamped to 0-100.
            details: triggered rules, name -> {'score', 'description'}.
        """
        total = 0
        triggered = {}

        for name, rule in self.rules.items():
            try:
                fired = rule['condition'](features)
            except Exception:
                # Best effort: a rule that cannot be evaluated (e.g. a None
                # feature value) is skipped, but leaves a trace for debugging.
                logger.debug("rule %r could not be evaluated", name, exc_info=True)
                continue
            if fired:
                total += rule['score']
                triggered[name] = {
                    'score': rule['score'],
                    'description': rule['description'],
                }

        # Clamp to 0-100.
        total = min(100, max(0, total))
        return total, triggered

    def get_anomaly_type(self, features: Dict) -> str:
        """
        Classify the anomaly direction from alpha and turnover ratio.

        Returns:
            'surge_up' if alpha >= 1.5, 'surge_down' if alpha <= -1.5,
            'volume_spike' if amt_ratio >= 3.0, otherwise 'unknown'.
        """
        alpha = features.get('alpha', 0)
        if alpha >= 1.5:
            return 'surge_up'
        if alpha <= -1.5:
            return 'surge_down'
        if features.get('amt_ratio', 1) >= 3.0:
            return 'volume_spike'
        return 'unknown'


class MLScorer:
    """
    Scorer backed by an LSTM autoencoder.

    Converts the reconstruction error into a 0-100 score.
    """

    # Reference error levels used when thresholds.json is absent or invalid.
    DEFAULT_P95 = 0.1
    DEFAULT_P99 = 0.2

    def __init__(
        self,
        checkpoint_dir: str = 'ml/checkpoints',
        device: str = 'auto'
    ):
        """
        Args:
            checkpoint_dir: directory holding best_model.pt and, optionally,
                thresholds.json and config.json.
            device: 'auto' (CUDA when available, else CPU) or any string
                accepted by ``torch.device``. 'cuda' falls back to CPU with a
                warning when CUDA is unavailable.
        """
        self.checkpoint_dir = Path(checkpoint_dir)

        # Device selection.
        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        elif device == 'cuda' and not torch.cuda.is_available():
            print("警告: CUDA 不可用,使用 CPU")
            self.device = torch.device('cpu')
        else:
            self.device = torch.device(device)

        self.model = None
        self.thresholds = None
        self.config = None

        # Loading is best effort; failure leaves the scorer "not ready".
        self._load_model()

    def _load_model(self) -> None:
        """Load the model, its config and the score thresholds (best effort)."""
        if not HAS_MODEL:
            print("警告: 无法导入模型模块")
            return

        model_path = self.checkpoint_dir / 'best_model.pt'
        thresholds_path = self.checkpoint_dir / 'thresholds.json'
        config_path = self.checkpoint_dir / 'config.json'

        if not model_path.exists():
            print(f"警告: 模型文件不存在 {model_path}")
            return

        try:
            # Config is optional; explicit UTF-8 keeps JSON decoding
            # independent of the platform locale.
            if config_path.exists():
                with open(config_path, 'r', encoding='utf-8') as f:
                    self.config = json.load(f)

            # Load on CPU first (avoids failures when the checkpoint was saved
            # on CUDA), then move to the target device.
            # NOTE(review): torch.load unpickles the file; only load
            # checkpoints from a trusted source.
            checkpoint = torch.load(model_path, map_location='cpu')

            model_config = self.config.get('model', {}) if self.config else {}
            self.model = create_model(model_config)
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model.to(self.device)
            self.model.eval()

            # Thresholds are optional too.
            if thresholds_path.exists():
                with open(thresholds_path, 'r', encoding='utf-8') as f:
                    self.thresholds = json.load(f)

            print(f"MLScorer 加载成功 (设备: {self.device})")
        except Exception as e:
            print(f"警告: 模型加载失败 - {e}")
            traceback.print_exc()
            # Never leave a half-initialised model behind.
            self.model = None

    def is_ready(self) -> bool:
        """Return True when a model has been loaded successfully."""
        return self.model is not None

    def _reference_thresholds(self) -> Tuple[float, float]:
        """Return (p95, p99) from the loaded thresholds, or the defaults."""
        if not self.thresholds:
            return self.DEFAULT_P95, self.DEFAULT_P99
        return (
            float(self.thresholds.get('p95', self.DEFAULT_P95)),
            float(self.thresholds.get('p99', self.DEFAULT_P99)),
        )

    @staticmethod
    def _error_to_score(error, p95: float = DEFAULT_P95, p99: float = DEFAULT_P99) -> np.ndarray:
        """
        Map reconstruction errors to 0-100 scores, piecewise linearly.

        Anchor points: error 0 -> 0, p95 -> 50, p99 -> 80, 1.5 * p99 -> 100;
        anything beyond the last anchor stays at 100.

        Invalid thresholds are repaired instead of dividing by zero: a
        non-positive (or NaN) p95 falls back to DEFAULT_P95, and a p99 that is
        not above p95 is replaced by 2 * p95 (the ratio of the defaults).
        """
        errors = np.atleast_1d(np.asarray(error, dtype=np.float64))
        if not p95 > 0:
            p95 = MLScorer.DEFAULT_P95
        if not p99 > p95:
            p99 = 2.0 * p95
        anchors = [0.0, p95, p99, 1.5 * p99]
        marks = [0.0, 50.0, 80.0, 100.0]
        return np.interp(errors, anchors, marks)

    @torch.no_grad()
    def score(self, sequence: np.ndarray) -> Union[float, List[float]]:
        """
        Compute the ML score.

        Args:
            sequence: (seq_len, n_features) or (batch, seq_len, n_features).

        Returns:
            Score in 0-100, higher means more anomalous: a float for a single
            sequence, a list of floats for a batch of more than one. Returns
            0.0 when no model is loaded.
        """
        if not self.is_ready():
            return 0.0

        # Accept array-likes and make sure the input is 3-D.
        batch = np.asarray(sequence, dtype=np.float32)
        if batch.ndim == 2:
            batch = batch[np.newaxis, ...]

        x = torch.from_numpy(np.ascontiguousarray(batch)).to(self.device)

        # Per-timestep reconstruction error, averaged over features.
        output, _ = self.model(x)
        mse = ((output - x) ** 2).mean(dim=-1)  # (batch, seq_len)

        # Only the most recent timestep is scored.
        error = mse[:, -1].cpu().numpy()

        p95, p99 = self._reference_thresholds()
        scores = self._error_to_score(error, p95, p99)

        return float(scores[0]) if len(scores) == 1 else scores.tolist()


class HybridAnomalyDetector:
    """
    Hybrid anomaly detector.

    Combines the rule system with the ML model.
    """

    # Default configuration.
    DEFAULT_CONFIG = {
        # Fusion weights.
        'rule_weight': 0.6,
        'ml_weight': 0.4,

        # Trigger thresholds.
        'rule_trigger': 60,    # rule score that triggers on its own
        'ml_trigger': 80,      # ML score that triggers on its own
        'fusion_trigger': 50,  # fused score that triggers

        # Feature list.
        'features': [
            'alpha', 'alpha_delta', 'amt_ratio', 'amt_delta',
            'rank_pct', 'limit_up_ratio'
        ],

        # Sequence length (needed by the ML model).
        'seq_len': 30,
    }

    def __init__(
        self,
        config: Optional[Dict] = None,
        checkpoint_dir: str = 'ml/checkpoints',
        device: str = 'auto'
    ):
        """
        Args:
            config: overrides merged on top of DEFAULT_CONFIG.
            checkpoint_dir: forwarded to MLScorer.
            device: forwarded to MLScorer.
        """
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

        # Scorers.
        self.rule_scorer = RuleBasedScorer()
        self.ml_scorer = MLScorer(checkpoint_dir, device)

        print("HybridAnomalyDetector 初始化完成")
        print(f" 规则权重: {self.config['rule_weight']}")
        print(f" ML权重: {self.config['ml_weight']}")
        print(f" ML模型: {'就绪' if self.ml_scorer.is_ready() else '未加载'}")

    def detect(
        self,
        features: Dict,
        sequence: np.ndarray = None
    ) -> AnomalyResult:
        """
        Detect an anomaly for one snapshot.

        Args:
            features: feature dict for the current moment.
            sequence: history of shape (seq_len, n_features); required for
                the ML score.

        Returns:
            AnomalyResult: the detection result.
        """
        # 1. Rule score.
        rule_score, rule_details = self.rule_scorer.score(features)

        # 2. ML score - only when both a sequence and a model are available.
        ml_used = sequence is not None and self.ml_scorer.is_ready()
        ml_score = self.ml_scorer.score(sequence) if ml_used else 0.0

        # 3. Fusion. Whenever ML contributes nothing for this sample (model
        # not loaded OR no sequence supplied) the rules get the full weight;
        # otherwise a rule-only call would be scaled down by rule_weight and
        # disagree with quick_detect()'s rule-only threshold.
        if ml_used:
            w1 = self.config['rule_weight']
            w2 = self.config['ml_weight']
        else:
            w1, w2 = 1.0, 0.0

        final_score = w1 * rule_score + w2 * ml_score

        # 4. Decision: the first matching trigger wins.
        is_anomaly = True
        if rule_score >= self.config['rule_trigger']:
            trigger_reason = f'规则强信号({rule_score:.0f}分)'
        elif ml_score >= self.config['ml_trigger']:
            trigger_reason = f'ML强信号({ml_score:.0f}分)'
        elif final_score >= self.config['fusion_trigger']:
            trigger_reason = f'融合触发({final_score:.0f}分)'
        else:
            is_anomaly = False
            trigger_reason = ''

        # 5. Anomaly type is only meaningful for anomalies.
        anomaly_type = self.rule_scorer.get_anomaly_type(features) if is_anomaly else ''

        return AnomalyResult(
            is_anomaly=is_anomaly,
            final_score=final_score,
            rule_score=rule_score,
            ml_score=ml_score,
            trigger_reason=trigger_reason,
            rule_details=rule_details,
            anomaly_type=anomaly_type
        )

    def detect_batch(
        self,
        features_list: List[Dict],
        sequences: np.ndarray = None
    ) -> List[AnomalyResult]:
        """
        Run detect() over a batch.

        Args:
            features_list: list of feature dicts.
            sequences: (batch, seq_len, n_features), indexed in step with
                features_list; None runs every sample rule-only.

        Returns:
            List[AnomalyResult], one per entry of features_list.
        """
        return [
            self.detect(features, sequences[i] if sequences is not None else None)
            for i, features in enumerate(features_list)
        ]


# ==================== Convenience functions ====================

def create_detector(
    checkpoint_dir: str = 'ml/checkpoints',
    config: Optional[Dict] = None
) -> HybridAnomalyDetector:
    """Create a hybrid detector with the given checkpoint directory and config."""
    return HybridAnomalyDetector(config, checkpoint_dir)


def quick_detect(features: Dict) -> bool:
    """
    Quick rule-only detection (no ML model needed).

    Suitable for:
    - real-time detection;
    - the period before the ML model has been trained.

    Returns:
        True when the rule score reaches 50.
    """
    rule_score, _ = RuleBasedScorer().score(features)
    return rule_score >= 50


# ==================== Self-test ====================

def _print_result(title: str, result: AnomalyResult,
                  show_trigger: bool = True, show_rules: bool = False) -> None:
    """Print one detection result in the demo's report format."""
    print(f"\n{title}:")
    print(f" 异动: {'是' if result.is_anomaly else '否'}")
    print(f" 最终得分: {result.final_score:.1f}")
    print(f" 规则得分: {result.rule_score:.1f}")
    print(f" ML得分: {result.ml_score:.1f}")
    if show_trigger and result.is_anomaly:
        print(f" 触发原因: {result.trigger_reason}")
        print(f" 异动类型: {result.anomaly_type}")
        if show_rules:
            print(f" 触发规则: {list(result.rule_details.keys())}")


def _demo() -> None:
    """Exercise the detector on hand-written and synthetic samples."""
    print("=" * 60)
    print("融合异动检测器测试")
    print("=" * 60)

    detector = create_detector()

    test_cases = [
        {
            'name': '正常情况',
            'features': {
                'alpha': 0.5, 'alpha_delta': 0.1, 'amt_ratio': 1.2,
                'amt_delta': 0.1, 'rank_pct': 0.5, 'limit_up_ratio': 0.02
            }
        },
        {
            'name': 'Alpha异动',
            'features': {
                'alpha': 3.5, 'alpha_delta': 0.8, 'amt_ratio': 2.5,
                'amt_delta': 0.5, 'rank_pct': 0.92, 'limit_up_ratio': 0.05
            }
        },
        {
            'name': '放量异动',
            'features': {
                'alpha': 1.2, 'alpha_delta': 0.3, 'amt_ratio': 6.0,
                'amt_delta': 1.5, 'rank_pct': 0.85, 'limit_up_ratio': 0.08
            }
        },
        {
            'name': '涨停潮',
            'features': {
                'alpha': 2.5, 'alpha_delta': 0.6, 'amt_ratio': 3.5,
                'amt_delta': 0.8, 'rank_pct': 0.98, 'limit_up_ratio': 0.25
            }
        },
    ]

    print("\n" + "-" * 60)
    print("测试1: 只用规则(无序列数据)")
    print("-" * 60)

    for case in test_cases:
        result = detector.detect(case['features'])
        _print_result(case['name'], result, show_rules=True)

    # Test 2: fused detection with sequence data.
    print("\n" + "-" * 60)
    print("测试2: 融合检测(规则 + ML)")
    print("-" * 60)

    # Synthetic sequences.
    seq_len = 30
    n_features = 6

    # Normal sequence: small fluctuations.
    normal_sequence = np.random.randn(seq_len, n_features) * 0.3
    normal_sequence[:, 0] = np.linspace(0, 0.5, seq_len)         # alpha rises slowly
    normal_sequence[:, 2] = np.abs(normal_sequence[:, 2]) + 1    # amt_ratio > 0

    # Anomalous sequence: abrupt change over the last few timesteps.
    anomaly_sequence = np.random.randn(seq_len, n_features) * 0.3
    anomaly_sequence[-5:, 0] = np.linspace(1, 4, 5)      # alpha spikes
    anomaly_sequence[-5:, 1] = np.linspace(0.2, 1.5, 5)  # alpha_delta accelerates
    anomaly_sequence[-5:, 2] = np.linspace(2, 6, 5)      # amt_ratio expands
    anomaly_sequence[:, 2] = np.abs(anomaly_sequence[:, 2]) + 1

    # Normal sequence.
    normal_features = {
        'alpha': float(normal_sequence[-1, 0]),
        'alpha_delta': float(normal_sequence[-1, 1]),
        'amt_ratio': float(normal_sequence[-1, 2]),
        'amt_delta': float(normal_sequence[-1, 3]),
        'rank_pct': 0.5,
        'limit_up_ratio': 0.02
    }
    result = detector.detect(normal_features, normal_sequence)
    _print_result("正常序列", result, show_trigger=False)

    # Anomalous sequence.
    anomaly_features = {
        'alpha': float(anomaly_sequence[-1, 0]),
        'alpha_delta': float(anomaly_sequence[-1, 1]),
        'amt_ratio': float(anomaly_sequence[-1, 2]),
        'amt_delta': float(anomaly_sequence[-1, 3]),
        'rank_pct': 0.95,
        'limit_up_ratio': 0.15
    }
    result = detector.detect(anomaly_features, anomaly_sequence)
    _print_result("异常序列", result)

    print("\n" + "=" * 60)
    print("测试完成!")


if __name__ == "__main__":
    _demo()