""" MCP Quant - 量化因子计算模块 基于日线(MySQL ea_trade)和分钟频(ClickHouse stock_minute)数据计算技术指标和量化因子 数据源: - MySQL ea_trade: OHLCV, 换手率, 涨跌幅, PE等日线数据 - ClickHouse stock_minute: 分钟级 OHLCV 数据 设计原则: 1. 返回"状态"而非原始数值 - 便于大模型理解 2. 支持批量计算 - 提高效率 3. 错误优雅降级 - 数据不足时返回 None 而非报错 """ import numpy as np import pandas as pd from typing import Dict, List, Any, Optional, Tuple, Literal from datetime import datetime, timedelta from dataclasses import dataclass from enum import Enum import logging import asyncio # 导入数据库模块 import mcp_database as db logger = logging.getLogger(__name__) # ==================== 信号枚举定义 ==================== class TrendSignal(str, Enum): """趋势信号""" STRONG_BULLISH = "强势上涨" BULLISH = "上涨" NEUTRAL = "震荡" BEARISH = "下跌" STRONG_BEARISH = "强势下跌" class MACDSignal(str, Enum): """MACD信号""" GOLDEN_CROSS = "金叉" DEATH_CROSS = "死叉" BULLISH_DIVERGENCE = "底背离" BEARISH_DIVERGENCE = "顶背离" MOMENTUM_INCREASING = "动能增强" MOMENTUM_DECREASING = "动能减弱" NEUTRAL = "中性" class OscillatorZone(str, Enum): """超买超卖区域""" OVERBOUGHT = "超买" OVERSOLD = "超卖" NEUTRAL = "中性" class BollingerSignal(str, Enum): """布林带信号""" ABOVE_UPPER = "触及上轨压力" BELOW_LOWER = "触及下轨支撑" ABOVE_MIDDLE = "中轨之上强势" BELOW_MIDDLE = "中轨之下弱势" SQUEEZE = "布林带收窄变盘在即" EXPANSION = "布林带扩张趋势加速" class VolumeSignal(str, Enum): """量能信号""" VOLUME_SURGE = "放量" VOLUME_SHRINK = "缩量" VOLUME_PRICE_DIVERGENCE = "量价背离" ACCUMULATION = "主力吸筹" DISTRIBUTION = "主力出货" NORMAL = "正常" class BreakoutSignal(str, Enum): """突破信号""" NEW_HIGH_BREAKOUT = "突破新高" NEW_LOW_BREAKDOWN = "跌破新低" RESISTANCE_TEST = "测试压力位" SUPPORT_TEST = "测试支撑位" NO_SIGNAL = "无信号" # ==================== 数据类定义 ==================== @dataclass class MACDResult: """MACD计算结果""" signal: MACDSignal dif: float dea: float macd_bar: float trend_strength: str # 弱/中/强 description: str @dataclass class OscillatorResult: """超买超卖指标结果""" zone: OscillatorZone rsi_value: float kdj_k: float kdj_d: float kdj_j: float description: str @dataclass class BollingerResult: """布林带分析结果""" signal: BollingerSignal upper: float middle: float lower: float bandwidth: float # 带宽百分比 position: str # 价格在通道中的位置描述 description: str @dataclass class ATRResult: """ATR计算结果""" atr: float atr_percent: float # ATR占价格的百分比 suggested_stop_loss: float # 建议止损价 suggested_stop_loss_pct: float # 建议止损幅度 volatility_level: str # 低/中/高 description: str @dataclass class VolumeAnalysisResult: """成交量分析结果""" signal: VolumeSignal turnover_rate: float volume_ratio: float # 相对于MA5的量比 obv_trend: str # OBV趋势 heat_level: str # 冷门/正常/活跃/火热/极热 description: str @dataclass class BreakoutResult: """突破信号结果""" signal: BreakoutSignal high_20d: float low_20d: float high_60d: float low_60d: float distance_to_high: float # 距离20日高点的百分比 distance_to_low: float # 距离20日低点的百分比 description: str @dataclass class RiskMetricsResult: """风险指标结果""" max_drawdown: float # 最大回撤 max_drawdown_period: str # 最大回撤发生期间 sharpe_ratio: float # 夏普比率 volatility: float # 波动率 risk_level: str # 低风险/中风险/高风险 description: str @dataclass class ValuationResult: """估值分析结果""" pe_current: float pe_percentile: float # PE历史百分位 pb_current: Optional[float] peg_ratio: Optional[float] valuation_level: str # 低估/合理/高估 description: str # ==================== 核心计算函数 ==================== def _calc_ema(data: np.ndarray, period: int) -> np.ndarray: """计算EMA指数移动平均""" ema = np.zeros_like(data) ema[0] = data[0] multiplier = 2 / (period + 1) for i in range(1, len(data)): ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1] return ema def _calc_sma(data: np.ndarray, period: int) -> np.ndarray: """计算SMA简单移动平均""" return pd.Series(data).rolling(window=period, min_periods=1).mean().values def _calc_std(data: np.ndarray, period: int) -> np.ndarray: """计算滚动标准差""" return pd.Series(data).rolling(window=period, min_periods=1).std().values def _calc_rsi(close: np.ndarray, period: int = 14) -> np.ndarray: """计算RSI相对强弱指标""" delta = np.diff(close) gain = np.where(delta > 0, delta, 0) loss = np.where(delta < 0, -delta, 0) avg_gain = pd.Series(gain).rolling(window=period, min_periods=1).mean().values avg_loss = pd.Series(loss).rolling(window=period, min_periods=1).mean().values rs = np.where(avg_loss != 0, avg_gain / avg_loss, 100) rsi = 100 - (100 / (1 + rs)) # 补齐第一个值 return np.insert(rsi, 0, 50) def _calc_kdj(high: np.ndarray, low: np.ndarray, close: np.ndarray, n: int = 9, m1: int = 3, m2: int = 3) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """计算KDJ指标""" length = len(close) rsv = np.zeros(length) k = np.zeros(length) d = np.zeros(length) j = np.zeros(length) for i in range(length): start = max(0, i - n + 1) high_n = np.max(high[start:i+1]) low_n = np.min(low[start:i+1]) if high_n != low_n: rsv[i] = (close[i] - low_n) / (high_n - low_n) * 100 else: rsv[i] = 50 if i == 0: k[i] = 50 d[i] = 50 else: k[i] = (m1 - 1) / m1 * k[i-1] + 1 / m1 * rsv[i] d[i] = (m2 - 1) / m2 * d[i-1] + 1 / m2 * k[i] j[i] = 3 * k[i] - 2 * d[i] return k, d, j def _calc_macd(close: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """计算MACD指标""" ema_fast = _calc_ema(close, fast) ema_slow = _calc_ema(close, slow) dif = ema_fast - ema_slow dea = _calc_ema(dif, signal) macd_bar = 2 * (dif - dea) return dif, dea, macd_bar def _calc_bollinger(close: np.ndarray, period: int = 20, std_dev: float = 2) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """计算布林带""" middle = _calc_sma(close, period) std = _calc_std(close, period) upper = middle + std_dev * std lower = middle - std_dev * std return upper, middle, lower def _calc_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> np.ndarray: """计算ATR真实波幅""" tr = np.zeros(len(close)) tr[0] = high[0] - low[0] for i in range(1, len(close)): tr[i] = max( high[i] - low[i], abs(high[i] - close[i-1]), abs(low[i] - close[i-1]) ) atr = _calc_sma(tr, period) return atr def _calc_obv(close: np.ndarray, volume: np.ndarray) -> np.ndarray: """计算OBV能量潮""" obv = np.zeros(len(close)) obv[0] = volume[0] for i in range(1, len(close)): if close[i] > close[i-1]: obv[i] = obv[i-1] + volume[i] elif close[i] < close[i-1]: obv[i] = obv[i-1] - volume[i] else: obv[i] = obv[i-1] return obv def _detect_divergence(price: np.ndarray, indicator: np.ndarray, lookback: int = 20) -> Optional[str]: """检测背离""" if len(price) < lookback: return None recent_price = price[-lookback:] recent_indicator = indicator[-lookback:] # 找到价格的高点和低点 price_high_idx = np.argmax(recent_price) price_low_idx = np.argmin(recent_price) # 检查顶背离:价格创新高但指标没创新高 if price_high_idx > lookback // 2: # 高点在后半段 prev_high = np.max(recent_price[:lookback//2]) if recent_price[price_high_idx] > prev_high: prev_indicator_high = np.max(recent_indicator[:lookback//2]) if recent_indicator[price_high_idx] < prev_indicator_high: return "bearish_divergence" # 检查底背离:价格创新低但指标没创新低 if price_low_idx > lookback // 2: # 低点在后半段 prev_low = np.min(recent_price[:lookback//2]) if recent_price[price_low_idx] < prev_low: prev_indicator_low = np.min(recent_indicator[:lookback//2]) if recent_indicator[price_low_idx] > prev_indicator_low: return "bullish_divergence" return None # ==================== MCP 工具函数 ==================== async def get_macd_signal(code: str, days: int = 60) -> Dict[str, Any]: """ 获取MACD趋势判定信号 Args: code: 股票代码 days: 分析天数,默认60天 Returns: MACD信号分析结果 """ try: # 获取日线数据 trade_data = await db.get_stock_trade_data(code, limit=days + 30) if len(trade_data) < 35: return {"success": False, "error": "数据不足,需要至少35个交易日"} # 按日期排序(从旧到新) trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算MACD dif, dea, macd_bar = _calc_macd(close) # 获取最新值 current_dif = dif[-1] current_dea = dea[-1] current_bar = macd_bar[-1] prev_bar = macd_bar[-2] if len(macd_bar) > 1 else 0 # 判断信号 signal = MACDSignal.NEUTRAL # 金叉/死叉判断 if len(dif) >= 2: if dif[-1] > dea[-1] and dif[-2] <= dea[-2]: signal = MACDSignal.GOLDEN_CROSS elif dif[-1] < dea[-1] and dif[-2] >= dea[-2]: signal = MACDSignal.DEATH_CROSS # 动能判断 if signal == MACDSignal.NEUTRAL: if current_bar > 0 and current_bar > prev_bar: signal = MACDSignal.MOMENTUM_INCREASING elif current_bar < 0 and current_bar < prev_bar: signal = MACDSignal.MOMENTUM_DECREASING # 背离检测 divergence = _detect_divergence(close, dif, lookback=20) if divergence == "bullish_divergence": signal = MACDSignal.BULLISH_DIVERGENCE elif divergence == "bearish_divergence": signal = MACDSignal.BEARISH_DIVERGENCE # 趋势强度 bar_abs = abs(current_bar) if bar_abs < 0.5: trend_strength = "弱" elif bar_abs < 1.5: trend_strength = "中" else: trend_strength = "强" # 生成描述 descriptions = { MACDSignal.GOLDEN_CROSS: f"MACD金叉形成,DIF上穿DEA,短期看涨信号,动能{trend_strength}", MACDSignal.DEATH_CROSS: f"MACD死叉形成,DIF下穿DEA,短期看跌信号,动能{trend_strength}", MACDSignal.BULLISH_DIVERGENCE: "出现底背离,股价创新低但MACD没创新低,可能反转向上", MACDSignal.BEARISH_DIVERGENCE: "出现顶背离,股价创新高但MACD没创新高,上涨动能衰竭", MACDSignal.MOMENTUM_INCREASING: f"红柱放大,上涨动能增强,趋势强度:{trend_strength}", MACDSignal.MOMENTUM_DECREASING: f"绿柱放大,下跌动能增强,趋势强度:{trend_strength}", MACDSignal.NEUTRAL: "MACD处于中性状态,无明显信号", } return { "success": True, "data": { "signal": signal.value, "dif": round(current_dif, 4), "dea": round(current_dea, 4), "macd_bar": round(current_bar, 4), "trend_strength": trend_strength, "description": descriptions[signal], "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[MACD] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def check_oscillator_status(code: str, days: int = 60) -> Dict[str, Any]: """ 检查KDJ/RSI超买超卖状态 Args: code: 股票代码 days: 分析天数 Returns: 超买超卖状态分析 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) high = np.array([float(d['high_price']) for d in trade_data]) low = np.array([float(d['low_price']) for d in trade_data]) close = np.array([float(d['close_price']) for d in trade_data]) # 计算RSI rsi = _calc_rsi(close, 14) current_rsi = rsi[-1] # 计算KDJ k, d, j = _calc_kdj(high, low, close) current_k = k[-1] current_d = d[-1] current_j = j[-1] # 判断区域 zone = OscillatorZone.NEUTRAL # RSI超买超卖判断 if current_rsi > 80 or current_j > 100: zone = OscillatorZone.OVERBOUGHT elif current_rsi < 20 or current_j < 0: zone = OscillatorZone.OVERSOLD # 生成描述 if zone == OscillatorZone.OVERBOUGHT: desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超买区域,短期回调风险较大" elif zone == OscillatorZone.OVERSOLD: desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超卖区域,可能存在反弹机会" else: desc = f"RSI={current_rsi:.1f},KDJ(K={current_k:.1f},D={current_d:.1f},J={current_j:.1f}),处于中性区域" return { "success": True, "data": { "zone": zone.value, "rsi_value": round(current_rsi, 2), "kdj_k": round(current_k, 2), "kdj_d": round(current_d, 2), "kdj_j": round(current_j, 2), "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[Oscillator] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def analyze_bollinger_bands(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]: """ 分析布林带通道 Args: code: 股票代码 days: 分析天数 period: 布林带周期,默认20 Returns: 布林带分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < period + 5: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算布林带 upper, middle, lower = _calc_bollinger(close, period) current_price = close[-1] current_upper = upper[-1] current_middle = middle[-1] current_lower = lower[-1] # 计算带宽 bandwidth = (current_upper - current_lower) / current_middle * 100 prev_bandwidth = (upper[-5] - lower[-5]) / middle[-5] * 100 if len(upper) > 5 else bandwidth # 判断信号 signal = BollingerSignal.ABOVE_MIDDLE if current_price > current_middle else BollingerSignal.BELOW_MIDDLE # 检查是否触及上下轨 if current_price >= current_upper * 0.99: signal = BollingerSignal.ABOVE_UPPER elif current_price <= current_lower * 1.01: signal = BollingerSignal.BELOW_LOWER # 检查布林带收窄/扩张 if bandwidth < prev_bandwidth * 0.7: signal = BollingerSignal.SQUEEZE elif bandwidth > prev_bandwidth * 1.3: signal = BollingerSignal.EXPANSION # 价格位置描述 position_pct = (current_price - current_lower) / (current_upper - current_lower) * 100 position = f"价格位于布林带{position_pct:.0f}%位置" # 生成描述 descriptions = { BollingerSignal.ABOVE_UPPER: f"股价触及布林带上轨({current_upper:.2f}),面临短期压力", BollingerSignal.BELOW_LOWER: f"股价触及布林带下轨({current_lower:.2f}),可能存在支撑", BollingerSignal.ABOVE_MIDDLE: f"股价在中轨({current_middle:.2f})之上运行,整体偏强", BollingerSignal.BELOW_MIDDLE: f"股价在中轨({current_middle:.2f})之下运行,整体偏弱", BollingerSignal.SQUEEZE: f"布林带收窄(带宽{bandwidth:.1f}%),变盘在即,关注突破方向", BollingerSignal.EXPANSION: f"布林带扩张(带宽{bandwidth:.1f}%),趋势加速中", } return { "success": True, "data": { "signal": signal.value, "upper": round(current_upper, 2), "middle": round(current_middle, 2), "lower": round(current_lower, 2), "bandwidth": round(bandwidth, 2), "position": position, "description": descriptions[signal], "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[Bollinger] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_stop_loss_atr(code: str, days: int = 30, atr_multiplier: float = 2.0) -> Dict[str, Any]: """ 使用ATR计算止损位 Args: code: 股票代码 days: 分析天数 atr_multiplier: ATR倍数,默认2倍 Returns: ATR止损建议 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 15: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) high = np.array([float(d['high_price']) for d in trade_data]) low = np.array([float(d['low_price']) for d in trade_data]) close = np.array([float(d['close_price']) for d in trade_data]) # 计算ATR atr = _calc_atr(high, low, close, 14) current_atr = atr[-1] current_price = close[-1] # 计算ATR百分比 atr_percent = current_atr / current_price * 100 # 计算止损价 stop_loss = current_price - atr_multiplier * current_atr stop_loss_pct = (current_price - stop_loss) / current_price * 100 # 波动级别 if atr_percent < 2: volatility_level = "低" elif atr_percent < 4: volatility_level = "中" else: volatility_level = "高" desc = (f"ATR={current_atr:.2f}(占价格{atr_percent:.1f}%),波动性{volatility_level}。" f"建议止损位:{stop_loss:.2f}({atr_multiplier}倍ATR),止损幅度{stop_loss_pct:.1f}%") return { "success": True, "data": { "atr": round(current_atr, 3), "atr_percent": round(atr_percent, 2), "suggested_stop_loss": round(stop_loss, 2), "suggested_stop_loss_pct": round(stop_loss_pct, 2), "volatility_level": volatility_level, "current_price": round(current_price, 2), "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[ATR] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def analyze_market_heat(code: str, days: int = 30) -> Dict[str, Any]: """ 分析换手率活跃度和量能 Args: code: 股票代码 days: 分析天数 Returns: 市场热度分析 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 10: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) volume = np.array([float(d['volume']) for d in trade_data]) turnover_rates = [float(d.get('turnover_rate', 0) or 0) for d in trade_data] current_turnover = turnover_rates[-1] avg_turnover_5 = np.mean(turnover_rates[-5:]) # 计算量比 volume_ma5 = _calc_sma(volume, 5) volume_ratio = volume[-1] / volume_ma5[-1] if volume_ma5[-1] > 0 else 1 # 计算OBV obv = _calc_obv(close, volume) obv_ma5 = _calc_sma(obv, 5) obv_trend = "上升" if obv[-1] > obv_ma5[-1] else "下降" # 换手率分级 if current_turnover < 1: heat_level = "冷门" elif current_turnover < 3: heat_level = "正常" elif current_turnover < 7: heat_level = "活跃" elif current_turnover < 15: heat_level = "火热" else: heat_level = "极热" # 判断信号 signal = VolumeSignal.NORMAL if volume_ratio > 2: signal = VolumeSignal.VOLUME_SURGE elif volume_ratio < 0.5: signal = VolumeSignal.VOLUME_SHRINK # 检查量价背离 price_up = close[-1] > close[-5] if len(close) > 5 else False volume_down = volume[-1] < volume[-5] if len(volume) > 5 else False if price_up and volume_down: signal = VolumeSignal.VOLUME_PRICE_DIVERGENCE # OBV判断主力动向 if obv_trend == "上升" and close[-1] <= close[-5]: signal = VolumeSignal.ACCUMULATION elif obv_trend == "下降" and close[-1] >= close[-5]: signal = VolumeSignal.DISTRIBUTION desc = (f"换手率{current_turnover:.2f}%({heat_level}),量比{volume_ratio:.2f}," f"OBV趋势{obv_trend}。{signal.value}") return { "success": True, "data": { "signal": signal.value, "turnover_rate": round(current_turnover, 2), "avg_turnover_5d": round(avg_turnover_5, 2), "volume_ratio": round(volume_ratio, 2), "obv_trend": obv_trend, "heat_level": heat_level, "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[MarketHeat] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def check_new_high_breakout(code: str, days: int = 60) -> Dict[str, Any]: """ 检查唐奇安通道突破(新高/新低突破) Args: code: 股票代码 days: 分析天数 Returns: 突破信号分析 """ try: trade_data = await db.get_stock_trade_data(code, limit=days + 10) if len(trade_data) < 25: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) high = np.array([float(d['high_price']) for d in trade_data]) low = np.array([float(d['low_price']) for d in trade_data]) close = np.array([float(d['close_price']) for d in trade_data]) current_price = close[-1] # 计算20日和60日高低点 high_20d = np.max(high[-21:-1]) if len(high) > 21 else np.max(high[:-1]) low_20d = np.min(low[-21:-1]) if len(low) > 21 else np.min(low[:-1]) high_60d = np.max(high[-61:-1]) if len(high) > 61 else np.max(high[:-1]) low_60d = np.min(low[-61:-1]) if len(low) > 61 else np.min(low[:-1]) # 判断突破信号 signal = BreakoutSignal.NO_SIGNAL if current_price > high_20d: signal = BreakoutSignal.NEW_HIGH_BREAKOUT elif current_price < low_20d: signal = BreakoutSignal.NEW_LOW_BREAKDOWN elif current_price > high_20d * 0.97: signal = BreakoutSignal.RESISTANCE_TEST elif current_price < low_20d * 1.03: signal = BreakoutSignal.SUPPORT_TEST # 计算距离 distance_to_high = (high_20d - current_price) / current_price * 100 distance_to_low = (current_price - low_20d) / current_price * 100 descriptions = { BreakoutSignal.NEW_HIGH_BREAKOUT: f"突破20日新高({high_20d:.2f}),海龟交易法则买入信号触发", BreakoutSignal.NEW_LOW_BREAKDOWN: f"跌破20日新低({low_20d:.2f}),海龟交易法则卖出信号触发", BreakoutSignal.RESISTANCE_TEST: f"接近20日高点({high_20d:.2f}),距离{distance_to_high:.1f}%,关注能否突破", BreakoutSignal.SUPPORT_TEST: f"接近20日低点({low_20d:.2f}),距离{distance_to_low:.1f}%,关注能否守住", BreakoutSignal.NO_SIGNAL: f"价格在20日通道内运行({low_20d:.2f} - {high_20d:.2f})", } return { "success": True, "data": { "signal": signal.value, "high_20d": round(high_20d, 2), "low_20d": round(low_20d, 2), "high_60d": round(high_60d, 2), "low_60d": round(low_60d, 2), "current_price": round(current_price, 2), "distance_to_high_pct": round(distance_to_high, 2), "distance_to_low_pct": round(distance_to_low, 2), "description": descriptions[signal], "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[Breakout] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_max_drawdown(code: str, days: int = 250) -> Dict[str, Any]: """ 计算最大回撤 Args: code: 股票代码 days: 分析天数,默认250天(约一年) Returns: 最大回撤分析 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) dates = [d['TRADEDATE'] for d in trade_data] # 计算累计最大值 running_max = np.maximum.accumulate(close) # 计算回撤 drawdown = (running_max - close) / running_max * 100 # 找到最大回撤 max_dd = np.max(drawdown) max_dd_idx = np.argmax(drawdown) # 找到回撤开始的位置(峰值) peak_idx = np.argmax(close[:max_dd_idx + 1]) if max_dd_idx > 0 else 0 # 回撤期间 period_start = dates[peak_idx] if peak_idx < len(dates) else dates[0] period_end = dates[max_dd_idx] if max_dd_idx < len(dates) else dates[-1] # 计算波动率(年化) returns = np.diff(close) / close[:-1] volatility = np.std(returns) * np.sqrt(252) * 100 # 计算夏普比率(假设无风险利率2%) avg_return = np.mean(returns) * 252 * 100 sharpe_ratio = (avg_return - 2) / volatility if volatility > 0 else 0 # 风险等级 if max_dd < 10: risk_level = "低风险" elif max_dd < 25: risk_level = "中风险" else: risk_level = "高风险" desc = (f"最大回撤{max_dd:.1f}%(从{period_start}到{period_end})," f"年化波动率{volatility:.1f}%,夏普比率{sharpe_ratio:.2f},{risk_level}") return { "success": True, "data": { "max_drawdown": round(max_dd, 2), "max_drawdown_period": f"{period_start} 至 {period_end}", "sharpe_ratio": round(sharpe_ratio, 2), "volatility": round(volatility, 2), "risk_level": risk_level, "description": desc, "code": code, "analysis_days": len(trade_data) } } except Exception as e: logger.error(f"[MaxDrawdown] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def check_valuation_rank(code: str, years: int = 3) -> Dict[str, Any]: """ 检查历史PE/PB百分位估值 Args: code: 股票代码 years: 历史年数,默认3年 Returns: 估值分析结果 """ try: days = years * 250 trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 60: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) # 提取PE数据 pe_values = [float(d.get('pe_ratio', 0) or 0) for d in trade_data if d.get('pe_ratio')] pe_values = [pe for pe in pe_values if 0 < pe < 1000] # 过滤异常值 if len(pe_values) < 30: return {"success": False, "error": "PE数据不足"} current_pe = pe_values[-1] # 计算百分位 pe_percentile = (np.sum(np.array(pe_values) < current_pe) / len(pe_values)) * 100 # 获取财务数据计算PEG peg_ratio = None try: financial_data = await db.get_stock_financial_index(code, limit=2) if len(financial_data) >= 2: growth_rate = financial_data[0].get('profit_growth', 0) or 0 if growth_rate > 0: peg_ratio = current_pe / growth_rate except: pass # 估值水平判断 if pe_percentile < 20: valuation_level = "低估" elif pe_percentile < 50: valuation_level = "较低" elif pe_percentile < 80: valuation_level = "合理" else: valuation_level = "高估" # PEG修正 peg_desc = "" if peg_ratio is not None: if peg_ratio < 1: peg_desc = f",PEG={peg_ratio:.2f}<1(成长性强)" elif peg_ratio < 2: peg_desc = f",PEG={peg_ratio:.2f}(合理)" else: peg_desc = f",PEG={peg_ratio:.2f}(偏贵)" desc = (f"当前PE={current_pe:.1f},处于近{years}年{pe_percentile:.0f}%分位," f"估值{valuation_level}{peg_desc}") return { "success": True, "data": { "pe_current": round(current_pe, 2), "pe_percentile": round(pe_percentile, 1), "pe_min": round(min(pe_values), 2), "pe_max": round(max(pe_values), 2), "pe_median": round(np.median(pe_values), 2), "peg_ratio": round(peg_ratio, 2) if peg_ratio else None, "valuation_level": valuation_level, "description": desc, "code": code, "analysis_years": years } } except Exception as e: logger.error(f"[Valuation] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_price_zscore(code: str, period: int = 60) -> Dict[str, Any]: """ 计算价格Z-Score(乖离率标准化) Args: code: 股票代码 period: 均线周期,默认60日 Returns: Z-Score分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=period + 10) if len(trade_data) < period: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算均线和标准差 ma = _calc_sma(close, period) std = _calc_std(close, period) current_price = close[-1] current_ma = ma[-1] current_std = std[-1] # 计算Z-Score zscore = (current_price - current_ma) / current_std if current_std > 0 else 0 # 计算乖离率 bias = (current_price - current_ma) / current_ma * 100 # 判断状态 if zscore > 2: status = "极度偏高" probability = "历史上此位置回落概率约95%" elif zscore > 1: status = "偏高" probability = "历史上此位置回落概率约68%" elif zscore < -2: status = "极度偏低" probability = "历史上此位置反弹概率约95%" elif zscore < -1: status = "偏低" probability = "历史上此位置反弹概率约68%" else: status = "正常" probability = "价格在正常波动范围内" desc = f"Z-Score={zscore:.2f},乖离率{bias:+.1f}%,{status}。{probability}" return { "success": True, "data": { "zscore": round(zscore, 3), "bias": round(bias, 2), "ma_period": period, "current_ma": round(current_ma, 2), "current_price": round(current_price, 2), "status": status, "mean_reversion_hint": probability, "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[ZScore] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} # ==================== 分钟级高阶算子(基于ClickHouse) ==================== async def calc_market_profile_vpoc(code: str, date: str = None) -> Dict[str, Any]: """ 计算市场轮廓VPOC(成交量最大的价格档位) Args: code: 股票代码 date: 日期,格式 YYYY-MM-DD(可选,默认使用最近交易日) Returns: VPOC分析结果 """ try: # 获取分钟数据(如果未指定日期,获取最近的数据) if date: minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) else: # 获取最近的分钟数据 minute_data = await db.get_stock_minute_data(code, limit=500) if len(minute_data) < 10: return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} # 从数据中提取实际日期 if not date and minute_data: date = minute_data[0].get('timestamp', '')[:10] # 按价格区间统计成交量 prices = np.array([float(d['close']) for d in minute_data]) volumes = np.array([float(d['volume']) for d in minute_data]) # 创建价格档位(将价格四舍五入到0.01) price_bins = np.round(prices, 2) # 统计每个价格档位的成交量 unique_prices, indices = np.unique(price_bins, return_inverse=True) volume_by_price = np.zeros(len(unique_prices)) for i, idx in enumerate(indices): volume_by_price[idx] += volumes[i] # 找到VPOC vpoc_idx = np.argmax(volume_by_price) vpoc_price = unique_prices[vpoc_idx] vpoc_volume = volume_by_price[vpoc_idx] # 当前价格 current_price = prices[-1] # 判断位置关系 if current_price > vpoc_price * 1.01: position = "价格在VPOC之上,短期支撑位" elif current_price < vpoc_price * 0.99: position = "价格在VPOC之下,短期压力位" else: position = "价格在VPOC附近,成交密集区" desc = f"VPOC价格={vpoc_price:.2f},成交量占比{vpoc_volume/np.sum(volumes)*100:.1f}%。{position}" return { "success": True, "data": { "vpoc_price": round(vpoc_price, 2), "vpoc_volume": int(vpoc_volume), "vpoc_volume_pct": round(vpoc_volume / np.sum(volumes) * 100, 1), "current_price": round(current_price, 2), "position": position, "description": desc, "code": code, "date": date } } except Exception as e: logger.error(f"[VPOC] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_realized_volatility(code: str, date: str = None) -> Dict[str, Any]: """ 计算已实现波动率(Realized Volatility) Args: code: 股票代码 date: 日期(可选,默认使用最近交易日) Returns: RV分析结果 """ try: # 获取分钟数据 if date: minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) else: minute_data = await db.get_stock_minute_data(code, limit=500) if len(minute_data) < 30: return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} # 从数据中提取实际日期 if not date and minute_data: date = minute_data[0].get('timestamp', '')[:10] # 按时间排序 minute_data = sorted(minute_data, key=lambda x: x['timestamp']) close = np.array([float(d['close']) for d in minute_data]) # 计算分钟收益率 returns = np.diff(np.log(close)) # 计算已实现波动率(日内) rv_intraday = np.sqrt(np.sum(returns ** 2)) * 100 # 年化波动率(假设一天240分钟,一年250天) rv_annual = rv_intraday * np.sqrt(250) * 100 # 波动率水平 if rv_intraday < 1: vol_level = "低波动" elif rv_intraday < 3: vol_level = "中波动" else: vol_level = "高波动" desc = f"日内已实现波动率{rv_intraday:.2f}%(年化约{rv_annual:.0f}%),{vol_level}" return { "success": True, "data": { "rv_intraday": round(rv_intraday, 3), "rv_annualized": round(rv_annual, 1), "volatility_level": vol_level, "description": desc, "code": code, "date": date } } except Exception as e: logger.error(f"[RV] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def analyze_buying_pressure(code: str, date: str = None) -> Dict[str, Any]: """ 分析买卖压力失衡 Args: code: 股票代码 date: 日期(可选,默认使用最近交易日) Returns: 买卖压力分析 """ try: # 获取分钟数据 if date: minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) else: minute_data = await db.get_stock_minute_data(code, limit=500) if len(minute_data) < 30: return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} # 从数据中提取实际日期 if not date and minute_data: date = minute_data[0].get('timestamp', '')[:10] minute_data = sorted(minute_data, key=lambda x: x['timestamp']) high = np.array([float(d['high']) for d in minute_data]) low = np.array([float(d['low']) for d in minute_data]) close = np.array([float(d['close']) for d in minute_data]) volume = np.array([float(d['volume']) for d in minute_data]) # 计算买卖压力指标 # (Close - Low) - (High - Close),正值表示买压,负值表示卖压 pressure = (close - low) - (high - close) # 成交量加权 weighted_pressure = np.sum(pressure * volume) / np.sum(volume) # 计算买压占比 total_range = high - low buying_pct = np.where(total_range > 0, (close - low) / total_range, 0.5) avg_buying_pct = np.mean(buying_pct) * 100 # 判断信号 if weighted_pressure > 0.1: signal = "强买压" desc = f"盘中主力抢筹明显,买方占优,收盘偏向日内高点" elif weighted_pressure < -0.1: signal = "强卖压" desc = f"盘中抛压沉重,卖方占优,收盘偏向日内低点" else: signal = "均衡" desc = f"买卖双方博弈均衡,无明显方向" return { "success": True, "data": { "pressure_signal": signal, "weighted_pressure": round(weighted_pressure, 4), "avg_buying_pct": round(avg_buying_pct, 1), "description": desc, "code": code, "date": date } } except Exception as e: logger.error(f"[BuyingPressure] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def check_volume_price_divergence(code: str, days: int = 20) -> Dict[str, Any]: """ 检测量价背离 Args: code: 股票代码 days: 分析天数 Returns: 量价背离分析 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 10: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) volume = np.array([float(d['volume']) for d in trade_data]) # 检查价格趋势 price_trend = "up" if close[-1] > close[-5] else "down" price_change = (close[-1] - close[-5]) / close[-5] * 100 # 检查成交量趋势 vol_avg_recent = np.mean(volume[-5:]) vol_avg_prev = np.mean(volume[-10:-5]) if len(volume) >= 10 else np.mean(volume[:-5]) vol_trend = "up" if vol_avg_recent > vol_avg_prev else "down" vol_change = (vol_avg_recent - vol_avg_prev) / vol_avg_prev * 100 if vol_avg_prev > 0 else 0 # 判断背离 has_divergence = False divergence_type = None if price_trend == "up" and vol_trend == "down": has_divergence = True divergence_type = "顶背离" desc = f"股价上涨{price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,上涨动能不足,警惕回调" elif price_trend == "down" and vol_trend == "down": has_divergence = True divergence_type = "底背离" desc = f"股价下跌{-price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,抛压减弱,可能企稳" else: desc = f"量价配合正常,价格{price_trend},成交量{vol_trend}" return { "success": True, "data": { "has_divergence": has_divergence, "divergence_type": divergence_type, "price_change_5d": round(price_change, 2), "volume_change_5d": round(vol_change, 2), "signal": "量价背离" if has_divergence else "量价正常", "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[VolPriceDivergence] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def identify_candlestick_pattern(code: str, days: int = 10) -> Dict[str, Any]: """ 识别K线组合形态 Args: code: 股票代码 days: 分析天数 Returns: K线形态识别结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 3: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) patterns = [] # 获取最近3天的OHLC for i in range(len(trade_data) - 2, len(trade_data)): if i < 0: continue d = trade_data[i] open_p = float(d['open_price']) high = float(d['high_price']) low = float(d['low_price']) close = float(d['close_price']) body = close - open_p upper_shadow = high - max(open_p, close) lower_shadow = min(open_p, close) - low body_size = abs(body) # 十字星 if body_size < (high - low) * 0.1: patterns.append(("十字星", "犹豫信号,可能变盘")) # 锤子线 if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5: patterns.append(("锤子线", "底部反转信号")) # 上吊线 if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5 and body < 0: patterns.append(("上吊线", "顶部反转信号")) # 检查多日形态(需要至少3天数据) if len(trade_data) >= 3: d1, d2, d3 = trade_data[-3], trade_data[-2], trade_data[-1] close1 = float(d1['close_price']) open1 = float(d1['open_price']) close2 = float(d2['close_price']) open2 = float(d2['open_price']) close3 = float(d3['close_price']) open3 = float(d3['open_price']) # 红三兵 if (close1 > open1 and close2 > open2 and close3 > open3 and close2 > close1 and close3 > close2): patterns.append(("红三兵", "强势上涨信号")) # 三只乌鸦 if (close1 < open1 and close2 < open2 and close3 < open3 and close2 < close1 and close3 < close2): patterns.append(("三只乌鸦", "强势下跌信号")) # 早晨之星 body1 = abs(close1 - open1) body2 = abs(close2 - open2) body3 = abs(close3 - open3) if (close1 < open1 and body2 < body1 * 0.3 and close3 > open3 and close3 > (open1 + close1) / 2): patterns.append(("早晨之星", "底部反转强信号")) # 黄昏之星 if (close1 > open1 and body2 < body1 * 0.3 and close3 < open3 and close3 < (open1 + close1) / 2): patterns.append(("黄昏之星", "顶部反转强信号")) # 生成描述 if patterns: pattern_names = [f"{p[0]}({p[1]})" for p in patterns] desc = f"识别到以下K线形态:" + "、".join(pattern_names) else: desc = "未识别到明显的K线形态" return { "success": True, "data": { "patterns": [{"name": p[0], "meaning": p[1]} for p in patterns], "pattern_count": len(patterns), "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[CandlestickPattern] 识别失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def find_price_gaps(code: str, days: int = 30) -> Dict[str, Any]: """ 寻找跳空缺口 Args: code: 股票代码 days: 分析天数 Returns: 缺口分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 5: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) gaps = [] current_price = float(trade_data[-1]['close_price']) for i in range(1, len(trade_data)): prev = trade_data[i-1] curr = trade_data[i] prev_high = float(prev['high_price']) prev_low = float(prev['low_price']) curr_high = float(curr['high_price']) curr_low = float(curr['low_price']) # 上涨缺口 if curr_low > prev_high: gap_size = (curr_low - prev_high) / prev_high * 100 is_filled = current_price <= curr_low gaps.append({ "type": "上涨缺口", "date": curr['TRADEDATE'], "gap_low": prev_high, "gap_high": curr_low, "gap_size_pct": round(gap_size, 2), "is_filled": is_filled }) # 下跌缺口 if curr_high < prev_low: gap_size = (prev_low - curr_high) / prev_low * 100 is_filled = current_price >= curr_high gaps.append({ "type": "下跌缺口", "date": curr['TRADEDATE'], "gap_low": curr_high, "gap_high": prev_low, "gap_size_pct": round(gap_size, 2), "is_filled": is_filled }) # 只保留未回补的缺口 unfilled_gaps = [g for g in gaps if not g['is_filled']] if unfilled_gaps: gap_desc = [f"{g['type']}({g['date']},{g['gap_size_pct']}%)" for g in unfilled_gaps[:3]] desc = f"发现{len(unfilled_gaps)}个未回补缺口:" + "、".join(gap_desc) else: desc = "近期无未回补缺口" return { "success": True, "data": { "total_gaps": len(gaps), "unfilled_gaps": unfilled_gaps, "unfilled_count": len(unfilled_gaps), "description": desc, "code": code } } except Exception as e: logger.error(f"[PriceGaps] 分析失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def get_comprehensive_analysis(code: str) -> Dict[str, Any]: """ 综合技术分析(一次性返回多个指标) Args: code: 股票代码 Returns: 综合分析结果 """ try: # 并行执行多个分析 results = await asyncio.gather( get_macd_signal(code), check_oscillator_status(code), analyze_bollinger_bands(code), analyze_market_heat(code), check_new_high_breakout(code), check_volume_price_divergence(code), identify_candlestick_pattern(code), return_exceptions=True ) analysis = {} indicators = [ ("macd", "MACD趋势"), ("oscillator", "超买超卖"), ("bollinger", "布林带"), ("market_heat", "市场热度"), ("breakout", "突破信号"), ("volume_price", "量价分析"), ("candlestick", "K线形态") ] signals = [] for i, (key, name) in enumerate(indicators): if isinstance(results[i], dict) and results[i].get("success"): analysis[key] = results[i]["data"] signals.append(f"【{name}】{results[i]['data'].get('description', '')}") # 生成综合评估 bullish_count = 0 bearish_count = 0 for result in results: if isinstance(result, dict) and result.get("success"): data = result.get("data", {}) signal = data.get("signal", "") if any(word in signal for word in ["金叉", "底背离", "超卖", "支撑", "突破新高", "红三兵"]): bullish_count += 1 elif any(word in signal for word in ["死叉", "顶背离", "超买", "压力", "跌破", "乌鸦"]): bearish_count += 1 if bullish_count > bearish_count + 2: overall = "多方占优,短期看涨" elif bearish_count > bullish_count + 2: overall = "空方占优,短期看跌" else: overall = "多空胶着,观望为主" return { "success": True, "data": { "code": code, "analysis": analysis, "signals_summary": signals, "bullish_signals": bullish_count, "bearish_signals": bearish_count, "overall_assessment": overall } } except Exception as e: logger.error(f"[ComprehensiveAnalysis] 分析失败: {e}", exc_info=True) return {"success": False, "error": str(e)} # ==================== 新增因子(12个) ==================== async def calc_rsi_divergence(code: str, days: int = 60, rsi_period: int = 14) -> Dict[str, Any]: """ RSI背离检测(独立于MACD的RSI专项背离分析) Args: code: 股票代码 days: 分析天数 rsi_period: RSI周期,默认14 Returns: RSI背离分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < rsi_period + 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算RSI rsi = _calc_rsi(close, rsi_period) # 检测背离 divergence = _detect_divergence(close, rsi, lookback=20) current_rsi = rsi[-1] # 判断RSI位置 if current_rsi > 70: rsi_zone = "超买区" elif current_rsi < 30: rsi_zone = "超卖区" else: rsi_zone = "中性区" # 生成描述 if divergence == "bullish_divergence": signal = "底背离" desc = f"RSI底背离形成:股价创新低但RSI没有创新低,当前RSI={current_rsi:.1f},位于{rsi_zone},反弹概率增大" elif divergence == "bearish_divergence": signal = "顶背离" desc = f"RSI顶背离形成:股价创新高但RSI没有创新高,当前RSI={current_rsi:.1f},位于{rsi_zone},回调风险加大" else: signal = "无背离" desc = f"RSI当前值={current_rsi:.1f},位于{rsi_zone},未检测到明显背离" return { "success": True, "data": { "signal": signal, "rsi_value": round(current_rsi, 2), "rsi_zone": rsi_zone, "has_divergence": divergence is not None, "divergence_type": divergence, "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[RSI Divergence] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_bollinger_squeeze(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]: """ 布林带挤压分析(Bollinger Squeeze) 当布林带收窄到一定程度时,通常预示着变盘 Args: code: 股票代码 days: 分析天数 period: 布林带周期 Returns: 布林带挤压分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < period + 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算布林带 upper, middle, lower = _calc_bollinger(close, period) # 计算带宽序列 bandwidths = (upper - lower) / middle * 100 current_bandwidth = bandwidths[-1] # 计算历史带宽百分位 bandwidth_percentile = (np.sum(bandwidths < current_bandwidth) / len(bandwidths)) * 100 # 计算带宽变化趋势 bandwidth_ma5 = np.mean(bandwidths[-5:]) bandwidth_ma20 = np.mean(bandwidths[-20:]) bandwidth_trend = "收窄" if bandwidth_ma5 < bandwidth_ma20 else "扩张" # 判断挤压状态 if bandwidth_percentile < 20: squeeze_status = "强挤压" signal = "变盘在即" desc = f"布林带处于强挤压状态,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),变盘信号强烈" elif bandwidth_percentile < 40: squeeze_status = "中度挤压" signal = "关注突破" desc = f"布林带中度挤压,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),关注突破方向" else: squeeze_status = "无挤压" signal = "正常波动" desc = f"布林带正常,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),趋势{bandwidth_trend}中" return { "success": True, "data": { "squeeze_status": squeeze_status, "signal": signal, "bandwidth": round(current_bandwidth, 2), "bandwidth_percentile": round(bandwidth_percentile, 1), "bandwidth_trend": bandwidth_trend, "upper": round(upper[-1], 2), "middle": round(middle[-1], 2), "lower": round(lower[-1], 2), "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[Bollinger Squeeze] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def analyze_obv_trend(code: str, days: int = 60) -> Dict[str, Any]: """ OBV能量潮独立分析 Args: code: 股票代码 days: 分析天数 Returns: OBV趋势分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) volume = np.array([float(d['volume']) for d in trade_data]) # 计算OBV obv = _calc_obv(close, volume) # 计算OBV均线 obv_ma5 = _calc_sma(obv, 5) obv_ma20 = _calc_sma(obv, 20) current_obv = obv[-1] current_obv_ma5 = obv_ma5[-1] current_obv_ma20 = obv_ma20[-1] # 判断OBV趋势 if current_obv > current_obv_ma5 > current_obv_ma20: obv_trend = "强势上升" signal = "资金持续流入" elif current_obv > current_obv_ma5: obv_trend = "短期上升" signal = "资金开始流入" elif current_obv < current_obv_ma5 < current_obv_ma20: obv_trend = "强势下降" signal = "资金持续流出" elif current_obv < current_obv_ma5: obv_trend = "短期下降" signal = "资金开始流出" else: obv_trend = "震荡" signal = "资金观望" # 检测OBV与价格的背离 price_change_20d = (close[-1] - close[-20]) / close[-20] * 100 if len(close) >= 20 else 0 obv_change_20d = (obv[-1] - obv[-20]) / abs(obv[-20]) * 100 if len(obv) >= 20 and obv[-20] != 0 else 0 obv_divergence = None if price_change_20d > 5 and obv_change_20d < -5: obv_divergence = "顶背离" elif price_change_20d < -5 and obv_change_20d > 5: obv_divergence = "底背离" desc = f"OBV趋势:{obv_trend},{signal}。" if obv_divergence: desc += f"警告:出现OBV{obv_divergence}!" return { "success": True, "data": { "obv_trend": obv_trend, "signal": signal, "current_obv": int(current_obv), "obv_ma5": int(current_obv_ma5), "obv_ma20": int(current_obv_ma20), "obv_divergence": obv_divergence, "price_change_20d": round(price_change_20d, 2), "obv_change_20d": round(obv_change_20d, 2), "description": desc, "code": code, "analysis_date": trade_data[-1]['TRADEDATE'] } } except Exception as e: logger.error(f"[OBV Trend] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_amihud_illiquidity(code: str, days: int = 20) -> Dict[str, Any]: """ 计算Amihud非流动性因子 Amihud = |收益率| / 成交额 值越大表示流动性越差(价格对成交额的敏感度越高) Args: code: 股票代码 days: 分析天数 Returns: Amihud非流动性分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 5: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) turnover = np.array([float(d.get('turnover', 0) or d.get('amount', 0) or 0) for d in trade_data]) # 过滤掉成交额为0的数据 valid_mask = turnover > 0 if np.sum(valid_mask) < 5: return {"success": False, "error": "有效成交额数据不足"} # 计算日收益率 returns = np.zeros(len(close)) returns[1:] = np.abs(np.diff(close) / close[:-1]) # 计算Amihud因子(单位:每百万成交额导致的价格变动百分比) amihud_daily = np.where(turnover > 0, returns / (turnover / 1e6), 0) # 平均Amihud值 avg_amihud = np.mean(amihud_daily[valid_mask]) * 1e6 # 放大以便阅读 # 流动性评级 if avg_amihud < 0.1: liquidity_level = "极高流动性" signal = "大单冲击成本低" elif avg_amihud < 0.5: liquidity_level = "高流动性" signal = "交易成本可控" elif avg_amihud < 2: liquidity_level = "中等流动性" signal = "注意交易量" elif avg_amihud < 10: liquidity_level = "低流动性" signal = "大单需分批" else: liquidity_level = "极低流动性" signal = "谨慎交易" desc = f"Amihud非流动性指标={avg_amihud:.4f},{liquidity_level},{signal}" return { "success": True, "data": { "amihud_value": round(avg_amihud, 6), "liquidity_level": liquidity_level, "signal": signal, "avg_daily_turnover": round(np.mean(turnover[valid_mask]) / 1e4, 2), # 万元 "description": desc, "code": code, "analysis_days": days } } except Exception as e: logger.error(f"[Amihud] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_parkinson_volatility(code: str, date: str = None) -> Dict[str, Any]: """ 计算帕金森波动率(基于分钟级High/Low数据) 帕金森波动率只使用最高价和最低价,比传统波动率更准确 Args: code: 股票代码 date: 日期(可选,默认使用最近交易日) Returns: 帕金森波动率结果 """ try: # 获取分钟数据 if date: minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) else: minute_data = await db.get_stock_minute_data(code, limit=500) if len(minute_data) < 30: return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} # 从数据中提取实际日期 if not date and minute_data: date = minute_data[0].get('timestamp', '')[:10] high = np.array([float(d['high']) for d in minute_data]) low = np.array([float(d['low']) for d in minute_data]) # 帕金森波动率公式 # σ = sqrt(1/(4*ln(2)) * mean((ln(H/L))^2)) log_hl = np.log(high / low) parkinson_var = np.mean(log_hl ** 2) / (4 * np.log(2)) parkinson_vol = np.sqrt(parkinson_var) * 100 # 转为百分比 # 年化 parkinson_vol_annual = parkinson_vol * np.sqrt(252 * 240) # 240分钟/天 # 与传统波动率对比(使用收盘价) close = np.array([float(d['close']) for d in minute_data]) returns = np.diff(np.log(close)) traditional_vol = np.std(returns) * 100 # 波动率评级 if parkinson_vol < 0.5: vol_level = "低波动" elif parkinson_vol < 1.5: vol_level = "中波动" else: vol_level = "高波动" desc = f"帕金森波动率={parkinson_vol:.3f}%(年化约{parkinson_vol_annual:.1f}%),{vol_level}。传统波动率={traditional_vol:.3f}%" return { "success": True, "data": { "parkinson_vol": round(parkinson_vol, 4), "parkinson_vol_annual": round(parkinson_vol_annual, 2), "traditional_vol": round(traditional_vol, 4), "vol_level": vol_level, "description": desc, "code": code, "date": date } } except Exception as e: logger.error(f"[Parkinson Vol] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_trend_slope(code: str, days: int = 20) -> Dict[str, Any]: """ 计算趋势线性回归斜率 Args: code: 股票代码 days: 分析天数 Returns: 趋势斜率分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 10: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 线性回归 x = np.arange(len(close)) slope, intercept = np.polyfit(x, close, 1) # 计算R² y_pred = slope * x + intercept ss_res = np.sum((close - y_pred) ** 2) ss_tot = np.sum((close - np.mean(close)) ** 2) r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 # 将斜率转化为日均涨幅百分比 slope_pct = slope / close[0] * 100 # 判断趋势 if slope_pct > 0.5 and r_squared > 0.7: trend = "强上升趋势" signal = "顺势做多" elif slope_pct > 0.2 and r_squared > 0.5: trend = "弱上升趋势" signal = "关注回调买点" elif slope_pct < -0.5 and r_squared > 0.7: trend = "强下降趋势" signal = "顺势做空或观望" elif slope_pct < -0.2 and r_squared > 0.5: trend = "弱下降趋势" signal = "关注反弹卖点" else: trend = "震荡无趋势" signal = "区间交易" desc = f"近{days}日趋势斜率={slope_pct:.3f}%/日,R²={r_squared:.2f},{trend},建议{signal}" return { "success": True, "data": { "slope": round(slope, 4), "slope_pct_daily": round(slope_pct, 3), "r_squared": round(r_squared, 3), "trend": trend, "signal": signal, "intercept": round(intercept, 2), "description": desc, "code": code, "analysis_days": days } } except Exception as e: logger.error(f"[Trend Slope] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_hurst_exponent(code: str, days: int = 100) -> Dict[str, Any]: """ 计算Hurst指数(用于判断市场是趋势还是均值回归) H > 0.5: 趋势市场(序列具有长期记忆) H = 0.5: 随机游走 H < 0.5: 均值回归市场 Args: code: 股票代码 days: 分析天数(建议100天以上) Returns: Hurst指数分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 50: return {"success": False, "error": "数据不足,Hurst指数需要至少50个数据点"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # R/S分析法计算Hurst指数 n = len(close) max_k = min(n // 2, 50) # 最大分组数 rs_list = [] n_list = [] for k in range(10, max_k + 1): # 将序列分成多个子序列 rs_values = [] for start in range(0, n - k + 1, k): segment = close[start:start + k] # 计算累积离差 mean_seg = np.mean(segment) cumdev = np.cumsum(segment - mean_seg) # R = max - min r = np.max(cumdev) - np.min(cumdev) # S = 标准差 s = np.std(segment) if s > 0: rs_values.append(r / s) if rs_values: rs_list.append(np.mean(rs_values)) n_list.append(k) if len(rs_list) < 3: return {"success": False, "error": "无法计算Hurst指数"} # 线性回归计算H log_n = np.log(n_list) log_rs = np.log(rs_list) hurst, _ = np.polyfit(log_n, log_rs, 1) # 解读Hurst指数 if hurst > 0.6: market_type = "趋势市场" signal = "趋势策略有效" desc = f"Hurst指数={hurst:.3f}>0.6,市场呈现明显趋势特征,适合趋势跟踪策略" elif hurst > 0.5: market_type = "弱趋势市场" signal = "混合策略" desc = f"Hurst指数={hurst:.3f},接近随机游走,趋势与均值回归策略均可尝试" elif hurst > 0.4: market_type = "弱均值回归" signal = "关注反转" desc = f"Hurst指数={hurst:.3f},市场有轻微均值回归倾向" else: market_type = "强均值回归" signal = "反转策略有效" desc = f"Hurst指数={hurst:.3f}<0.4,市场呈现强均值回归特征,适合反转策略" return { "success": True, "data": { "hurst_exponent": round(hurst, 3), "market_type": market_type, "signal": signal, "description": desc, "code": code, "analysis_days": len(trade_data) } } except Exception as e: logger.error(f"[Hurst] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def test_cointegration(code1: str, code2: str, days: int = 250) -> Dict[str, Any]: """ 协整性测试(用于配对交易) Args: code1: 股票代码1 code2: 股票代码2 days: 分析天数 Returns: 协整性测试结果 """ try: # 获取两只股票的数据 data1 = await db.get_stock_trade_data(code1, limit=days) data2 = await db.get_stock_trade_data(code2, limit=days) if len(data1) < 60 or len(data2) < 60: return {"success": False, "error": "数据不足"} # 按日期排序 data1 = sorted(data1, key=lambda x: x['TRADEDATE']) data2 = sorted(data2, key=lambda x: x['TRADEDATE']) # 对齐日期 dates1 = {d['TRADEDATE']: float(d['close_price']) for d in data1} dates2 = {d['TRADEDATE']: float(d['close_price']) for d in data2} common_dates = sorted(set(dates1.keys()) & set(dates2.keys())) if len(common_dates) < 60: return {"success": False, "error": "共同交易日不足"} price1 = np.array([dates1[d] for d in common_dates]) price2 = np.array([dates2[d] for d in common_dates]) # 计算对数价格 log_price1 = np.log(price1) log_price2 = np.log(price2) # 线性回归找对冲比率 hedge_ratio, intercept = np.polyfit(log_price2, log_price1, 1) # 计算价差(spread) spread = log_price1 - hedge_ratio * log_price2 # ADF检验的简化版本(计算价差的平稳性指标) spread_diff = np.diff(spread) spread_lag = spread[:-1] # 计算回归系数 if np.var(spread_lag) > 0: rho = np.cov(spread_diff, spread_lag)[0, 1] / np.var(spread_lag) else: rho = 0 # rho接近-1表示强平稳性 stationarity_score = -rho # 计算价差的均值回归半衰期 if rho < 0: half_life = -np.log(2) / np.log(1 + rho) else: half_life = float('inf') # 当前价差Z-score spread_mean = np.mean(spread) spread_std = np.std(spread) current_zscore = (spread[-1] - spread_mean) / spread_std if spread_std > 0 else 0 # 判断协整性 if stationarity_score > 0.3 and half_life < 30: cointegration_level = "强协整" signal = "适合配对交易" elif stationarity_score > 0.1 and half_life < 60: cointegration_level = "中等协整" signal = "可尝试配对" else: cointegration_level = "弱协整或无协整" signal = "不建议配对" # 交易建议 if abs(current_zscore) > 2: if current_zscore > 0: trade_signal = f"做空价差(卖{code1}买{code2})" else: trade_signal = f"做多价差(买{code1}卖{code2})" elif abs(current_zscore) > 1: trade_signal = "关注价差回归机会" else: trade_signal = "价差在正常范围" desc = (f"{code1}与{code2}{cointegration_level},对冲比率={hedge_ratio:.3f}," f"半衰期={half_life:.1f}天,当前价差Z-score={current_zscore:.2f},{signal}") return { "success": True, "data": { "code1": code1, "code2": code2, "cointegration_level": cointegration_level, "hedge_ratio": round(hedge_ratio, 4), "half_life_days": round(half_life, 1) if half_life < 1000 else "N/A", "current_spread_zscore": round(current_zscore, 3), "stationarity_score": round(stationarity_score, 3), "trade_signal": trade_signal, "signal": signal, "description": desc, "common_days": len(common_dates) } } except Exception as e: logger.error(f"[Cointegration] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_kelly_position(win_rate: float, win_loss_ratio: float, max_position: float = 0.25) -> Dict[str, Any]: """ 凯利公式计算最优仓位 Kelly% = W - (1-W)/R 其中 W=胜率,R=盈亏比 Args: win_rate: 胜率(0-1之间) win_loss_ratio: 盈亏比(平均盈利/平均亏损) max_position: 最大允许仓位(默认25%) Returns: 凯利仓位计算结果 """ try: if not 0 < win_rate < 1: return {"success": False, "error": "胜率必须在0-1之间"} if win_loss_ratio <= 0: return {"success": False, "error": "盈亏比必须大于0"} # 计算凯利比例 kelly_pct = win_rate - (1 - win_rate) / win_loss_ratio # 半凯利(更保守) half_kelly = kelly_pct / 2 # 限制最大仓位 recommended_position = min(max(kelly_pct, 0), max_position) conservative_position = min(max(half_kelly, 0), max_position) # 判断策略质量 if kelly_pct >= 0.2: strategy_quality = "优秀" signal = "可以较大仓位" elif kelly_pct >= 0.1: strategy_quality = "良好" signal = "适中仓位" elif kelly_pct > 0: strategy_quality = "一般" signal = "小仓位试探" else: strategy_quality = "不可行" signal = "不建议交易" desc = (f"胜率{win_rate*100:.1f}%,盈亏比{win_loss_ratio:.2f}," f"凯利比例={kelly_pct*100:.1f}%,策略{strategy_quality}。" f"建议仓位:激进{recommended_position*100:.1f}%,保守{conservative_position*100:.1f}%") return { "success": True, "data": { "kelly_pct": round(kelly_pct * 100, 2), "half_kelly_pct": round(half_kelly * 100, 2), "recommended_position": round(recommended_position * 100, 2), "conservative_position": round(conservative_position * 100, 2), "strategy_quality": strategy_quality, "signal": signal, "win_rate": round(win_rate * 100, 1), "win_loss_ratio": round(win_loss_ratio, 2), "description": desc } } except Exception as e: logger.error(f"[Kelly] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def search_similar_kline(code: str, lookback: int = 10, top_n: int = 5) -> Dict[str, Any]: """ 相似K线检索(基于形态相似度) Args: code: 股票代码 lookback: 匹配窗口大小(默认10天) top_n: 返回最相似的N个历史片段 Returns: 相似K线检索结果 """ try: # 获取足够的历史数据 trade_data = await db.get_stock_trade_data(code, limit=500) if len(trade_data) < lookback * 3: return {"success": False, "error": "历史数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) dates = [d['TRADEDATE'] for d in trade_data] # 当前形态(标准化) current_pattern = close[-lookback:] current_norm = (current_pattern - np.mean(current_pattern)) / np.std(current_pattern) # 在历史中搜索相似形态 similar_patterns = [] for i in range(len(close) - lookback * 2): # 不包括最近的数据 historical_pattern = close[i:i + lookback] # 标准化 if np.std(historical_pattern) > 0: hist_norm = (historical_pattern - np.mean(historical_pattern)) / np.std(historical_pattern) # 计算相似度(使用相关系数) correlation = np.corrcoef(current_norm, hist_norm)[0, 1] # 计算后续N天的收益 future_return = 0 if i + lookback + 5 < len(close): future_return = (close[i + lookback + 5] - close[i + lookback - 1]) / close[i + lookback - 1] * 100 if correlation > 0.8: # 只保留高相似度的 similar_patterns.append({ "start_date": dates[i], "end_date": dates[i + lookback - 1], "correlation": correlation, "future_return_5d": future_return }) # 按相似度排序,取前N个 similar_patterns = sorted(similar_patterns, key=lambda x: x['correlation'], reverse=True)[:top_n] if not similar_patterns: return { "success": True, "data": { "message": "未找到高度相似的历史形态", "code": code, "lookback": lookback } } # 统计历史相似形态后的走势 future_returns = [p['future_return_5d'] for p in similar_patterns if p['future_return_5d'] != 0] if future_returns: avg_future_return = np.mean(future_returns) up_probability = sum(1 for r in future_returns if r > 0) / len(future_returns) * 100 else: avg_future_return = 0 up_probability = 50 # 预测信号 if avg_future_return > 2 and up_probability > 60: prediction = "历史形态后多数上涨" elif avg_future_return < -2 and up_probability < 40: prediction = "历史形态后多数下跌" else: prediction = "历史走势分化,谨慎参考" desc = (f"找到{len(similar_patterns)}个相似历史形态," f"历史5日平均收益{avg_future_return:.1f}%,上涨概率{up_probability:.0f}%,{prediction}") return { "success": True, "data": { "similar_patterns": similar_patterns, "pattern_count": len(similar_patterns), "avg_future_return_5d": round(avg_future_return, 2), "up_probability": round(up_probability, 1), "prediction": prediction, "description": desc, "code": code, "lookback": lookback } } except Exception as e: logger.error(f"[Similar KLine] 检索失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def decompose_trend_simple(code: str, days: int = 120) -> Dict[str, Any]: """ 简化版趋势分解(不依赖Prophet) 将价格序列分解为:趋势 + 周期 + 残差 Args: code: 股票代码 days: 分析天数 Returns: 趋势分解结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 60: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) dates = [d['TRADEDATE'] for d in trade_data] # 1. 提取趋势(使用长周期移动平均) trend = _calc_sma(close, min(60, len(close) // 2)) # 2. 去趋势 detrended = close - trend # 3. 提取周期成分(使用FFT的简化方法) # 计算周期性强度 fft_result = np.fft.fft(detrended) fft_freq = np.fft.fftfreq(len(detrended)) # 找到主要周期 positive_freq_idx = fft_freq > 0 amplitudes = np.abs(fft_result[positive_freq_idx]) frequencies = fft_freq[positive_freq_idx] # 找最强的周期 if len(amplitudes) > 0: dominant_idx = np.argmax(amplitudes) dominant_period = 1 / frequencies[dominant_idx] if frequencies[dominant_idx] > 0 else 0 periodicity_strength = amplitudes[dominant_idx] / np.sum(amplitudes) * 100 else: dominant_period = 0 periodicity_strength = 0 # 4. 残差 = 原始 - 趋势 - 周期 residual = detrended # 简化处理,这里残差就是去趋势后的数据 # 分析当前趋势方向 trend_direction = "上升" if trend[-1] > trend[-20] else "下降" trend_strength = abs(trend[-1] - trend[-20]) / trend[-20] * 100 # 当前位置相对于趋势 current_deviation = (close[-1] - trend[-1]) / trend[-1] * 100 if current_deviation > 3: position = "高于趋势线" signal = "短期或有回调" elif current_deviation < -3: position = "低于趋势线" signal = "短期或有反弹" else: position = "贴近趋势线" signal = "跟随趋势" desc = (f"趋势{trend_direction}(强度{trend_strength:.1f}%)," f"主周期约{dominant_period:.0f}天,周期性强度{periodicity_strength:.1f}%," f"当前{position}(偏离{current_deviation:+.1f}%),{signal}") return { "success": True, "data": { "trend_direction": trend_direction, "trend_strength_pct": round(trend_strength, 2), "dominant_period_days": round(dominant_period, 0), "periodicity_strength": round(periodicity_strength, 1), "current_deviation_pct": round(current_deviation, 2), "position": position, "signal": signal, "current_price": round(close[-1], 2), "current_trend": round(trend[-1], 2), "description": desc, "code": code, "analysis_days": len(trade_data) } } except Exception as e: logger.error(f"[Trend Decompose] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} async def calc_price_entropy(code: str, days: int = 60) -> Dict[str, Any]: """ 计算价格信息熵(衡量市场混乱程度) 熵值越高表示市场越混乱/随机,越低表示趋势越明显 Args: code: 股票代码 days: 分析天数 Returns: 价格熵值分析结果 """ try: trade_data = await db.get_stock_trade_data(code, limit=days) if len(trade_data) < 20: return {"success": False, "error": "数据不足"} trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) close = np.array([float(d['close_price']) for d in trade_data]) # 计算日收益率 returns = np.diff(close) / close[:-1] # 将收益率离散化为bins n_bins = 10 hist, bin_edges = np.histogram(returns, bins=n_bins) # 计算概率分布 prob = hist / np.sum(hist) prob = prob[prob > 0] # 去掉0概率 # 计算香农熵 entropy = -np.sum(prob * np.log2(prob)) # 最大熵(均匀分布) max_entropy = np.log2(n_bins) # 标准化熵值(0-1之间) normalized_entropy = entropy / max_entropy # 近期熵值变化 recent_returns = returns[-10:] recent_hist, _ = np.histogram(recent_returns, bins=5) recent_prob = recent_hist / np.sum(recent_hist) recent_prob = recent_prob[recent_prob > 0] recent_entropy = -np.sum(recent_prob * np.log2(recent_prob)) recent_max_entropy = np.log2(5) recent_normalized = recent_entropy / recent_max_entropy # 判断市场状态 if normalized_entropy > 0.85: market_state = "高度混乱" signal = "随机性强,难以预测" elif normalized_entropy > 0.7: market_state = "中度混乱" signal = "市场分歧大" elif normalized_entropy > 0.5: market_state = "适度有序" signal = "有一定规律可循" else: market_state = "高度有序" signal = "趋势明显,易于预测" # 熵值变化趋势 entropy_trend = "增加" if recent_normalized > normalized_entropy else "减少" desc = (f"价格熵值={entropy:.2f}(标准化{normalized_entropy:.2f})," f"{market_state},{signal}。近期熵值{entropy_trend}") return { "success": True, "data": { "entropy": round(entropy, 3), "normalized_entropy": round(normalized_entropy, 3), "recent_entropy": round(recent_normalized, 3), "entropy_trend": entropy_trend, "market_state": market_state, "signal": signal, "description": desc, "code": code, "analysis_days": days } } except Exception as e: logger.error(f"[Entropy] 计算失败: {e}", exc_info=True) return {"success": False, "error": str(e)} # ==================== 工具注册(供MCP使用) ==================== # 包装函数:将 args 字典解构为函数参数 # MCP 调用时传入的是 args: Dict[str, Any],需要解构后调用实际函数 async def _wrap_get_macd_signal(args: Dict[str, Any]): return await get_macd_signal(args.get("code"), args.get("days", 60)) async def _wrap_check_oscillator_status(args: Dict[str, Any]): return await check_oscillator_status(args.get("code"), args.get("days", 60)) async def _wrap_analyze_bollinger_bands(args: Dict[str, Any]): return await analyze_bollinger_bands(args.get("code"), args.get("days", 60), args.get("period", 20)) async def _wrap_calc_stop_loss_atr(args: Dict[str, Any]): return await calc_stop_loss_atr(args.get("code"), args.get("days", 30), args.get("atr_multiplier", 2.0)) async def _wrap_analyze_market_heat(args: Dict[str, Any]): return await analyze_market_heat(args.get("code"), args.get("days", 30)) async def _wrap_check_volume_price_divergence(args: Dict[str, Any]): return await check_volume_price_divergence(args.get("code"), args.get("days", 20)) async def _wrap_check_new_high_breakout(args: Dict[str, Any]): return await check_new_high_breakout(args.get("code"), args.get("days", 60)) async def _wrap_identify_candlestick_pattern(args: Dict[str, Any]): return await identify_candlestick_pattern(args.get("code"), args.get("days", 10)) async def _wrap_find_price_gaps(args: Dict[str, Any]): return await find_price_gaps(args.get("code"), args.get("days", 30)) async def _wrap_calc_max_drawdown(args: Dict[str, Any]): return await calc_max_drawdown(args.get("code"), args.get("days", 250)) async def _wrap_check_valuation_rank(args: Dict[str, Any]): return await check_valuation_rank(args.get("code"), args.get("years", 3)) async def _wrap_calc_price_zscore(args: Dict[str, Any]): return await calc_price_zscore(args.get("code"), args.get("period", 60)) async def _wrap_calc_market_profile_vpoc(args: Dict[str, Any]): return await calc_market_profile_vpoc(args.get("code"), args.get("date")) async def _wrap_calc_realized_volatility(args: Dict[str, Any]): return await calc_realized_volatility(args.get("code"), args.get("date")) async def _wrap_analyze_buying_pressure(args: Dict[str, Any]): return await analyze_buying_pressure(args.get("code"), args.get("date")) async def _wrap_get_comprehensive_analysis(args: Dict[str, Any]): return await get_comprehensive_analysis(args.get("code")) async def _wrap_calc_rsi_divergence(args: Dict[str, Any]): return await calc_rsi_divergence(args.get("code"), args.get("days", 60), args.get("rsi_period", 14)) async def _wrap_calc_bollinger_squeeze(args: Dict[str, Any]): return await calc_bollinger_squeeze(args.get("code"), args.get("days", 60), args.get("period", 20)) async def _wrap_analyze_obv_trend(args: Dict[str, Any]): return await analyze_obv_trend(args.get("code"), args.get("days", 60)) async def _wrap_calc_amihud_illiquidity(args: Dict[str, Any]): return await calc_amihud_illiquidity(args.get("code"), args.get("days", 20)) async def _wrap_calc_parkinson_volatility(args: Dict[str, Any]): return await calc_parkinson_volatility(args.get("code"), args.get("date")) async def _wrap_calc_trend_slope(args: Dict[str, Any]): return await calc_trend_slope(args.get("code"), args.get("days", 20)) async def _wrap_calc_hurst_exponent(args: Dict[str, Any]): return await calc_hurst_exponent(args.get("code"), args.get("days", 100)) async def _wrap_test_cointegration(args: Dict[str, Any]): return await test_cointegration(args.get("code1"), args.get("code2"), args.get("days", 250)) async def _wrap_calc_kelly_position(args: Dict[str, Any]): return await calc_kelly_position(args.get("win_rate"), args.get("win_loss_ratio"), args.get("max_position", 0.25)) async def _wrap_search_similar_kline(args: Dict[str, Any]): return await search_similar_kline(args.get("code"), args.get("lookback", 10), args.get("top_n", 5)) async def _wrap_decompose_trend_simple(args: Dict[str, Any]): return await decompose_trend_simple(args.get("code"), args.get("days", 120)) async def _wrap_calc_price_entropy(args: Dict[str, Any]): return await calc_price_entropy(args.get("code"), args.get("days", 60), args.get("bins", 10)) QUANT_TOOLS = { # 经典技术指标 "get_macd_signal": _wrap_get_macd_signal, "check_oscillator_status": _wrap_check_oscillator_status, "analyze_bollinger_bands": _wrap_analyze_bollinger_bands, "calc_stop_loss_atr": _wrap_calc_stop_loss_atr, # 资金与情绪 "analyze_market_heat": _wrap_analyze_market_heat, "check_volume_price_divergence": _wrap_check_volume_price_divergence, # 形态与突破 "check_new_high_breakout": _wrap_check_new_high_breakout, "identify_candlestick_pattern": _wrap_identify_candlestick_pattern, "find_price_gaps": _wrap_find_price_gaps, # 风险与估值 "calc_max_drawdown": _wrap_calc_max_drawdown, "check_valuation_rank": _wrap_check_valuation_rank, "calc_price_zscore": _wrap_calc_price_zscore, # 分钟级高阶算子 "calc_market_profile_vpoc": _wrap_calc_market_profile_vpoc, "calc_realized_volatility": _wrap_calc_realized_volatility, "analyze_buying_pressure": _wrap_analyze_buying_pressure, # 综合分析 "get_comprehensive_analysis": _wrap_get_comprehensive_analysis, # ==================== 新增12个因子 ==================== # RSI背离检测 "calc_rsi_divergence": _wrap_calc_rsi_divergence, # 布林带挤压 "calc_bollinger_squeeze": _wrap_calc_bollinger_squeeze, # OBV能量潮独立分析 "analyze_obv_trend": _wrap_analyze_obv_trend, # Amihud非流动性因子 "calc_amihud_illiquidity": _wrap_calc_amihud_illiquidity, # 帕金森波动率 "calc_parkinson_volatility": _wrap_calc_parkinson_volatility, # 趋势线性回归斜率 "calc_trend_slope": _wrap_calc_trend_slope, # Hurst指数 "calc_hurst_exponent": _wrap_calc_hurst_exponent, # 协整性测试 "test_cointegration": _wrap_test_cointegration, # 凯利公式仓位 "calc_kelly_position": _wrap_calc_kelly_position, # 相似K线检索 "search_similar_kline": _wrap_search_similar_kline, # 趋势分解(简化版,不依赖Prophet) "decompose_trend_simple": _wrap_decompose_trend_simple, # 价格熵值计算 "calc_price_entropy": _wrap_calc_price_entropy, }