diff --git a/__pycache__/mcp_database.cpython-310.pyc b/__pycache__/mcp_database.cpython-310.pyc new file mode 100644 index 00000000..e6d84f78 Binary files /dev/null and b/__pycache__/mcp_database.cpython-310.pyc differ diff --git a/__pycache__/mcp_quant.cpython-310.pyc b/__pycache__/mcp_quant.cpython-310.pyc new file mode 100644 index 00000000..f98f4243 Binary files /dev/null and b/__pycache__/mcp_quant.cpython-310.pyc differ diff --git a/mcp_database.py b/mcp_database.py index 1b099bc7..71f81d05 100644 --- a/mcp_database.py +++ b/mcp_database.py @@ -836,11 +836,22 @@ async def get_stock_minute_data( try: client = get_clickhouse_client() - # 标准化股票代码(去除后缀) - stock_code = code.split('.')[0] if '.' in code else code + # 标准化股票代码:ClickHouse 分钟数据使用带后缀格式 + # 6开头 -> .SH (上海), 0/3开头 -> .SZ (深圳), 其他 -> .BJ (北京) + if '.' in code: + # 已经有后缀,直接使用 + stock_code = code + else: + # 需要添加后缀 + if code.startswith('6'): + stock_code = f"{code}.SH" + elif code.startswith('0') or code.startswith('3'): + stock_code = f"{code}.SZ" + else: + stock_code = f"{code}.BJ" - # 构建查询 - query = """ + # 构建查询 - 使用字符串格式化(ClickHouse 参数化语法兼容性问题) + query = f""" SELECT code, timestamp, @@ -851,24 +862,19 @@ async def get_stock_minute_data( volume, amt FROM stock_minute - WHERE code = %(code)s + WHERE code = '{stock_code}' """ - params = {'code': stock_code} - if start_time: - query += " AND timestamp >= %(start_time)s" - params['start_time'] = start_time + query += f" AND timestamp >= '{start_time}'" if end_time: - query += " AND timestamp <= %(end_time)s" - params['end_time'] = end_time + query += f" AND timestamp <= '{end_time}'" - query += " ORDER BY timestamp DESC LIMIT %(limit)s" - params['limit'] = limit + query += f" ORDER BY timestamp DESC LIMIT {limit}" # 执行查询 - result = client.execute(query, params, with_column_types=True) + result = client.execute(query, with_column_types=True) rows = result[0] columns = [col[0] for col in result[1]] diff --git a/mcp_quant.py b/mcp_quant.py new file mode 100644 index 00000000..71733a1f --- /dev/null +++ b/mcp_quant.py @@ -0,0 +1,2692 @@ +""" +MCP Quant - 量化因子计算模块 +基于日线(MySQL ea_trade)和分钟频(ClickHouse stock_minute)数据计算技术指标和量化因子 + +数据源: +- MySQL ea_trade: OHLCV, 换手率, 涨跌幅, PE等日线数据 +- ClickHouse stock_minute: 分钟级 OHLCV 数据 + +设计原则: +1. 返回"状态"而非原始数值 - 便于大模型理解 +2. 支持批量计算 - 提高效率 +3. 错误优雅降级 - 数据不足时返回 None 而非报错 +""" + +import numpy as np +import pandas as pd +from typing import Dict, List, Any, Optional, Tuple, Literal +from datetime import datetime, timedelta +from dataclasses import dataclass +from enum import Enum +import logging +import asyncio + +# 导入数据库模块 +import mcp_database as db + +logger = logging.getLogger(__name__) + + +# ==================== 信号枚举定义 ==================== + +class TrendSignal(str, Enum): + """趋势信号""" + STRONG_BULLISH = "强势上涨" + BULLISH = "上涨" + NEUTRAL = "震荡" + BEARISH = "下跌" + STRONG_BEARISH = "强势下跌" + + +class MACDSignal(str, Enum): + """MACD信号""" + GOLDEN_CROSS = "金叉" + DEATH_CROSS = "死叉" + BULLISH_DIVERGENCE = "底背离" + BEARISH_DIVERGENCE = "顶背离" + MOMENTUM_INCREASING = "动能增强" + MOMENTUM_DECREASING = "动能减弱" + NEUTRAL = "中性" + + +class OscillatorZone(str, Enum): + """超买超卖区域""" + OVERBOUGHT = "超买" + OVERSOLD = "超卖" + NEUTRAL = "中性" + + +class BollingerSignal(str, Enum): + """布林带信号""" + ABOVE_UPPER = "触及上轨压力" + BELOW_LOWER = "触及下轨支撑" + ABOVE_MIDDLE = "中轨之上强势" + BELOW_MIDDLE = "中轨之下弱势" + SQUEEZE = "布林带收窄变盘在即" + EXPANSION = "布林带扩张趋势加速" + + +class VolumeSignal(str, Enum): + """量能信号""" + VOLUME_SURGE = "放量" + VOLUME_SHRINK = "缩量" + VOLUME_PRICE_DIVERGENCE = "量价背离" + ACCUMULATION = "主力吸筹" + DISTRIBUTION = "主力出货" + NORMAL = "正常" + + +class BreakoutSignal(str, Enum): + """突破信号""" + NEW_HIGH_BREAKOUT = "突破新高" + NEW_LOW_BREAKDOWN = "跌破新低" + RESISTANCE_TEST = "测试压力位" + SUPPORT_TEST = "测试支撑位" + NO_SIGNAL = "无信号" + + +# ==================== 数据类定义 ==================== + +@dataclass +class MACDResult: + """MACD计算结果""" + signal: MACDSignal + dif: float + dea: float + macd_bar: float + trend_strength: str # 弱/中/强 + description: str + + +@dataclass +class OscillatorResult: + """超买超卖指标结果""" + zone: OscillatorZone + rsi_value: float + kdj_k: float + kdj_d: float + kdj_j: float + description: str + + +@dataclass +class BollingerResult: + """布林带分析结果""" + signal: BollingerSignal + upper: float + middle: float + lower: float + bandwidth: float # 带宽百分比 + position: str # 价格在通道中的位置描述 + description: str + + +@dataclass +class ATRResult: + """ATR计算结果""" + atr: float + atr_percent: float # ATR占价格的百分比 + suggested_stop_loss: float # 建议止损价 + suggested_stop_loss_pct: float # 建议止损幅度 + volatility_level: str # 低/中/高 + description: str + + +@dataclass +class VolumeAnalysisResult: + """成交量分析结果""" + signal: VolumeSignal + turnover_rate: float + volume_ratio: float # 相对于MA5的量比 + obv_trend: str # OBV趋势 + heat_level: str # 冷门/正常/活跃/火热/极热 + description: str + + +@dataclass +class BreakoutResult: + """突破信号结果""" + signal: BreakoutSignal + high_20d: float + low_20d: float + high_60d: float + low_60d: float + distance_to_high: float # 距离20日高点的百分比 + distance_to_low: float # 距离20日低点的百分比 + description: str + + +@dataclass +class RiskMetricsResult: + """风险指标结果""" + max_drawdown: float # 最大回撤 + max_drawdown_period: str # 最大回撤发生期间 + sharpe_ratio: float # 夏普比率 + volatility: float # 波动率 + risk_level: str # 低风险/中风险/高风险 + description: str + + +@dataclass +class ValuationResult: + """估值分析结果""" + pe_current: float + pe_percentile: float # PE历史百分位 + pb_current: Optional[float] + peg_ratio: Optional[float] + valuation_level: str # 低估/合理/高估 + description: str + + +# ==================== 核心计算函数 ==================== + +def _calc_ema(data: np.ndarray, period: int) -> np.ndarray: + """计算EMA指数移动平均""" + ema = np.zeros_like(data) + ema[0] = data[0] + multiplier = 2 / (period + 1) + for i in range(1, len(data)): + ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1] + return ema + + +def _calc_sma(data: np.ndarray, period: int) -> np.ndarray: + """计算SMA简单移动平均""" + return pd.Series(data).rolling(window=period, min_periods=1).mean().values + + +def _calc_std(data: np.ndarray, period: int) -> np.ndarray: + """计算滚动标准差""" + return pd.Series(data).rolling(window=period, min_periods=1).std().values + + +def _calc_rsi(close: np.ndarray, period: int = 14) -> np.ndarray: + """计算RSI相对强弱指标""" + delta = np.diff(close) + gain = np.where(delta > 0, delta, 0) + loss = np.where(delta < 0, -delta, 0) + + avg_gain = pd.Series(gain).rolling(window=period, min_periods=1).mean().values + avg_loss = pd.Series(loss).rolling(window=period, min_periods=1).mean().values + + rs = np.where(avg_loss != 0, avg_gain / avg_loss, 100) + rsi = 100 - (100 / (1 + rs)) + + # 补齐第一个值 + return np.insert(rsi, 0, 50) + + +def _calc_kdj(high: np.ndarray, low: np.ndarray, close: np.ndarray, + n: int = 9, m1: int = 3, m2: int = 3) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """计算KDJ指标""" + length = len(close) + rsv = np.zeros(length) + k = np.zeros(length) + d = np.zeros(length) + j = np.zeros(length) + + for i in range(length): + start = max(0, i - n + 1) + high_n = np.max(high[start:i+1]) + low_n = np.min(low[start:i+1]) + + if high_n != low_n: + rsv[i] = (close[i] - low_n) / (high_n - low_n) * 100 + else: + rsv[i] = 50 + + if i == 0: + k[i] = 50 + d[i] = 50 + else: + k[i] = (m1 - 1) / m1 * k[i-1] + 1 / m1 * rsv[i] + d[i] = (m2 - 1) / m2 * d[i-1] + 1 / m2 * k[i] + + j[i] = 3 * k[i] - 2 * d[i] + + return k, d, j + + +def _calc_macd(close: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """计算MACD指标""" + ema_fast = _calc_ema(close, fast) + ema_slow = _calc_ema(close, slow) + dif = ema_fast - ema_slow + dea = _calc_ema(dif, signal) + macd_bar = 2 * (dif - dea) + return dif, dea, macd_bar + + +def _calc_bollinger(close: np.ndarray, period: int = 20, std_dev: float = 2) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """计算布林带""" + middle = _calc_sma(close, period) + std = _calc_std(close, period) + upper = middle + std_dev * std + lower = middle - std_dev * std + return upper, middle, lower + + +def _calc_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> np.ndarray: + """计算ATR真实波幅""" + tr = np.zeros(len(close)) + tr[0] = high[0] - low[0] + + for i in range(1, len(close)): + tr[i] = max( + high[i] - low[i], + abs(high[i] - close[i-1]), + abs(low[i] - close[i-1]) + ) + + atr = _calc_sma(tr, period) + return atr + + +def _calc_obv(close: np.ndarray, volume: np.ndarray) -> np.ndarray: + """计算OBV能量潮""" + obv = np.zeros(len(close)) + obv[0] = volume[0] + + for i in range(1, len(close)): + if close[i] > close[i-1]: + obv[i] = obv[i-1] + volume[i] + elif close[i] < close[i-1]: + obv[i] = obv[i-1] - volume[i] + else: + obv[i] = obv[i-1] + + return obv + + +def _detect_divergence(price: np.ndarray, indicator: np.ndarray, lookback: int = 20) -> Optional[str]: + """检测背离""" + if len(price) < lookback: + return None + + recent_price = price[-lookback:] + recent_indicator = indicator[-lookback:] + + # 找到价格的高点和低点 + price_high_idx = np.argmax(recent_price) + price_low_idx = np.argmin(recent_price) + + # 检查顶背离:价格创新高但指标没创新高 + if price_high_idx > lookback // 2: # 高点在后半段 + prev_high = np.max(recent_price[:lookback//2]) + if recent_price[price_high_idx] > prev_high: + prev_indicator_high = np.max(recent_indicator[:lookback//2]) + if recent_indicator[price_high_idx] < prev_indicator_high: + return "bearish_divergence" + + # 检查底背离:价格创新低但指标没创新低 + if price_low_idx > lookback // 2: # 低点在后半段 + prev_low = np.min(recent_price[:lookback//2]) + if recent_price[price_low_idx] < prev_low: + prev_indicator_low = np.min(recent_indicator[:lookback//2]) + if recent_indicator[price_low_idx] > prev_indicator_low: + return "bullish_divergence" + + return None + + +# ==================== MCP 工具函数 ==================== + +async def get_macd_signal(code: str, days: int = 60) -> Dict[str, Any]: + """ + 获取MACD趋势判定信号 + + Args: + code: 股票代码 + days: 分析天数,默认60天 + + Returns: + MACD信号分析结果 + """ + try: + # 获取日线数据 + trade_data = await db.get_stock_trade_data(code, limit=days + 30) + + if len(trade_data) < 35: + return {"success": False, "error": "数据不足,需要至少35个交易日"} + + # 按日期排序(从旧到新) + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算MACD + dif, dea, macd_bar = _calc_macd(close) + + # 获取最新值 + current_dif = dif[-1] + current_dea = dea[-1] + current_bar = macd_bar[-1] + prev_bar = macd_bar[-2] if len(macd_bar) > 1 else 0 + + # 判断信号 + signal = MACDSignal.NEUTRAL + + # 金叉/死叉判断 + if len(dif) >= 2: + if dif[-1] > dea[-1] and dif[-2] <= dea[-2]: + signal = MACDSignal.GOLDEN_CROSS + elif dif[-1] < dea[-1] and dif[-2] >= dea[-2]: + signal = MACDSignal.DEATH_CROSS + + # 动能判断 + if signal == MACDSignal.NEUTRAL: + if current_bar > 0 and current_bar > prev_bar: + signal = MACDSignal.MOMENTUM_INCREASING + elif current_bar < 0 and current_bar < prev_bar: + signal = MACDSignal.MOMENTUM_DECREASING + + # 背离检测 + divergence = _detect_divergence(close, dif, lookback=20) + if divergence == "bullish_divergence": + signal = MACDSignal.BULLISH_DIVERGENCE + elif divergence == "bearish_divergence": + signal = MACDSignal.BEARISH_DIVERGENCE + + # 趋势强度 + bar_abs = abs(current_bar) + if bar_abs < 0.5: + trend_strength = "弱" + elif bar_abs < 1.5: + trend_strength = "中" + else: + trend_strength = "强" + + # 生成描述 + descriptions = { + MACDSignal.GOLDEN_CROSS: f"MACD金叉形成,DIF上穿DEA,短期看涨信号,动能{trend_strength}", + MACDSignal.DEATH_CROSS: f"MACD死叉形成,DIF下穿DEA,短期看跌信号,动能{trend_strength}", + MACDSignal.BULLISH_DIVERGENCE: "出现底背离,股价创新低但MACD没创新低,可能反转向上", + MACDSignal.BEARISH_DIVERGENCE: "出现顶背离,股价创新高但MACD没创新高,上涨动能衰竭", + MACDSignal.MOMENTUM_INCREASING: f"红柱放大,上涨动能增强,趋势强度:{trend_strength}", + MACDSignal.MOMENTUM_DECREASING: f"绿柱放大,下跌动能增强,趋势强度:{trend_strength}", + MACDSignal.NEUTRAL: "MACD处于中性状态,无明显信号", + } + + return { + "success": True, + "data": { + "signal": signal.value, + "dif": round(current_dif, 4), + "dea": round(current_dea, 4), + "macd_bar": round(current_bar, 4), + "trend_strength": trend_strength, + "description": descriptions[signal], + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[MACD] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def check_oscillator_status(code: str, days: int = 60) -> Dict[str, Any]: + """ + 检查KDJ/RSI超买超卖状态 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 超买超卖状态分析 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + high = np.array([float(d['high_price']) for d in trade_data]) + low = np.array([float(d['low_price']) for d in trade_data]) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算RSI + rsi = _calc_rsi(close, 14) + current_rsi = rsi[-1] + + # 计算KDJ + k, d, j = _calc_kdj(high, low, close) + current_k = k[-1] + current_d = d[-1] + current_j = j[-1] + + # 判断区域 + zone = OscillatorZone.NEUTRAL + + # RSI超买超卖判断 + if current_rsi > 80 or current_j > 100: + zone = OscillatorZone.OVERBOUGHT + elif current_rsi < 20 or current_j < 0: + zone = OscillatorZone.OVERSOLD + + # 生成描述 + if zone == OscillatorZone.OVERBOUGHT: + desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超买区域,短期回调风险较大" + elif zone == OscillatorZone.OVERSOLD: + desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超卖区域,可能存在反弹机会" + else: + desc = f"RSI={current_rsi:.1f},KDJ(K={current_k:.1f},D={current_d:.1f},J={current_j:.1f}),处于中性区域" + + return { + "success": True, + "data": { + "zone": zone.value, + "rsi_value": round(current_rsi, 2), + "kdj_k": round(current_k, 2), + "kdj_d": round(current_d, 2), + "kdj_j": round(current_j, 2), + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[Oscillator] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def analyze_bollinger_bands(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]: + """ + 分析布林带通道 + + Args: + code: 股票代码 + days: 分析天数 + period: 布林带周期,默认20 + + Returns: + 布林带分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < period + 5: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算布林带 + upper, middle, lower = _calc_bollinger(close, period) + + current_price = close[-1] + current_upper = upper[-1] + current_middle = middle[-1] + current_lower = lower[-1] + + # 计算带宽 + bandwidth = (current_upper - current_lower) / current_middle * 100 + prev_bandwidth = (upper[-5] - lower[-5]) / middle[-5] * 100 if len(upper) > 5 else bandwidth + + # 判断信号 + signal = BollingerSignal.ABOVE_MIDDLE if current_price > current_middle else BollingerSignal.BELOW_MIDDLE + + # 检查是否触及上下轨 + if current_price >= current_upper * 0.99: + signal = BollingerSignal.ABOVE_UPPER + elif current_price <= current_lower * 1.01: + signal = BollingerSignal.BELOW_LOWER + + # 检查布林带收窄/扩张 + if bandwidth < prev_bandwidth * 0.7: + signal = BollingerSignal.SQUEEZE + elif bandwidth > prev_bandwidth * 1.3: + signal = BollingerSignal.EXPANSION + + # 价格位置描述 + position_pct = (current_price - current_lower) / (current_upper - current_lower) * 100 + position = f"价格位于布林带{position_pct:.0f}%位置" + + # 生成描述 + descriptions = { + BollingerSignal.ABOVE_UPPER: f"股价触及布林带上轨({current_upper:.2f}),面临短期压力", + BollingerSignal.BELOW_LOWER: f"股价触及布林带下轨({current_lower:.2f}),可能存在支撑", + BollingerSignal.ABOVE_MIDDLE: f"股价在中轨({current_middle:.2f})之上运行,整体偏强", + BollingerSignal.BELOW_MIDDLE: f"股价在中轨({current_middle:.2f})之下运行,整体偏弱", + BollingerSignal.SQUEEZE: f"布林带收窄(带宽{bandwidth:.1f}%),变盘在即,关注突破方向", + BollingerSignal.EXPANSION: f"布林带扩张(带宽{bandwidth:.1f}%),趋势加速中", + } + + return { + "success": True, + "data": { + "signal": signal.value, + "upper": round(current_upper, 2), + "middle": round(current_middle, 2), + "lower": round(current_lower, 2), + "bandwidth": round(bandwidth, 2), + "position": position, + "description": descriptions[signal], + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[Bollinger] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_stop_loss_atr(code: str, days: int = 30, atr_multiplier: float = 2.0) -> Dict[str, Any]: + """ + 使用ATR计算止损位 + + Args: + code: 股票代码 + days: 分析天数 + atr_multiplier: ATR倍数,默认2倍 + + Returns: + ATR止损建议 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 15: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + high = np.array([float(d['high_price']) for d in trade_data]) + low = np.array([float(d['low_price']) for d in trade_data]) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算ATR + atr = _calc_atr(high, low, close, 14) + current_atr = atr[-1] + current_price = close[-1] + + # 计算ATR百分比 + atr_percent = current_atr / current_price * 100 + + # 计算止损价 + stop_loss = current_price - atr_multiplier * current_atr + stop_loss_pct = (current_price - stop_loss) / current_price * 100 + + # 波动级别 + if atr_percent < 2: + volatility_level = "低" + elif atr_percent < 4: + volatility_level = "中" + else: + volatility_level = "高" + + desc = (f"ATR={current_atr:.2f}(占价格{atr_percent:.1f}%),波动性{volatility_level}。" + f"建议止损位:{stop_loss:.2f}({atr_multiplier}倍ATR),止损幅度{stop_loss_pct:.1f}%") + + return { + "success": True, + "data": { + "atr": round(current_atr, 3), + "atr_percent": round(atr_percent, 2), + "suggested_stop_loss": round(stop_loss, 2), + "suggested_stop_loss_pct": round(stop_loss_pct, 2), + "volatility_level": volatility_level, + "current_price": round(current_price, 2), + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[ATR] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def analyze_market_heat(code: str, days: int = 30) -> Dict[str, Any]: + """ + 分析换手率活跃度和量能 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 市场热度分析 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 10: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + volume = np.array([float(d['volume']) for d in trade_data]) + turnover_rates = [float(d.get('turnover_rate', 0) or 0) for d in trade_data] + + current_turnover = turnover_rates[-1] + avg_turnover_5 = np.mean(turnover_rates[-5:]) + + # 计算量比 + volume_ma5 = _calc_sma(volume, 5) + volume_ratio = volume[-1] / volume_ma5[-1] if volume_ma5[-1] > 0 else 1 + + # 计算OBV + obv = _calc_obv(close, volume) + obv_ma5 = _calc_sma(obv, 5) + obv_trend = "上升" if obv[-1] > obv_ma5[-1] else "下降" + + # 换手率分级 + if current_turnover < 1: + heat_level = "冷门" + elif current_turnover < 3: + heat_level = "正常" + elif current_turnover < 7: + heat_level = "活跃" + elif current_turnover < 15: + heat_level = "火热" + else: + heat_level = "极热" + + # 判断信号 + signal = VolumeSignal.NORMAL + + if volume_ratio > 2: + signal = VolumeSignal.VOLUME_SURGE + elif volume_ratio < 0.5: + signal = VolumeSignal.VOLUME_SHRINK + + # 检查量价背离 + price_up = close[-1] > close[-5] if len(close) > 5 else False + volume_down = volume[-1] < volume[-5] if len(volume) > 5 else False + + if price_up and volume_down: + signal = VolumeSignal.VOLUME_PRICE_DIVERGENCE + + # OBV判断主力动向 + if obv_trend == "上升" and close[-1] <= close[-5]: + signal = VolumeSignal.ACCUMULATION + elif obv_trend == "下降" and close[-1] >= close[-5]: + signal = VolumeSignal.DISTRIBUTION + + desc = (f"换手率{current_turnover:.2f}%({heat_level}),量比{volume_ratio:.2f}," + f"OBV趋势{obv_trend}。{signal.value}") + + return { + "success": True, + "data": { + "signal": signal.value, + "turnover_rate": round(current_turnover, 2), + "avg_turnover_5d": round(avg_turnover_5, 2), + "volume_ratio": round(volume_ratio, 2), + "obv_trend": obv_trend, + "heat_level": heat_level, + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[MarketHeat] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def check_new_high_breakout(code: str, days: int = 60) -> Dict[str, Any]: + """ + 检查唐奇安通道突破(新高/新低突破) + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 突破信号分析 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days + 10) + + if len(trade_data) < 25: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + high = np.array([float(d['high_price']) for d in trade_data]) + low = np.array([float(d['low_price']) for d in trade_data]) + close = np.array([float(d['close_price']) for d in trade_data]) + + current_price = close[-1] + + # 计算20日和60日高低点 + high_20d = np.max(high[-21:-1]) if len(high) > 21 else np.max(high[:-1]) + low_20d = np.min(low[-21:-1]) if len(low) > 21 else np.min(low[:-1]) + high_60d = np.max(high[-61:-1]) if len(high) > 61 else np.max(high[:-1]) + low_60d = np.min(low[-61:-1]) if len(low) > 61 else np.min(low[:-1]) + + # 判断突破信号 + signal = BreakoutSignal.NO_SIGNAL + + if current_price > high_20d: + signal = BreakoutSignal.NEW_HIGH_BREAKOUT + elif current_price < low_20d: + signal = BreakoutSignal.NEW_LOW_BREAKDOWN + elif current_price > high_20d * 0.97: + signal = BreakoutSignal.RESISTANCE_TEST + elif current_price < low_20d * 1.03: + signal = BreakoutSignal.SUPPORT_TEST + + # 计算距离 + distance_to_high = (high_20d - current_price) / current_price * 100 + distance_to_low = (current_price - low_20d) / current_price * 100 + + descriptions = { + BreakoutSignal.NEW_HIGH_BREAKOUT: f"突破20日新高({high_20d:.2f}),海龟交易法则买入信号触发", + BreakoutSignal.NEW_LOW_BREAKDOWN: f"跌破20日新低({low_20d:.2f}),海龟交易法则卖出信号触发", + BreakoutSignal.RESISTANCE_TEST: f"接近20日高点({high_20d:.2f}),距离{distance_to_high:.1f}%,关注能否突破", + BreakoutSignal.SUPPORT_TEST: f"接近20日低点({low_20d:.2f}),距离{distance_to_low:.1f}%,关注能否守住", + BreakoutSignal.NO_SIGNAL: f"价格在20日通道内运行({low_20d:.2f} - {high_20d:.2f})", + } + + return { + "success": True, + "data": { + "signal": signal.value, + "high_20d": round(high_20d, 2), + "low_20d": round(low_20d, 2), + "high_60d": round(high_60d, 2), + "low_60d": round(low_60d, 2), + "current_price": round(current_price, 2), + "distance_to_high_pct": round(distance_to_high, 2), + "distance_to_low_pct": round(distance_to_low, 2), + "description": descriptions[signal], + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[Breakout] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_max_drawdown(code: str, days: int = 250) -> Dict[str, Any]: + """ + 计算最大回撤 + + Args: + code: 股票代码 + days: 分析天数,默认250天(约一年) + + Returns: + 最大回撤分析 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + dates = [d['TRADEDATE'] for d in trade_data] + + # 计算累计最大值 + running_max = np.maximum.accumulate(close) + + # 计算回撤 + drawdown = (running_max - close) / running_max * 100 + + # 找到最大回撤 + max_dd = np.max(drawdown) + max_dd_idx = np.argmax(drawdown) + + # 找到回撤开始的位置(峰值) + peak_idx = np.argmax(close[:max_dd_idx + 1]) if max_dd_idx > 0 else 0 + + # 回撤期间 + period_start = dates[peak_idx] if peak_idx < len(dates) else dates[0] + period_end = dates[max_dd_idx] if max_dd_idx < len(dates) else dates[-1] + + # 计算波动率(年化) + returns = np.diff(close) / close[:-1] + volatility = np.std(returns) * np.sqrt(252) * 100 + + # 计算夏普比率(假设无风险利率2%) + avg_return = np.mean(returns) * 252 * 100 + sharpe_ratio = (avg_return - 2) / volatility if volatility > 0 else 0 + + # 风险等级 + if max_dd < 10: + risk_level = "低风险" + elif max_dd < 25: + risk_level = "中风险" + else: + risk_level = "高风险" + + desc = (f"最大回撤{max_dd:.1f}%(从{period_start}到{period_end})," + f"年化波动率{volatility:.1f}%,夏普比率{sharpe_ratio:.2f},{risk_level}") + + return { + "success": True, + "data": { + "max_drawdown": round(max_dd, 2), + "max_drawdown_period": f"{period_start} 至 {period_end}", + "sharpe_ratio": round(sharpe_ratio, 2), + "volatility": round(volatility, 2), + "risk_level": risk_level, + "description": desc, + "code": code, + "analysis_days": len(trade_data) + } + } + + except Exception as e: + logger.error(f"[MaxDrawdown] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def check_valuation_rank(code: str, years: int = 3) -> Dict[str, Any]: + """ + 检查历史PE/PB百分位估值 + + Args: + code: 股票代码 + years: 历史年数,默认3年 + + Returns: + 估值分析结果 + """ + try: + days = years * 250 + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 60: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + # 提取PE数据 + pe_values = [float(d.get('pe_ratio', 0) or 0) for d in trade_data if d.get('pe_ratio')] + pe_values = [pe for pe in pe_values if 0 < pe < 1000] # 过滤异常值 + + if len(pe_values) < 30: + return {"success": False, "error": "PE数据不足"} + + current_pe = pe_values[-1] + + # 计算百分位 + pe_percentile = (np.sum(np.array(pe_values) < current_pe) / len(pe_values)) * 100 + + # 获取财务数据计算PEG + peg_ratio = None + try: + financial_data = await db.get_stock_financial_index(code, limit=2) + if len(financial_data) >= 2: + growth_rate = financial_data[0].get('profit_growth', 0) or 0 + if growth_rate > 0: + peg_ratio = current_pe / growth_rate + except: + pass + + # 估值水平判断 + if pe_percentile < 20: + valuation_level = "低估" + elif pe_percentile < 50: + valuation_level = "较低" + elif pe_percentile < 80: + valuation_level = "合理" + else: + valuation_level = "高估" + + # PEG修正 + peg_desc = "" + if peg_ratio is not None: + if peg_ratio < 1: + peg_desc = f",PEG={peg_ratio:.2f}<1(成长性强)" + elif peg_ratio < 2: + peg_desc = f",PEG={peg_ratio:.2f}(合理)" + else: + peg_desc = f",PEG={peg_ratio:.2f}(偏贵)" + + desc = (f"当前PE={current_pe:.1f},处于近{years}年{pe_percentile:.0f}%分位," + f"估值{valuation_level}{peg_desc}") + + return { + "success": True, + "data": { + "pe_current": round(current_pe, 2), + "pe_percentile": round(pe_percentile, 1), + "pe_min": round(min(pe_values), 2), + "pe_max": round(max(pe_values), 2), + "pe_median": round(np.median(pe_values), 2), + "peg_ratio": round(peg_ratio, 2) if peg_ratio else None, + "valuation_level": valuation_level, + "description": desc, + "code": code, + "analysis_years": years + } + } + + except Exception as e: + logger.error(f"[Valuation] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_price_zscore(code: str, period: int = 60) -> Dict[str, Any]: + """ + 计算价格Z-Score(乖离率标准化) + + Args: + code: 股票代码 + period: 均线周期,默认60日 + + Returns: + Z-Score分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=period + 10) + + if len(trade_data) < period: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算均线和标准差 + ma = _calc_sma(close, period) + std = _calc_std(close, period) + + current_price = close[-1] + current_ma = ma[-1] + current_std = std[-1] + + # 计算Z-Score + zscore = (current_price - current_ma) / current_std if current_std > 0 else 0 + + # 计算乖离率 + bias = (current_price - current_ma) / current_ma * 100 + + # 判断状态 + if zscore > 2: + status = "极度偏高" + probability = "历史上此位置回落概率约95%" + elif zscore > 1: + status = "偏高" + probability = "历史上此位置回落概率约68%" + elif zscore < -2: + status = "极度偏低" + probability = "历史上此位置反弹概率约95%" + elif zscore < -1: + status = "偏低" + probability = "历史上此位置反弹概率约68%" + else: + status = "正常" + probability = "价格在正常波动范围内" + + desc = f"Z-Score={zscore:.2f},乖离率{bias:+.1f}%,{status}。{probability}" + + return { + "success": True, + "data": { + "zscore": round(zscore, 3), + "bias": round(bias, 2), + "ma_period": period, + "current_ma": round(current_ma, 2), + "current_price": round(current_price, 2), + "status": status, + "mean_reversion_hint": probability, + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[ZScore] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +# ==================== 分钟级高阶算子(基于ClickHouse) ==================== + +async def calc_market_profile_vpoc(code: str, date: str = None) -> Dict[str, Any]: + """ + 计算市场轮廓VPOC(成交量最大的价格档位) + + Args: + code: 股票代码 + date: 日期,格式 YYYY-MM-DD(可选,默认使用最近交易日) + + Returns: + VPOC分析结果 + """ + try: + # 获取分钟数据(如果未指定日期,获取最近的数据) + if date: + minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) + else: + # 获取最近的分钟数据 + minute_data = await db.get_stock_minute_data(code, limit=500) + + if len(minute_data) < 10: + return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} + + # 从数据中提取实际日期 + if not date and minute_data: + date = minute_data[0].get('timestamp', '')[:10] + + # 按价格区间统计成交量 + prices = np.array([float(d['close']) for d in minute_data]) + volumes = np.array([float(d['volume']) for d in minute_data]) + + # 创建价格档位(将价格四舍五入到0.01) + price_bins = np.round(prices, 2) + + # 统计每个价格档位的成交量 + unique_prices, indices = np.unique(price_bins, return_inverse=True) + volume_by_price = np.zeros(len(unique_prices)) + for i, idx in enumerate(indices): + volume_by_price[idx] += volumes[i] + + # 找到VPOC + vpoc_idx = np.argmax(volume_by_price) + vpoc_price = unique_prices[vpoc_idx] + vpoc_volume = volume_by_price[vpoc_idx] + + # 当前价格 + current_price = prices[-1] + + # 判断位置关系 + if current_price > vpoc_price * 1.01: + position = "价格在VPOC之上,短期支撑位" + elif current_price < vpoc_price * 0.99: + position = "价格在VPOC之下,短期压力位" + else: + position = "价格在VPOC附近,成交密集区" + + desc = f"VPOC价格={vpoc_price:.2f},成交量占比{vpoc_volume/np.sum(volumes)*100:.1f}%。{position}" + + return { + "success": True, + "data": { + "vpoc_price": round(vpoc_price, 2), + "vpoc_volume": int(vpoc_volume), + "vpoc_volume_pct": round(vpoc_volume / np.sum(volumes) * 100, 1), + "current_price": round(current_price, 2), + "position": position, + "description": desc, + "code": code, + "date": date + } + } + + except Exception as e: + logger.error(f"[VPOC] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_realized_volatility(code: str, date: str = None) -> Dict[str, Any]: + """ + 计算已实现波动率(Realized Volatility) + + Args: + code: 股票代码 + date: 日期(可选,默认使用最近交易日) + + Returns: + RV分析结果 + """ + try: + # 获取分钟数据 + if date: + minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) + else: + minute_data = await db.get_stock_minute_data(code, limit=500) + + if len(minute_data) < 30: + return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} + + # 从数据中提取实际日期 + if not date and minute_data: + date = minute_data[0].get('timestamp', '')[:10] + + # 按时间排序 + minute_data = sorted(minute_data, key=lambda x: x['timestamp']) + + close = np.array([float(d['close']) for d in minute_data]) + + # 计算分钟收益率 + returns = np.diff(np.log(close)) + + # 计算已实现波动率(日内) + rv_intraday = np.sqrt(np.sum(returns ** 2)) * 100 + + # 年化波动率(假设一天240分钟,一年250天) + rv_annual = rv_intraday * np.sqrt(250) * 100 + + # 波动率水平 + if rv_intraday < 1: + vol_level = "低波动" + elif rv_intraday < 3: + vol_level = "中波动" + else: + vol_level = "高波动" + + desc = f"日内已实现波动率{rv_intraday:.2f}%(年化约{rv_annual:.0f}%),{vol_level}" + + return { + "success": True, + "data": { + "rv_intraday": round(rv_intraday, 3), + "rv_annualized": round(rv_annual, 1), + "volatility_level": vol_level, + "description": desc, + "code": code, + "date": date + } + } + + except Exception as e: + logger.error(f"[RV] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def analyze_buying_pressure(code: str, date: str = None) -> Dict[str, Any]: + """ + 分析买卖压力失衡 + + Args: + code: 股票代码 + date: 日期(可选,默认使用最近交易日) + + Returns: + 买卖压力分析 + """ + try: + # 获取分钟数据 + if date: + minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) + else: + minute_data = await db.get_stock_minute_data(code, limit=500) + + if len(minute_data) < 30: + return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} + + # 从数据中提取实际日期 + if not date and minute_data: + date = minute_data[0].get('timestamp', '')[:10] + + minute_data = sorted(minute_data, key=lambda x: x['timestamp']) + + high = np.array([float(d['high']) for d in minute_data]) + low = np.array([float(d['low']) for d in minute_data]) + close = np.array([float(d['close']) for d in minute_data]) + volume = np.array([float(d['volume']) for d in minute_data]) + + # 计算买卖压力指标 + # (Close - Low) - (High - Close),正值表示买压,负值表示卖压 + pressure = (close - low) - (high - close) + + # 成交量加权 + weighted_pressure = np.sum(pressure * volume) / np.sum(volume) + + # 计算买压占比 + total_range = high - low + buying_pct = np.where(total_range > 0, (close - low) / total_range, 0.5) + avg_buying_pct = np.mean(buying_pct) * 100 + + # 判断信号 + if weighted_pressure > 0.1: + signal = "强买压" + desc = f"盘中主力抢筹明显,买方占优,收盘偏向日内高点" + elif weighted_pressure < -0.1: + signal = "强卖压" + desc = f"盘中抛压沉重,卖方占优,收盘偏向日内低点" + else: + signal = "均衡" + desc = f"买卖双方博弈均衡,无明显方向" + + return { + "success": True, + "data": { + "pressure_signal": signal, + "weighted_pressure": round(weighted_pressure, 4), + "avg_buying_pct": round(avg_buying_pct, 1), + "description": desc, + "code": code, + "date": date + } + } + + except Exception as e: + logger.error(f"[BuyingPressure] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def check_volume_price_divergence(code: str, days: int = 20) -> Dict[str, Any]: + """ + 检测量价背离 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 量价背离分析 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 10: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + volume = np.array([float(d['volume']) for d in trade_data]) + + # 检查价格趋势 + price_trend = "up" if close[-1] > close[-5] else "down" + price_change = (close[-1] - close[-5]) / close[-5] * 100 + + # 检查成交量趋势 + vol_avg_recent = np.mean(volume[-5:]) + vol_avg_prev = np.mean(volume[-10:-5]) if len(volume) >= 10 else np.mean(volume[:-5]) + vol_trend = "up" if vol_avg_recent > vol_avg_prev else "down" + vol_change = (vol_avg_recent - vol_avg_prev) / vol_avg_prev * 100 if vol_avg_prev > 0 else 0 + + # 判断背离 + has_divergence = False + divergence_type = None + + if price_trend == "up" and vol_trend == "down": + has_divergence = True + divergence_type = "顶背离" + desc = f"股价上涨{price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,上涨动能不足,警惕回调" + elif price_trend == "down" and vol_trend == "down": + has_divergence = True + divergence_type = "底背离" + desc = f"股价下跌{-price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,抛压减弱,可能企稳" + else: + desc = f"量价配合正常,价格{price_trend},成交量{vol_trend}" + + return { + "success": True, + "data": { + "has_divergence": has_divergence, + "divergence_type": divergence_type, + "price_change_5d": round(price_change, 2), + "volume_change_5d": round(vol_change, 2), + "signal": "量价背离" if has_divergence else "量价正常", + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[VolPriceDivergence] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def identify_candlestick_pattern(code: str, days: int = 10) -> Dict[str, Any]: + """ + 识别K线组合形态 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + K线形态识别结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 3: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + patterns = [] + + # 获取最近3天的OHLC + for i in range(len(trade_data) - 2, len(trade_data)): + if i < 0: + continue + d = trade_data[i] + open_p = float(d['open_price']) + high = float(d['high_price']) + low = float(d['low_price']) + close = float(d['close_price']) + + body = close - open_p + upper_shadow = high - max(open_p, close) + lower_shadow = min(open_p, close) - low + body_size = abs(body) + + # 十字星 + if body_size < (high - low) * 0.1: + patterns.append(("十字星", "犹豫信号,可能变盘")) + + # 锤子线 + if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5: + patterns.append(("锤子线", "底部反转信号")) + + # 上吊线 + if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5 and body < 0: + patterns.append(("上吊线", "顶部反转信号")) + + # 检查多日形态(需要至少3天数据) + if len(trade_data) >= 3: + d1, d2, d3 = trade_data[-3], trade_data[-2], trade_data[-1] + + close1 = float(d1['close_price']) + open1 = float(d1['open_price']) + close2 = float(d2['close_price']) + open2 = float(d2['open_price']) + close3 = float(d3['close_price']) + open3 = float(d3['open_price']) + + # 红三兵 + if (close1 > open1 and close2 > open2 and close3 > open3 and + close2 > close1 and close3 > close2): + patterns.append(("红三兵", "强势上涨信号")) + + # 三只乌鸦 + if (close1 < open1 and close2 < open2 and close3 < open3 and + close2 < close1 and close3 < close2): + patterns.append(("三只乌鸦", "强势下跌信号")) + + # 早晨之星 + body1 = abs(close1 - open1) + body2 = abs(close2 - open2) + body3 = abs(close3 - open3) + + if (close1 < open1 and body2 < body1 * 0.3 and close3 > open3 and + close3 > (open1 + close1) / 2): + patterns.append(("早晨之星", "底部反转强信号")) + + # 黄昏之星 + if (close1 > open1 and body2 < body1 * 0.3 and close3 < open3 and + close3 < (open1 + close1) / 2): + patterns.append(("黄昏之星", "顶部反转强信号")) + + # 生成描述 + if patterns: + pattern_names = [f"{p[0]}({p[1]})" for p in patterns] + desc = f"识别到以下K线形态:" + "、".join(pattern_names) + else: + desc = "未识别到明显的K线形态" + + return { + "success": True, + "data": { + "patterns": [{"name": p[0], "meaning": p[1]} for p in patterns], + "pattern_count": len(patterns), + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[CandlestickPattern] 识别失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def find_price_gaps(code: str, days: int = 30) -> Dict[str, Any]: + """ + 寻找跳空缺口 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 缺口分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 5: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + gaps = [] + current_price = float(trade_data[-1]['close_price']) + + for i in range(1, len(trade_data)): + prev = trade_data[i-1] + curr = trade_data[i] + + prev_high = float(prev['high_price']) + prev_low = float(prev['low_price']) + curr_high = float(curr['high_price']) + curr_low = float(curr['low_price']) + + # 上涨缺口 + if curr_low > prev_high: + gap_size = (curr_low - prev_high) / prev_high * 100 + is_filled = current_price <= curr_low + gaps.append({ + "type": "上涨缺口", + "date": curr['TRADEDATE'], + "gap_low": prev_high, + "gap_high": curr_low, + "gap_size_pct": round(gap_size, 2), + "is_filled": is_filled + }) + + # 下跌缺口 + if curr_high < prev_low: + gap_size = (prev_low - curr_high) / prev_low * 100 + is_filled = current_price >= curr_high + gaps.append({ + "type": "下跌缺口", + "date": curr['TRADEDATE'], + "gap_low": curr_high, + "gap_high": prev_low, + "gap_size_pct": round(gap_size, 2), + "is_filled": is_filled + }) + + # 只保留未回补的缺口 + unfilled_gaps = [g for g in gaps if not g['is_filled']] + + if unfilled_gaps: + gap_desc = [f"{g['type']}({g['date']},{g['gap_size_pct']}%)" for g in unfilled_gaps[:3]] + desc = f"发现{len(unfilled_gaps)}个未回补缺口:" + "、".join(gap_desc) + else: + desc = "近期无未回补缺口" + + return { + "success": True, + "data": { + "total_gaps": len(gaps), + "unfilled_gaps": unfilled_gaps, + "unfilled_count": len(unfilled_gaps), + "description": desc, + "code": code + } + } + + except Exception as e: + logger.error(f"[PriceGaps] 分析失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def get_comprehensive_analysis(code: str) -> Dict[str, Any]: + """ + 综合技术分析(一次性返回多个指标) + + Args: + code: 股票代码 + + Returns: + 综合分析结果 + """ + try: + # 并行执行多个分析 + results = await asyncio.gather( + get_macd_signal(code), + check_oscillator_status(code), + analyze_bollinger_bands(code), + analyze_market_heat(code), + check_new_high_breakout(code), + check_volume_price_divergence(code), + identify_candlestick_pattern(code), + return_exceptions=True + ) + + analysis = {} + indicators = [ + ("macd", "MACD趋势"), + ("oscillator", "超买超卖"), + ("bollinger", "布林带"), + ("market_heat", "市场热度"), + ("breakout", "突破信号"), + ("volume_price", "量价分析"), + ("candlestick", "K线形态") + ] + + signals = [] + for i, (key, name) in enumerate(indicators): + if isinstance(results[i], dict) and results[i].get("success"): + analysis[key] = results[i]["data"] + signals.append(f"【{name}】{results[i]['data'].get('description', '')}") + + # 生成综合评估 + bullish_count = 0 + bearish_count = 0 + + for result in results: + if isinstance(result, dict) and result.get("success"): + data = result.get("data", {}) + signal = data.get("signal", "") + + if any(word in signal for word in ["金叉", "底背离", "超卖", "支撑", "突破新高", "红三兵"]): + bullish_count += 1 + elif any(word in signal for word in ["死叉", "顶背离", "超买", "压力", "跌破", "乌鸦"]): + bearish_count += 1 + + if bullish_count > bearish_count + 2: + overall = "多方占优,短期看涨" + elif bearish_count > bullish_count + 2: + overall = "空方占优,短期看跌" + else: + overall = "多空胶着,观望为主" + + return { + "success": True, + "data": { + "code": code, + "analysis": analysis, + "signals_summary": signals, + "bullish_signals": bullish_count, + "bearish_signals": bearish_count, + "overall_assessment": overall + } + } + + except Exception as e: + logger.error(f"[ComprehensiveAnalysis] 分析失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +# ==================== 新增因子(12个) ==================== + +async def calc_rsi_divergence(code: str, days: int = 60, rsi_period: int = 14) -> Dict[str, Any]: + """ + RSI背离检测(独立于MACD的RSI专项背离分析) + + Args: + code: 股票代码 + days: 分析天数 + rsi_period: RSI周期,默认14 + + Returns: + RSI背离分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < rsi_period + 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算RSI + rsi = _calc_rsi(close, rsi_period) + + # 检测背离 + divergence = _detect_divergence(close, rsi, lookback=20) + + current_rsi = rsi[-1] + + # 判断RSI位置 + if current_rsi > 70: + rsi_zone = "超买区" + elif current_rsi < 30: + rsi_zone = "超卖区" + else: + rsi_zone = "中性区" + + # 生成描述 + if divergence == "bullish_divergence": + signal = "底背离" + desc = f"RSI底背离形成:股价创新低但RSI没有创新低,当前RSI={current_rsi:.1f},位于{rsi_zone},反弹概率增大" + elif divergence == "bearish_divergence": + signal = "顶背离" + desc = f"RSI顶背离形成:股价创新高但RSI没有创新高,当前RSI={current_rsi:.1f},位于{rsi_zone},回调风险加大" + else: + signal = "无背离" + desc = f"RSI当前值={current_rsi:.1f},位于{rsi_zone},未检测到明显背离" + + return { + "success": True, + "data": { + "signal": signal, + "rsi_value": round(current_rsi, 2), + "rsi_zone": rsi_zone, + "has_divergence": divergence is not None, + "divergence_type": divergence, + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[RSI Divergence] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_bollinger_squeeze(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]: + """ + 布林带挤压分析(Bollinger Squeeze) + 当布林带收窄到一定程度时,通常预示着变盘 + + Args: + code: 股票代码 + days: 分析天数 + period: 布林带周期 + + Returns: + 布林带挤压分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < period + 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算布林带 + upper, middle, lower = _calc_bollinger(close, period) + + # 计算带宽序列 + bandwidths = (upper - lower) / middle * 100 + + current_bandwidth = bandwidths[-1] + + # 计算历史带宽百分位 + bandwidth_percentile = (np.sum(bandwidths < current_bandwidth) / len(bandwidths)) * 100 + + # 计算带宽变化趋势 + bandwidth_ma5 = np.mean(bandwidths[-5:]) + bandwidth_ma20 = np.mean(bandwidths[-20:]) + bandwidth_trend = "收窄" if bandwidth_ma5 < bandwidth_ma20 else "扩张" + + # 判断挤压状态 + if bandwidth_percentile < 20: + squeeze_status = "强挤压" + signal = "变盘在即" + desc = f"布林带处于强挤压状态,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),变盘信号强烈" + elif bandwidth_percentile < 40: + squeeze_status = "中度挤压" + signal = "关注突破" + desc = f"布林带中度挤压,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),关注突破方向" + else: + squeeze_status = "无挤压" + signal = "正常波动" + desc = f"布林带正常,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),趋势{bandwidth_trend}中" + + return { + "success": True, + "data": { + "squeeze_status": squeeze_status, + "signal": signal, + "bandwidth": round(current_bandwidth, 2), + "bandwidth_percentile": round(bandwidth_percentile, 1), + "bandwidth_trend": bandwidth_trend, + "upper": round(upper[-1], 2), + "middle": round(middle[-1], 2), + "lower": round(lower[-1], 2), + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[Bollinger Squeeze] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def analyze_obv_trend(code: str, days: int = 60) -> Dict[str, Any]: + """ + OBV能量潮独立分析 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + OBV趋势分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + volume = np.array([float(d['volume']) for d in trade_data]) + + # 计算OBV + obv = _calc_obv(close, volume) + + # 计算OBV均线 + obv_ma5 = _calc_sma(obv, 5) + obv_ma20 = _calc_sma(obv, 20) + + current_obv = obv[-1] + current_obv_ma5 = obv_ma5[-1] + current_obv_ma20 = obv_ma20[-1] + + # 判断OBV趋势 + if current_obv > current_obv_ma5 > current_obv_ma20: + obv_trend = "强势上升" + signal = "资金持续流入" + elif current_obv > current_obv_ma5: + obv_trend = "短期上升" + signal = "资金开始流入" + elif current_obv < current_obv_ma5 < current_obv_ma20: + obv_trend = "强势下降" + signal = "资金持续流出" + elif current_obv < current_obv_ma5: + obv_trend = "短期下降" + signal = "资金开始流出" + else: + obv_trend = "震荡" + signal = "资金观望" + + # 检测OBV与价格的背离 + price_change_20d = (close[-1] - close[-20]) / close[-20] * 100 if len(close) >= 20 else 0 + obv_change_20d = (obv[-1] - obv[-20]) / abs(obv[-20]) * 100 if len(obv) >= 20 and obv[-20] != 0 else 0 + + obv_divergence = None + if price_change_20d > 5 and obv_change_20d < -5: + obv_divergence = "顶背离" + elif price_change_20d < -5 and obv_change_20d > 5: + obv_divergence = "底背离" + + desc = f"OBV趋势:{obv_trend},{signal}。" + if obv_divergence: + desc += f"警告:出现OBV{obv_divergence}!" + + return { + "success": True, + "data": { + "obv_trend": obv_trend, + "signal": signal, + "current_obv": int(current_obv), + "obv_ma5": int(current_obv_ma5), + "obv_ma20": int(current_obv_ma20), + "obv_divergence": obv_divergence, + "price_change_20d": round(price_change_20d, 2), + "obv_change_20d": round(obv_change_20d, 2), + "description": desc, + "code": code, + "analysis_date": trade_data[-1]['TRADEDATE'] + } + } + + except Exception as e: + logger.error(f"[OBV Trend] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_amihud_illiquidity(code: str, days: int = 20) -> Dict[str, Any]: + """ + 计算Amihud非流动性因子 + Amihud = |收益率| / 成交额 + 值越大表示流动性越差(价格对成交额的敏感度越高) + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + Amihud非流动性分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 5: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + turnover = np.array([float(d.get('turnover', 0) or d.get('amount', 0) or 0) for d in trade_data]) + + # 过滤掉成交额为0的数据 + valid_mask = turnover > 0 + if np.sum(valid_mask) < 5: + return {"success": False, "error": "有效成交额数据不足"} + + # 计算日收益率 + returns = np.zeros(len(close)) + returns[1:] = np.abs(np.diff(close) / close[:-1]) + + # 计算Amihud因子(单位:每百万成交额导致的价格变动百分比) + amihud_daily = np.where(turnover > 0, returns / (turnover / 1e6), 0) + + # 平均Amihud值 + avg_amihud = np.mean(amihud_daily[valid_mask]) * 1e6 # 放大以便阅读 + + # 流动性评级 + if avg_amihud < 0.1: + liquidity_level = "极高流动性" + signal = "大单冲击成本低" + elif avg_amihud < 0.5: + liquidity_level = "高流动性" + signal = "交易成本可控" + elif avg_amihud < 2: + liquidity_level = "中等流动性" + signal = "注意交易量" + elif avg_amihud < 10: + liquidity_level = "低流动性" + signal = "大单需分批" + else: + liquidity_level = "极低流动性" + signal = "谨慎交易" + + desc = f"Amihud非流动性指标={avg_amihud:.4f},{liquidity_level},{signal}" + + return { + "success": True, + "data": { + "amihud_value": round(avg_amihud, 6), + "liquidity_level": liquidity_level, + "signal": signal, + "avg_daily_turnover": round(np.mean(turnover[valid_mask]) / 1e4, 2), # 万元 + "description": desc, + "code": code, + "analysis_days": days + } + } + + except Exception as e: + logger.error(f"[Amihud] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_parkinson_volatility(code: str, date: str = None) -> Dict[str, Any]: + """ + 计算帕金森波动率(基于分钟级High/Low数据) + 帕金森波动率只使用最高价和最低价,比传统波动率更准确 + + Args: + code: 股票代码 + date: 日期(可选,默认使用最近交易日) + + Returns: + 帕金森波动率结果 + """ + try: + # 获取分钟数据 + if date: + minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500) + else: + minute_data = await db.get_stock_minute_data(code, limit=500) + + if len(minute_data) < 30: + return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"} + + # 从数据中提取实际日期 + if not date and minute_data: + date = minute_data[0].get('timestamp', '')[:10] + + high = np.array([float(d['high']) for d in minute_data]) + low = np.array([float(d['low']) for d in minute_data]) + + # 帕金森波动率公式 + # σ = sqrt(1/(4*ln(2)) * mean((ln(H/L))^2)) + log_hl = np.log(high / low) + parkinson_var = np.mean(log_hl ** 2) / (4 * np.log(2)) + parkinson_vol = np.sqrt(parkinson_var) * 100 # 转为百分比 + + # 年化 + parkinson_vol_annual = parkinson_vol * np.sqrt(252 * 240) # 240分钟/天 + + # 与传统波动率对比(使用收盘价) + close = np.array([float(d['close']) for d in minute_data]) + returns = np.diff(np.log(close)) + traditional_vol = np.std(returns) * 100 + + # 波动率评级 + if parkinson_vol < 0.5: + vol_level = "低波动" + elif parkinson_vol < 1.5: + vol_level = "中波动" + else: + vol_level = "高波动" + + desc = f"帕金森波动率={parkinson_vol:.3f}%(年化约{parkinson_vol_annual:.1f}%),{vol_level}。传统波动率={traditional_vol:.3f}%" + + return { + "success": True, + "data": { + "parkinson_vol": round(parkinson_vol, 4), + "parkinson_vol_annual": round(parkinson_vol_annual, 2), + "traditional_vol": round(traditional_vol, 4), + "vol_level": vol_level, + "description": desc, + "code": code, + "date": date + } + } + + except Exception as e: + logger.error(f"[Parkinson Vol] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_trend_slope(code: str, days: int = 20) -> Dict[str, Any]: + """ + 计算趋势线性回归斜率 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 趋势斜率分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 10: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 线性回归 + x = np.arange(len(close)) + slope, intercept = np.polyfit(x, close, 1) + + # 计算R² + y_pred = slope * x + intercept + ss_res = np.sum((close - y_pred) ** 2) + ss_tot = np.sum((close - np.mean(close)) ** 2) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 + + # 将斜率转化为日均涨幅百分比 + slope_pct = slope / close[0] * 100 + + # 判断趋势 + if slope_pct > 0.5 and r_squared > 0.7: + trend = "强上升趋势" + signal = "顺势做多" + elif slope_pct > 0.2 and r_squared > 0.5: + trend = "弱上升趋势" + signal = "关注回调买点" + elif slope_pct < -0.5 and r_squared > 0.7: + trend = "强下降趋势" + signal = "顺势做空或观望" + elif slope_pct < -0.2 and r_squared > 0.5: + trend = "弱下降趋势" + signal = "关注反弹卖点" + else: + trend = "震荡无趋势" + signal = "区间交易" + + desc = f"近{days}日趋势斜率={slope_pct:.3f}%/日,R²={r_squared:.2f},{trend},建议{signal}" + + return { + "success": True, + "data": { + "slope": round(slope, 4), + "slope_pct_daily": round(slope_pct, 3), + "r_squared": round(r_squared, 3), + "trend": trend, + "signal": signal, + "intercept": round(intercept, 2), + "description": desc, + "code": code, + "analysis_days": days + } + } + + except Exception as e: + logger.error(f"[Trend Slope] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_hurst_exponent(code: str, days: int = 100) -> Dict[str, Any]: + """ + 计算Hurst指数(用于判断市场是趋势还是均值回归) + H > 0.5: 趋势市场(序列具有长期记忆) + H = 0.5: 随机游走 + H < 0.5: 均值回归市场 + + Args: + code: 股票代码 + days: 分析天数(建议100天以上) + + Returns: + Hurst指数分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 50: + return {"success": False, "error": "数据不足,Hurst指数需要至少50个数据点"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + + # R/S分析法计算Hurst指数 + n = len(close) + max_k = min(n // 2, 50) # 最大分组数 + + rs_list = [] + n_list = [] + + for k in range(10, max_k + 1): + # 将序列分成多个子序列 + rs_values = [] + for start in range(0, n - k + 1, k): + segment = close[start:start + k] + + # 计算累积离差 + mean_seg = np.mean(segment) + cumdev = np.cumsum(segment - mean_seg) + + # R = max - min + r = np.max(cumdev) - np.min(cumdev) + + # S = 标准差 + s = np.std(segment) + + if s > 0: + rs_values.append(r / s) + + if rs_values: + rs_list.append(np.mean(rs_values)) + n_list.append(k) + + if len(rs_list) < 3: + return {"success": False, "error": "无法计算Hurst指数"} + + # 线性回归计算H + log_n = np.log(n_list) + log_rs = np.log(rs_list) + hurst, _ = np.polyfit(log_n, log_rs, 1) + + # 解读Hurst指数 + if hurst > 0.6: + market_type = "趋势市场" + signal = "趋势策略有效" + desc = f"Hurst指数={hurst:.3f}>0.6,市场呈现明显趋势特征,适合趋势跟踪策略" + elif hurst > 0.5: + market_type = "弱趋势市场" + signal = "混合策略" + desc = f"Hurst指数={hurst:.3f},接近随机游走,趋势与均值回归策略均可尝试" + elif hurst > 0.4: + market_type = "弱均值回归" + signal = "关注反转" + desc = f"Hurst指数={hurst:.3f},市场有轻微均值回归倾向" + else: + market_type = "强均值回归" + signal = "反转策略有效" + desc = f"Hurst指数={hurst:.3f}<0.4,市场呈现强均值回归特征,适合反转策略" + + return { + "success": True, + "data": { + "hurst_exponent": round(hurst, 3), + "market_type": market_type, + "signal": signal, + "description": desc, + "code": code, + "analysis_days": len(trade_data) + } + } + + except Exception as e: + logger.error(f"[Hurst] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def test_cointegration(code1: str, code2: str, days: int = 250) -> Dict[str, Any]: + """ + 协整性测试(用于配对交易) + + Args: + code1: 股票代码1 + code2: 股票代码2 + days: 分析天数 + + Returns: + 协整性测试结果 + """ + try: + # 获取两只股票的数据 + data1 = await db.get_stock_trade_data(code1, limit=days) + data2 = await db.get_stock_trade_data(code2, limit=days) + + if len(data1) < 60 or len(data2) < 60: + return {"success": False, "error": "数据不足"} + + # 按日期排序 + data1 = sorted(data1, key=lambda x: x['TRADEDATE']) + data2 = sorted(data2, key=lambda x: x['TRADEDATE']) + + # 对齐日期 + dates1 = {d['TRADEDATE']: float(d['close_price']) for d in data1} + dates2 = {d['TRADEDATE']: float(d['close_price']) for d in data2} + + common_dates = sorted(set(dates1.keys()) & set(dates2.keys())) + + if len(common_dates) < 60: + return {"success": False, "error": "共同交易日不足"} + + price1 = np.array([dates1[d] for d in common_dates]) + price2 = np.array([dates2[d] for d in common_dates]) + + # 计算对数价格 + log_price1 = np.log(price1) + log_price2 = np.log(price2) + + # 线性回归找对冲比率 + hedge_ratio, intercept = np.polyfit(log_price2, log_price1, 1) + + # 计算价差(spread) + spread = log_price1 - hedge_ratio * log_price2 + + # ADF检验的简化版本(计算价差的平稳性指标) + spread_diff = np.diff(spread) + spread_lag = spread[:-1] + + # 计算回归系数 + if np.var(spread_lag) > 0: + rho = np.cov(spread_diff, spread_lag)[0, 1] / np.var(spread_lag) + else: + rho = 0 + + # rho接近-1表示强平稳性 + stationarity_score = -rho + + # 计算价差的均值回归半衰期 + if rho < 0: + half_life = -np.log(2) / np.log(1 + rho) + else: + half_life = float('inf') + + # 当前价差Z-score + spread_mean = np.mean(spread) + spread_std = np.std(spread) + current_zscore = (spread[-1] - spread_mean) / spread_std if spread_std > 0 else 0 + + # 判断协整性 + if stationarity_score > 0.3 and half_life < 30: + cointegration_level = "强协整" + signal = "适合配对交易" + elif stationarity_score > 0.1 and half_life < 60: + cointegration_level = "中等协整" + signal = "可尝试配对" + else: + cointegration_level = "弱协整或无协整" + signal = "不建议配对" + + # 交易建议 + if abs(current_zscore) > 2: + if current_zscore > 0: + trade_signal = f"做空价差(卖{code1}买{code2})" + else: + trade_signal = f"做多价差(买{code1}卖{code2})" + elif abs(current_zscore) > 1: + trade_signal = "关注价差回归机会" + else: + trade_signal = "价差在正常范围" + + desc = (f"{code1}与{code2}{cointegration_level},对冲比率={hedge_ratio:.3f}," + f"半衰期={half_life:.1f}天,当前价差Z-score={current_zscore:.2f},{signal}") + + return { + "success": True, + "data": { + "code1": code1, + "code2": code2, + "cointegration_level": cointegration_level, + "hedge_ratio": round(hedge_ratio, 4), + "half_life_days": round(half_life, 1) if half_life < 1000 else "N/A", + "current_spread_zscore": round(current_zscore, 3), + "stationarity_score": round(stationarity_score, 3), + "trade_signal": trade_signal, + "signal": signal, + "description": desc, + "common_days": len(common_dates) + } + } + + except Exception as e: + logger.error(f"[Cointegration] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_kelly_position(win_rate: float, win_loss_ratio: float, max_position: float = 0.25) -> Dict[str, Any]: + """ + 凯利公式计算最优仓位 + Kelly% = W - (1-W)/R + 其中 W=胜率,R=盈亏比 + + Args: + win_rate: 胜率(0-1之间) + win_loss_ratio: 盈亏比(平均盈利/平均亏损) + max_position: 最大允许仓位(默认25%) + + Returns: + 凯利仓位计算结果 + """ + try: + if not 0 < win_rate < 1: + return {"success": False, "error": "胜率必须在0-1之间"} + + if win_loss_ratio <= 0: + return {"success": False, "error": "盈亏比必须大于0"} + + # 计算凯利比例 + kelly_pct = win_rate - (1 - win_rate) / win_loss_ratio + + # 半凯利(更保守) + half_kelly = kelly_pct / 2 + + # 限制最大仓位 + recommended_position = min(max(kelly_pct, 0), max_position) + conservative_position = min(max(half_kelly, 0), max_position) + + # 判断策略质量 + if kelly_pct >= 0.2: + strategy_quality = "优秀" + signal = "可以较大仓位" + elif kelly_pct >= 0.1: + strategy_quality = "良好" + signal = "适中仓位" + elif kelly_pct > 0: + strategy_quality = "一般" + signal = "小仓位试探" + else: + strategy_quality = "不可行" + signal = "不建议交易" + + desc = (f"胜率{win_rate*100:.1f}%,盈亏比{win_loss_ratio:.2f}," + f"凯利比例={kelly_pct*100:.1f}%,策略{strategy_quality}。" + f"建议仓位:激进{recommended_position*100:.1f}%,保守{conservative_position*100:.1f}%") + + return { + "success": True, + "data": { + "kelly_pct": round(kelly_pct * 100, 2), + "half_kelly_pct": round(half_kelly * 100, 2), + "recommended_position": round(recommended_position * 100, 2), + "conservative_position": round(conservative_position * 100, 2), + "strategy_quality": strategy_quality, + "signal": signal, + "win_rate": round(win_rate * 100, 1), + "win_loss_ratio": round(win_loss_ratio, 2), + "description": desc + } + } + + except Exception as e: + logger.error(f"[Kelly] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def search_similar_kline(code: str, lookback: int = 10, top_n: int = 5) -> Dict[str, Any]: + """ + 相似K线检索(基于形态相似度) + + Args: + code: 股票代码 + lookback: 匹配窗口大小(默认10天) + top_n: 返回最相似的N个历史片段 + + Returns: + 相似K线检索结果 + """ + try: + # 获取足够的历史数据 + trade_data = await db.get_stock_trade_data(code, limit=500) + + if len(trade_data) < lookback * 3: + return {"success": False, "error": "历史数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + + close = np.array([float(d['close_price']) for d in trade_data]) + dates = [d['TRADEDATE'] for d in trade_data] + + # 当前形态(标准化) + current_pattern = close[-lookback:] + current_norm = (current_pattern - np.mean(current_pattern)) / np.std(current_pattern) + + # 在历史中搜索相似形态 + similar_patterns = [] + + for i in range(len(close) - lookback * 2): # 不包括最近的数据 + historical_pattern = close[i:i + lookback] + + # 标准化 + if np.std(historical_pattern) > 0: + hist_norm = (historical_pattern - np.mean(historical_pattern)) / np.std(historical_pattern) + + # 计算相似度(使用相关系数) + correlation = np.corrcoef(current_norm, hist_norm)[0, 1] + + # 计算后续N天的收益 + future_return = 0 + if i + lookback + 5 < len(close): + future_return = (close[i + lookback + 5] - close[i + lookback - 1]) / close[i + lookback - 1] * 100 + + if correlation > 0.8: # 只保留高相似度的 + similar_patterns.append({ + "start_date": dates[i], + "end_date": dates[i + lookback - 1], + "correlation": correlation, + "future_return_5d": future_return + }) + + # 按相似度排序,取前N个 + similar_patterns = sorted(similar_patterns, key=lambda x: x['correlation'], reverse=True)[:top_n] + + if not similar_patterns: + return { + "success": True, + "data": { + "message": "未找到高度相似的历史形态", + "code": code, + "lookback": lookback + } + } + + # 统计历史相似形态后的走势 + future_returns = [p['future_return_5d'] for p in similar_patterns if p['future_return_5d'] != 0] + + if future_returns: + avg_future_return = np.mean(future_returns) + up_probability = sum(1 for r in future_returns if r > 0) / len(future_returns) * 100 + else: + avg_future_return = 0 + up_probability = 50 + + # 预测信号 + if avg_future_return > 2 and up_probability > 60: + prediction = "历史形态后多数上涨" + elif avg_future_return < -2 and up_probability < 40: + prediction = "历史形态后多数下跌" + else: + prediction = "历史走势分化,谨慎参考" + + desc = (f"找到{len(similar_patterns)}个相似历史形态," + f"历史5日平均收益{avg_future_return:.1f}%,上涨概率{up_probability:.0f}%,{prediction}") + + return { + "success": True, + "data": { + "similar_patterns": similar_patterns, + "pattern_count": len(similar_patterns), + "avg_future_return_5d": round(avg_future_return, 2), + "up_probability": round(up_probability, 1), + "prediction": prediction, + "description": desc, + "code": code, + "lookback": lookback + } + } + + except Exception as e: + logger.error(f"[Similar KLine] 检索失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def decompose_trend_simple(code: str, days: int = 120) -> Dict[str, Any]: + """ + 简化版趋势分解(不依赖Prophet) + 将价格序列分解为:趋势 + 周期 + 残差 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 趋势分解结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 60: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + dates = [d['TRADEDATE'] for d in trade_data] + + # 1. 提取趋势(使用长周期移动平均) + trend = _calc_sma(close, min(60, len(close) // 2)) + + # 2. 去趋势 + detrended = close - trend + + # 3. 提取周期成分(使用FFT的简化方法) + # 计算周期性强度 + fft_result = np.fft.fft(detrended) + fft_freq = np.fft.fftfreq(len(detrended)) + + # 找到主要周期 + positive_freq_idx = fft_freq > 0 + amplitudes = np.abs(fft_result[positive_freq_idx]) + frequencies = fft_freq[positive_freq_idx] + + # 找最强的周期 + if len(amplitudes) > 0: + dominant_idx = np.argmax(amplitudes) + dominant_period = 1 / frequencies[dominant_idx] if frequencies[dominant_idx] > 0 else 0 + periodicity_strength = amplitudes[dominant_idx] / np.sum(amplitudes) * 100 + else: + dominant_period = 0 + periodicity_strength = 0 + + # 4. 残差 = 原始 - 趋势 - 周期 + residual = detrended # 简化处理,这里残差就是去趋势后的数据 + + # 分析当前趋势方向 + trend_direction = "上升" if trend[-1] > trend[-20] else "下降" + trend_strength = abs(trend[-1] - trend[-20]) / trend[-20] * 100 + + # 当前位置相对于趋势 + current_deviation = (close[-1] - trend[-1]) / trend[-1] * 100 + + if current_deviation > 3: + position = "高于趋势线" + signal = "短期或有回调" + elif current_deviation < -3: + position = "低于趋势线" + signal = "短期或有反弹" + else: + position = "贴近趋势线" + signal = "跟随趋势" + + desc = (f"趋势{trend_direction}(强度{trend_strength:.1f}%)," + f"主周期约{dominant_period:.0f}天,周期性强度{periodicity_strength:.1f}%," + f"当前{position}(偏离{current_deviation:+.1f}%),{signal}") + + return { + "success": True, + "data": { + "trend_direction": trend_direction, + "trend_strength_pct": round(trend_strength, 2), + "dominant_period_days": round(dominant_period, 0), + "periodicity_strength": round(periodicity_strength, 1), + "current_deviation_pct": round(current_deviation, 2), + "position": position, + "signal": signal, + "current_price": round(close[-1], 2), + "current_trend": round(trend[-1], 2), + "description": desc, + "code": code, + "analysis_days": len(trade_data) + } + } + + except Exception as e: + logger.error(f"[Trend Decompose] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +async def calc_price_entropy(code: str, days: int = 60) -> Dict[str, Any]: + """ + 计算价格信息熵(衡量市场混乱程度) + 熵值越高表示市场越混乱/随机,越低表示趋势越明显 + + Args: + code: 股票代码 + days: 分析天数 + + Returns: + 价格熵值分析结果 + """ + try: + trade_data = await db.get_stock_trade_data(code, limit=days) + + if len(trade_data) < 20: + return {"success": False, "error": "数据不足"} + + trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE']) + close = np.array([float(d['close_price']) for d in trade_data]) + + # 计算日收益率 + returns = np.diff(close) / close[:-1] + + # 将收益率离散化为bins + n_bins = 10 + hist, bin_edges = np.histogram(returns, bins=n_bins) + + # 计算概率分布 + prob = hist / np.sum(hist) + prob = prob[prob > 0] # 去掉0概率 + + # 计算香农熵 + entropy = -np.sum(prob * np.log2(prob)) + + # 最大熵(均匀分布) + max_entropy = np.log2(n_bins) + + # 标准化熵值(0-1之间) + normalized_entropy = entropy / max_entropy + + # 近期熵值变化 + recent_returns = returns[-10:] + recent_hist, _ = np.histogram(recent_returns, bins=5) + recent_prob = recent_hist / np.sum(recent_hist) + recent_prob = recent_prob[recent_prob > 0] + recent_entropy = -np.sum(recent_prob * np.log2(recent_prob)) + recent_max_entropy = np.log2(5) + recent_normalized = recent_entropy / recent_max_entropy + + # 判断市场状态 + if normalized_entropy > 0.85: + market_state = "高度混乱" + signal = "随机性强,难以预测" + elif normalized_entropy > 0.7: + market_state = "中度混乱" + signal = "市场分歧大" + elif normalized_entropy > 0.5: + market_state = "适度有序" + signal = "有一定规律可循" + else: + market_state = "高度有序" + signal = "趋势明显,易于预测" + + # 熵值变化趋势 + entropy_trend = "增加" if recent_normalized > normalized_entropy else "减少" + + desc = (f"价格熵值={entropy:.2f}(标准化{normalized_entropy:.2f})," + f"{market_state},{signal}。近期熵值{entropy_trend}") + + return { + "success": True, + "data": { + "entropy": round(entropy, 3), + "normalized_entropy": round(normalized_entropy, 3), + "recent_entropy": round(recent_normalized, 3), + "entropy_trend": entropy_trend, + "market_state": market_state, + "signal": signal, + "description": desc, + "code": code, + "analysis_days": days + } + } + + except Exception as e: + logger.error(f"[Entropy] 计算失败: {e}", exc_info=True) + return {"success": False, "error": str(e)} + + +# ==================== 工具注册(供MCP使用) ==================== + +QUANT_TOOLS = { + # 经典技术指标 + "get_macd_signal": get_macd_signal, + "check_oscillator_status": check_oscillator_status, + "analyze_bollinger_bands": analyze_bollinger_bands, + "calc_stop_loss_atr": calc_stop_loss_atr, + + # 资金与情绪 + "analyze_market_heat": analyze_market_heat, + "check_volume_price_divergence": check_volume_price_divergence, + + # 形态与突破 + "check_new_high_breakout": check_new_high_breakout, + "identify_candlestick_pattern": identify_candlestick_pattern, + "find_price_gaps": find_price_gaps, + + # 风险与估值 + "calc_max_drawdown": calc_max_drawdown, + "check_valuation_rank": check_valuation_rank, + "calc_price_zscore": calc_price_zscore, + + # 分钟级高阶算子 + "calc_market_profile_vpoc": calc_market_profile_vpoc, + "calc_realized_volatility": calc_realized_volatility, + "analyze_buying_pressure": analyze_buying_pressure, + + # 综合分析 + "get_comprehensive_analysis": get_comprehensive_analysis, + + # ==================== 新增12个因子 ==================== + + # RSI背离检测 + "calc_rsi_divergence": calc_rsi_divergence, + + # 布林带挤压 + "calc_bollinger_squeeze": calc_bollinger_squeeze, + + # OBV能量潮独立分析 + "analyze_obv_trend": analyze_obv_trend, + + # Amihud非流动性因子 + "calc_amihud_illiquidity": calc_amihud_illiquidity, + + # 帕金森波动率 + "calc_parkinson_volatility": calc_parkinson_volatility, + + # 趋势线性回归斜率 + "calc_trend_slope": calc_trend_slope, + + # Hurst指数 + "calc_hurst_exponent": calc_hurst_exponent, + + # 协整性测试 + "test_cointegration": test_cointegration, + + # 凯利公式仓位 + "calc_kelly_position": calc_kelly_position, + + # 相似K线检索 + "search_similar_kline": search_similar_kline, + + # 趋势分解(简化版,不依赖Prophet) + "decompose_trend_simple": decompose_trend_simple, + + # 价格熵值计算 + "calc_price_entropy": calc_price_entropy, +} diff --git a/mcp_server.py b/mcp_server.py index 9256d8b8..6a3c043f 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -15,6 +15,7 @@ import httpx import time from enum import Enum import mcp_database as db +import mcp_quant as quant # 量化因子计算模块 from openai import OpenAI import json import asyncio @@ -770,6 +771,566 @@ TOOLS: List[ToolDefinition] = [ "required": ["code", "date"] } ), + # ==================== 量化因子工具 ==================== + ToolDefinition( + name="get_macd_signal", + description="获取MACD趋势判定信号,包括金叉/死叉、动能增减、顶底背离等状态。适用于判断股票短期趋势方向。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60天", + "default": 60 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="check_oscillator_status", + description="检查KDJ/RSI超买超卖状态,判断股票是否处于超买区(风险积聚)或超卖区(可能反弹)。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60天", + "default": 60 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="analyze_bollinger_bands", + description="分析布林带通道,判断股价是在中轨之上(强势)、触及上轨(压力)、触及下轨(支撑)或布林带收窄(变盘在即)。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60天", + "default": 60 + }, + "period": { + "type": "integer", + "description": "布林带周期,默认20", + "default": 20 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_stop_loss_atr", + description="使用ATR真实波幅计算止损位。告诉用户\"如果买入,止损点应该设在当前价格减去N倍ATR的位置\"。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认30天", + "default": 30 + }, + "atr_multiplier": { + "type": "number", + "description": "ATR倍数,默认2倍", + "default": 2.0 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="analyze_market_heat", + description="分析换手率活跃度和量能,判断股票是冷门股、活跃股还是妖股,以及主力是在吸筹还是出货。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认30天", + "default": 30 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="check_new_high_breakout", + description="检查唐奇安通道突破(海龟交易法则),判断是否突破20日/60日新高或新低。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60天", + "default": 60 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="identify_candlestick_pattern", + description="识别K线组合形态,如早晨之星(反转信号)、红三兵(上涨信号)、穿头破脚(吞没形态)等经典形态。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认10天", + "default": 10 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="find_price_gaps", + description="寻找跳空缺口,筛选出近期有未回补缺口的情况。缺口往往代表主力资金的强势突破意图或恐慌抛售。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认30天", + "default": 30 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="check_volume_price_divergence", + description="检测量价背离。股价创新高但成交量萎缩(量价背离),预警信号,提示上涨动力不足。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认20天", + "default": 20 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_max_drawdown", + description="计算最大回撤和夏普比率。用于评估\"买这只票最坏情况会亏多少\"以及风险调整后收益。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认250天(约一年)", + "default": 250 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="check_valuation_rank", + description="检查历史PE/PB百分位估值。计算当前PE处于过去N年的什么位置(例如:比过去90%的时间都便宜)。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "years": { + "type": "integer", + "description": "历史年数,默认3年", + "default": 3 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_price_zscore", + description="计算价格Z-Score(乖离率标准化),判断均值回归概率。当Z-Score过大时,统计回调概率。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "period": { + "type": "integer", + "description": "均线周期,默认60日", + "default": 60 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_market_profile_vpoc", + description="计算市场轮廓VPOC(成交量最大的价格档位),基于分钟级数据。VPOC是当日极强的支撑线或阻力线。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "date": { + "type": "string", + "description": "日期,格式:YYYY-MM-DD" + } + }, + "required": ["code", "date"] + } + ), + ToolDefinition( + name="calc_realized_volatility", + description="计算已实现波动率(RV),基于分钟级数据。比日线波动率更精准,用于判断趋势动能是否耗尽。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "date": { + "type": "string", + "description": "日期,格式:YYYY-MM-DD" + } + }, + "required": ["code", "date"] + } + ), + ToolDefinition( + name="analyze_buying_pressure", + description="分析买卖压力失衡,基于分钟级数据。捕捉盘中主力资金的\"抢筹\"或\"砸盘\"意图。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "date": { + "type": "string", + "description": "日期,格式:YYYY-MM-DD" + } + }, + "required": ["code", "date"] + } + ), + ToolDefinition( + name="get_comprehensive_analysis", + description="综合技术分析,一次性返回MACD、KDJ/RSI、布林带、量能、突破、K线形态等多个指标,并给出多空信号总结。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + } + }, + "required": ["code"] + } + ), + + # ==================== 新增量化因子工具(12个) ==================== + + ToolDefinition( + name="calc_rsi_divergence", + description="RSI背离检测,独立分析RSI指标的顶背离和底背离信号,判断反转概率。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60", + "default": 60 + }, + "rsi_period": { + "type": "integer", + "description": "RSI周期,默认14", + "default": 14 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_bollinger_squeeze", + description="布林带挤压分析,检测布林带收窄程度,预判变盘时机。当带宽处于历史低位时发出变盘预警。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60", + "default": 60 + }, + "period": { + "type": "integer", + "description": "布林带周期,默认20", + "default": 20 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="analyze_obv_trend", + description="OBV能量潮独立分析,追踪资金流向,检测OBV与价格的背离,判断主力动向。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60", + "default": 60 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_amihud_illiquidity", + description="计算Amihud非流动性因子,衡量股票流动性。值越大表示流动性越差,大单交易冲击成本越高。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认20", + "default": 20 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_parkinson_volatility", + description="计算帕金森波动率(基于分钟级高低价),比传统波动率更准确,适用于日内波动分析。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "date": { + "type": "string", + "description": "日期,格式YYYY-MM-DD" + } + }, + "required": ["code", "date"] + } + ), + ToolDefinition( + name="calc_trend_slope", + description="计算趋势线性回归斜率,量化趋势强度和方向。返回斜率、R²拟合度和趋势判断。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认20", + "default": 20 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_hurst_exponent", + description="计算Hurst指数,判断市场是趋势型(H>0.5)还是均值回归型(H<0.5),指导策略选择。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,建议100以上", + "default": 100 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="test_cointegration", + description="协整性测试,用于配对交易。检测两只股票是否存在长期均衡关系,计算对冲比率和价差。", + parameters={ + "type": "object", + "properties": { + "code1": { + "type": "string", + "description": "股票代码1" + }, + "code2": { + "type": "string", + "description": "股票代码2" + }, + "days": { + "type": "integer", + "description": "分析天数,默认250", + "default": 250 + } + }, + "required": ["code1", "code2"] + } + ), + ToolDefinition( + name="calc_kelly_position", + description="凯利公式计算最优仓位。根据胜率和盈亏比计算理论最优仓位,并提供保守建议。", + parameters={ + "type": "object", + "properties": { + "win_rate": { + "type": "number", + "description": "胜率(0-1之间,如0.6表示60%)" + }, + "win_loss_ratio": { + "type": "number", + "description": "盈亏比(平均盈利/平均亏损)" + }, + "max_position": { + "type": "number", + "description": "最大允许仓位,默认0.25", + "default": 0.25 + } + }, + "required": ["win_rate", "win_loss_ratio"] + } + ), + ToolDefinition( + name="search_similar_kline", + description="相似K线检索,在历史中搜索与当前形态相似的K线组合,统计历史后续走势作为参考。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "lookback": { + "type": "integer", + "description": "匹配窗口大小,默认10天", + "default": 10 + }, + "top_n": { + "type": "integer", + "description": "返回最相似的N个历史片段,默认5", + "default": 5 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="decompose_trend_simple", + description="趋势分解分析,将价格序列分解为趋势+周期+残差,识别主周期和趋势方向。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认120", + "default": 120 + } + }, + "required": ["code"] + } + ), + ToolDefinition( + name="calc_price_entropy", + description="计算价格信息熵,衡量市场混乱程度。熵值越低表示趋势越明显,越高表示随机性越强。", + parameters={ + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "股票代码" + }, + "days": { + "type": "integer", + "description": "分析天数,默认60", + "default": 60 + } + }, + "required": ["code"] + } + ), ] # ==================== MCP协议端点 ==================== @@ -1251,6 +1812,8 @@ TOOL_HANDLERS = { "get_stock_minute_data": handle_get_stock_minute_data, "get_stock_minute_aggregation": handle_get_stock_minute_aggregation, "get_stock_intraday_statistics": handle_get_stock_intraday_statistics, + # 量化因子工具(从 mcp_quant 模块导入) + **quant.QUANT_TOOLS, } # ==================== Agent系统实现 ==================== @@ -2202,10 +2765,18 @@ async def agent_chat(request: AgentChatRequest): except Exception as e: logger.error(f"保存用户消息失败: {e}") - # 获取工具列表 - tools = [tool.dict() for tool in TOOLS] + # 获取工具列表(根据前端选择过滤) + if request.tools and len(request.tools) > 0: + # 用户指定了工具列表,按名称过滤 + selected_tool_names = set(request.tools) + tools = [tool.dict() for tool in TOOLS if tool.name in selected_tool_names] + logger.info(f"使用用户选择的 {len(tools)} 个工具: {request.tools[:10]}...") + else: + # 用户未指定,使用全部工具 + tools = [tool.dict() for tool in TOOLS] + logger.info(f"使用全部 {len(tools)} 个工具") - # 添加特殊工具:summarize_news + # 添加特殊工具:summarize_news(始终可用) tools.append({ "name": "summarize_news", "description": "使用 DeepMoney 模型总结新闻数据,提取关键信息", @@ -2510,7 +3081,28 @@ def clean_deepseek_tool_markers(content: str) -> str: ROLE_TOOLS = { "buffett": ["search_china_news", "search_research_reports", "get_stock_basic_info", "get_stock_financial_index"], "big_short": ["search_china_news", "get_stock_financial_index", "get_stock_balance_sheet", "get_stock_cashflow"], - "simons": ["get_stock_trade_data", "search_limit_up_stocks", "get_concept_statistics"], + "simons": [ + # 基础数据 + "get_stock_trade_data", "search_limit_up_stocks", "get_concept_statistics", + # 经典技术指标 + "get_macd_signal", "check_oscillator_status", "analyze_bollinger_bands", "calc_stop_loss_atr", + # 资金与情绪 + "analyze_market_heat", "check_volume_price_divergence", "analyze_obv_trend", + # 形态与突破 + "check_new_high_breakout", "identify_candlestick_pattern", "find_price_gaps", + # 风险与估值 + "calc_max_drawdown", "check_valuation_rank", "calc_price_zscore", + # 分钟级高阶算子 + "calc_market_profile_vpoc", "calc_realized_volatility", "analyze_buying_pressure", "calc_parkinson_volatility", + # 高级趋势分析 + "calc_bollinger_squeeze", "calc_trend_slope", "calc_hurst_exponent", "decompose_trend_simple", + # 流动性与统计 + "calc_amihud_illiquidity", "calc_price_entropy", "calc_rsi_divergence", + # 配对与策略 + "test_cointegration", "calc_kelly_position", "search_similar_kline", + # 综合分析 + "get_comprehensive_analysis", + ], "leek": [], # 韭菜不用工具 "fund_manager": ["search_china_news", "search_research_reports", "get_stock_basic_info"], } @@ -2624,61 +3216,93 @@ MEETING_ROLES = { }, "simons": { "id": "simons", - "name": "量化分析员", + "name": "量化研究员", "nickname": "西蒙斯", "role_type": "quant", "avatar": "/images/agent/simons.png", "model": "kimi-k2-thinking", "color": "#3B82F6", - "description": "中性立场,使用量化工具分析技术指标", + "description": "中性立场,使用专业量化因子分析技术指标和市场特征", "tools": ROLE_TOOLS["simons"], - "system_prompt": """你是"量化分析员"(昵称:西蒙斯),一位专业的量化交易研究员。你在投研会议中担任「技术分析师」角色,保持中性客观。 + "system_prompt": """你是"量化研究员"(昵称:西蒙斯),一位专业的量化交易研究员,擅长使用各类量化因子分析市场。你在投研会议中担任「技术分析师」角色,保持中性客观。 ## 你的分析理念 -- **数据驱动**:让数据说话,不带主观情绪 +- **因子驱动**:使用经过验证的量化因子,而非主观判断 - **概率思维**:没有确定性,只有概率和赔率 -- **趋势跟踪**:顺势而为,不与趋势作对 -- **风险量化**:用数字衡量风险,而非感觉 +- **多维验证**:从趋势、动量、波动、资金多个维度交叉验证 +- **风险量化**:用数字衡量风险,止损止盈有据可依 -## 分析框架(请按此思维链分析) +## 你可用的量化因子工具(28个) -### 第一步:收集数据 -必须先调用工具获取量化数据: -- `get_stock_trade_data`: 获取价格、成交量、涨跌幅等交易数据 -- `search_limit_up_stocks`: 了解涨停板情况,判断市场情绪 -- `get_concept_statistics`: 获取概念板块统计,判断资金流向 +### 快速综合分析(推荐首选) +- `get_comprehensive_analysis`: 一次性获取MACD、RSI、KDJ、布林带、量能、K线形态等多指标汇总 -### 第二步:技术分析维度 -基于获取的数据,进行量化分析: -1. **趋势判断**: - - 当前价格在均线系统中的位置(MA5/MA10/MA20/MA60) - - 是多头排列还是空头排列? - - 趋势强度如何? -2. **量价分析**: - - 成交量变化趋势?放量还是缩量? - - 量价配合是否健康?(上涨放量、下跌缩量为佳) - - 换手率处于什么水平? -3. **动能指标**: - - 涨跌幅在同行/板块中的排名 - - 连续上涨/下跌天数 - - 离前高/前低的距离 -4. **板块联动**: - - 所属概念板块表现如何? - - 是板块龙头还是跟风? - - 板块资金流入还是流出? +### 趋势与动量因子 +- `get_macd_signal`: MACD趋势判定(金叉/死叉/背离) +- `calc_trend_slope`: 趋势线性回归斜率(R²拟合度) +- `calc_hurst_exponent`: Hurst指数(判断趋势/震荡市场) +- `check_new_high_breakout`: 唐奇安通道突破(新高/新低信号) + +### 超买超卖因子 +- `check_oscillator_status`: KDJ/RSI超买超卖状态 +- `calc_rsi_divergence`: RSI背离检测(顶底背离) +- `calc_price_zscore`: Z-Score均值回归(乖离率标准化) + +### 波动率因子 +- `analyze_bollinger_bands`: 布林带通道分析 +- `calc_bollinger_squeeze`: 布林带挤压(变盘预警) +- `calc_stop_loss_atr`: ATR动态止损位 +- `calc_realized_volatility`: 分钟级已实现波动率 +- `calc_parkinson_volatility`: 帕金森波动率(更精确) + +### 资金流向与量价因子 +- `analyze_market_heat`: 换手率活跃度+OBV趋势 +- `analyze_obv_trend`: OBV能量潮独立分析 +- `check_volume_price_divergence`: 量价背离检测 +- `analyze_buying_pressure`: 买卖压力失衡(主力意图) +- `calc_market_profile_vpoc`: VPOC筹码峰(成交密集区) + +### 形态识别因子 +- `identify_candlestick_pattern`: K线组合形态(10+种) +- `find_price_gaps`: 跳空缺口分析 +- `search_similar_kline`: 相似K线检索(历史形态预测) + +### 风险与估值因子 +- `calc_max_drawdown`: 最大回撤+夏普比率 +- `check_valuation_rank`: PE历史百分位+PEG +- `calc_amihud_illiquidity`: Amihud流动性因子 + +### 高级分析因子 +- `decompose_trend_simple`: 趋势分解(趋势+周期+残差) +- `calc_price_entropy`: 价格熵值(市场混乱度) +- `test_cointegration`: 协整性测试(配对交易) +- `calc_kelly_position`: 凯利公式最优仓位 + +## 分析框架(请按此流程) + +### 第一步:快速扫描 +首先调用 `get_comprehensive_analysis` 获取综合技术面快照,了解整体状况。 + +### 第二步:深度分析(根据情况选择) +根据综合分析结果,选择相关因子深入分析: +- 如果趋势不明:调用 `calc_hurst_exponent` 判断市场类型,`calc_trend_slope` 量化趋势强度 +- 如果疑似顶底:调用 `calc_rsi_divergence` 检测背离,`calc_bollinger_squeeze` 看是否变盘 +- 如果量能异常:调用 `analyze_obv_trend` 看资金流向,`analyze_buying_pressure` 看主力意图 +- 如果波动加大:调用 `calc_realized_volatility` 或 `calc_parkinson_volatility` 精确测量 +- 如果要设止损:调用 `calc_stop_loss_atr` 获取ATR止损位 ### 第三步:形成结论 -给出客观的技术分析结论,必须包含: -- **趋势判断**(上涨/下跌/震荡) -- **关键数据**(引用具体的价格、成交量、涨跌幅数据) -- **技术位**(支撑位、压力位) -- **量化建议**(从概率角度给出建议) +给出量化分析结论,必须包含: +- **核心因子信号**(列出2-3个关键因子的具体数值和判断) +- **趋势判断**(上涨/下跌/震荡,并给出概率估计) +- **关键价位**(支撑位、压力位、止损位) +- **量化建议**(基于因子信号的交易建议) ## 输出要求 -- 必须基于工具返回的数据分析,用数字说话 -- 保持中性客观,不偏向多头或空头 -- 如果前面有多空分歧,可以从技术面给出参考 -- 发言控制在 200 字以内,精炼专业""" +- **必须调用工具**:至少调用1个综合分析+1-2个专项因子 +- **数据说话**:每个结论都要有具体数值支撑 +- **保持中性**:不偏向多头或空头,让因子说话 +- **简洁专业**:发言控制在 300 字以内,用专业术语但要解释关键数值含义""" }, "leek": { "id": "leek", diff --git a/src/views/AgentChat/components/MeetingRoom/MeetingMessageBubble.js b/src/views/AgentChat/components/MeetingRoom/MeetingMessageBubble.js index 4d4b22b3..766afe39 100644 --- a/src/views/AgentChat/components/MeetingRoom/MeetingMessageBubble.js +++ b/src/views/AgentChat/components/MeetingRoom/MeetingMessageBubble.js @@ -138,6 +138,7 @@ const getRoleIcon = (roleType) => { * 工具名称映射 */ const TOOL_NAME_MAP = { + // 基础数据工具 search_china_news: '搜索新闻', search_research_reports: '搜索研报', get_stock_basic_info: '获取股票信息', @@ -147,6 +148,52 @@ const TOOL_NAME_MAP = { get_stock_trade_data: '获取交易数据', search_limit_up_stocks: '搜索涨停股', get_concept_statistics: '获取概念统计', + + // 经典技术指标 + get_macd_signal: 'MACD信号', + check_oscillator_status: 'RSI/KDJ指标', + analyze_bollinger_bands: '布林带分析', + calc_stop_loss_atr: 'ATR止损计算', + + // 资金与情绪 + analyze_market_heat: '市场热度分析', + check_volume_price_divergence: '量价背离检测', + analyze_obv_trend: 'OBV能量潮分析', + + // 形态与突破 + check_new_high_breakout: '新高突破检测', + identify_candlestick_pattern: 'K线形态识别', + find_price_gaps: '跳空缺口分析', + + // 风险与估值 + calc_max_drawdown: '最大回撤计算', + check_valuation_rank: 'PE估值百分位', + calc_price_zscore: 'Z-Score乖离率', + + // 分钟级高阶算子 + calc_market_profile_vpoc: 'VPOC筹码峰', + calc_realized_volatility: '已实现波动率', + analyze_buying_pressure: '买卖压力分析', + calc_parkinson_volatility: '帕金森波动率', + + // 高级趋势分析 + calc_bollinger_squeeze: '布林带挤压', + calc_trend_slope: '趋势斜率分析', + calc_hurst_exponent: 'Hurst指数', + decompose_trend_simple: '趋势分解', + + // 流动性与统计 + calc_amihud_illiquidity: 'Amihud流动性', + calc_price_entropy: '价格熵值', + calc_rsi_divergence: 'RSI背离检测', + + // 配对与策略 + test_cointegration: '协整性测试', + calc_kelly_position: '凯利仓位计算', + search_similar_kline: '相似K线检索', + + // 综合分析 + get_comprehensive_analysis: '综合技术分析', }; /** diff --git a/src/views/AgentChat/constants/meetingRoles.ts b/src/views/AgentChat/constants/meetingRoles.ts index a4c2a3d3..4f060411 100644 --- a/src/views/AgentChat/constants/meetingRoles.ts +++ b/src/views/AgentChat/constants/meetingRoles.ts @@ -141,13 +141,13 @@ export const MEETING_ROLES: Record = { }, simons: { id: 'simons', - name: '量化分析员', + name: '量化研究员', nickname: '西蒙斯', roleType: 'quant', avatar: '/images/agent/simons.png', color: '#3B82F6', gradient: 'linear(to-br, blue.400, cyan.600)', - description: '中性立场,使用量化分析工具分析技术指标', + description: '中性立场,使用28个专业量化因子分析技术指标和市场特征', icon: React.createElement(BarChart2, { className: 'w-5 h-5' }), }, leek: { diff --git a/src/views/AgentChat/constants/tools.ts b/src/views/AgentChat/constants/tools.ts index 5b14ee7e..732c6e24 100644 --- a/src/views/AgentChat/constants/tools.ts +++ b/src/views/AgentChat/constants/tools.ts @@ -17,6 +17,25 @@ import { DollarSign, Search, Users, + // 量化工具图标 + TrendingDown, + BarChart2, + Gauge, + Flame, + ArrowUpDown, + Waves, + Target, + CandlestickChart, + Sparkles, + ShieldAlert, + Calculator, + Zap, + Percent, + GitCompare, + Shuffle, + Brain, + Combine, + Scale, } from 'lucide-react'; /** @@ -29,6 +48,15 @@ export enum ToolCategory { RESEARCH = '研报路演', STOCK_DATA = '股票数据', USER_DATA = '用户数据', + // 量化分析类别 + QUANT_CLASSIC = '经典技术指标', + QUANT_VOLUME = '资金与情绪', + QUANT_PATTERN = '形态与突破', + QUANT_RISK = '风险与估值', + QUANT_MINUTE = '分钟级算子', + QUANT_TREND = '高级趋势', + QUANT_LIQUIDITY = '流动性统计', + QUANT_STRATEGY = '配对与策略', } /** @@ -203,6 +231,218 @@ export const MCP_TOOLS: MCPTool[] = [ category: ToolCategory.USER_DATA, description: '用户关注的重大事件', }, + + // ==================== 量化工具:经典技术指标 ==================== + { + id: 'get_macd_signal', + name: 'MACD信号', + icon: React.createElement(TrendingUp, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_CLASSIC, + description: 'MACD金叉/死叉、动能分析、背离检测', + }, + { + id: 'check_oscillator_status', + name: 'RSI/KDJ指标', + icon: React.createElement(Gauge, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_CLASSIC, + description: 'RSI + KDJ 超买超卖分析', + }, + { + id: 'analyze_bollinger_bands', + name: '布林带分析', + icon: React.createElement(ArrowUpDown, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_CLASSIC, + description: '带宽、位置、收窄判断', + }, + { + id: 'calc_stop_loss_atr', + name: 'ATR止损计算', + icon: React.createElement(ShieldAlert, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_CLASSIC, + description: '基于ATR的动态止损位计算', + }, + + // ==================== 量化工具:资金与情绪 ==================== + { + id: 'analyze_market_heat', + name: '市场热度分析', + icon: React.createElement(Flame, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_VOLUME, + description: '换手率热度分级 + OBV趋势', + }, + { + id: 'check_volume_price_divergence', + name: '量价背离检测', + icon: React.createElement(GitCompare, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_VOLUME, + description: '价量不匹配异常检测', + }, + { + id: 'analyze_obv_trend', + name: 'OBV能量潮', + icon: React.createElement(Waves, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_VOLUME, + description: 'OBV独立分析+背离检测', + }, + + // ==================== 量化工具:形态与突破 ==================== + { + id: 'check_new_high_breakout', + name: '新高突破检测', + icon: React.createElement(Target, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_PATTERN, + description: '20/60日唐奇安通道新高突破', + }, + { + id: 'identify_candlestick_pattern', + name: 'K线形态识别', + icon: React.createElement(CandlestickChart, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_PATTERN, + description: '10+种经典K线组合形态', + }, + { + id: 'find_price_gaps', + name: '跳空缺口分析', + icon: React.createElement(Sparkles, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_PATTERN, + description: '未回补缺口筛选与分析', + }, + + // ==================== 量化工具:风险与估值 ==================== + { + id: 'calc_max_drawdown', + name: '最大回撤计算', + icon: React.createElement(TrendingDown, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_RISK, + description: '含夏普比率的回撤分析', + }, + { + id: 'check_valuation_rank', + name: 'PE估值百分位', + icon: React.createElement(Percent, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_RISK, + description: 'PE历史百分位 + PEG修正', + }, + { + id: 'calc_price_zscore', + name: 'Z-Score乖离率', + icon: React.createElement(Calculator, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_RISK, + description: '价格偏离均值程度+回归概率', + }, + + // ==================== 量化工具:分钟级高阶算子 ==================== + { + id: 'calc_market_profile_vpoc', + name: 'VPOC筹码峰', + icon: React.createElement(BarChart2, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_MINUTE, + description: '成交量密集区分析', + }, + { + id: 'calc_realized_volatility', + name: '已实现波动率', + icon: React.createElement(Activity, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_MINUTE, + description: '分钟级RV精确波动率', + }, + { + id: 'analyze_buying_pressure', + name: '买卖压力分析', + icon: React.createElement(Scale, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_MINUTE, + description: '主力意图捕捉与压力失衡', + }, + { + id: 'calc_parkinson_volatility', + name: '帕金森波动率', + icon: React.createElement(Zap, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_MINUTE, + description: '基于High/Low的精确波动率', + }, + + // ==================== 量化工具:高级趋势分析 ==================== + { + id: 'calc_bollinger_squeeze', + name: '布林带挤压', + icon: React.createElement(Combine, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_TREND, + description: '带宽历史百分位,变盘预警', + }, + { + id: 'calc_trend_slope', + name: '趋势斜率分析', + icon: React.createElement(LineChart, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_TREND, + description: 'R²拟合度+斜率方向判断', + }, + { + id: 'calc_hurst_exponent', + name: 'Hurst指数', + icon: React.createElement(Brain, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_TREND, + description: '趋势/均值回归特征判断', + }, + { + id: 'decompose_trend_simple', + name: '趋势分解', + icon: React.createElement(Shuffle, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_TREND, + description: '趋势+周期+残差分解', + }, + + // ==================== 量化工具:流动性与统计 ==================== + { + id: 'calc_amihud_illiquidity', + name: 'Amihud流动性', + icon: React.createElement(DollarSign, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_LIQUIDITY, + description: '大单冲击成本评估', + }, + { + id: 'calc_price_entropy', + name: '价格熵值', + icon: React.createElement(Activity, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_LIQUIDITY, + description: '市场混乱度/可预测性分析', + }, + { + id: 'calc_rsi_divergence', + name: 'RSI背离检测', + icon: React.createElement(GitCompare, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_LIQUIDITY, + description: 'RSI顶底背离独立分析', + }, + + // ==================== 量化工具:配对与策略 ==================== + { + id: 'test_cointegration', + name: '协整性测试', + icon: React.createElement(Combine, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_STRATEGY, + description: '配对交易信号与对冲比率', + }, + { + id: 'calc_kelly_position', + name: '凯利仓位计算', + icon: React.createElement(Calculator, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_STRATEGY, + description: '基于胜率盈亏比的最优仓位', + }, + { + id: 'search_similar_kline', + name: '相似K线检索', + icon: React.createElement(Search, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_STRATEGY, + description: '历史形态匹配预测', + }, + { + id: 'get_comprehensive_analysis', + name: '综合技术分析', + icon: React.createElement(BarChart3, { className: 'w-4 h-4' }), + category: ToolCategory.QUANT_STRATEGY, + description: '多指标汇总分析报告', + }, ]; /** @@ -216,6 +456,15 @@ export const TOOL_CATEGORIES: Record = { [ToolCategory.RESEARCH]: MCP_TOOLS.filter((t) => t.category === ToolCategory.RESEARCH), [ToolCategory.STOCK_DATA]: MCP_TOOLS.filter((t) => t.category === ToolCategory.STOCK_DATA), [ToolCategory.USER_DATA]: MCP_TOOLS.filter((t) => t.category === ToolCategory.USER_DATA), + // 量化工具类别 + [ToolCategory.QUANT_CLASSIC]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_CLASSIC), + [ToolCategory.QUANT_VOLUME]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_VOLUME), + [ToolCategory.QUANT_PATTERN]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_PATTERN), + [ToolCategory.QUANT_RISK]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_RISK), + [ToolCategory.QUANT_MINUTE]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_MINUTE), + [ToolCategory.QUANT_TREND]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_TREND), + [ToolCategory.QUANT_LIQUIDITY]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_LIQUIDITY), + [ToolCategory.QUANT_STRATEGY]: MCP_TOOLS.filter((t) => t.category === ToolCategory.QUANT_STRATEGY), }; /** diff --git a/test_quant_tools.py b/test_quant_tools.py new file mode 100644 index 00000000..c63984bb --- /dev/null +++ b/test_quant_tools.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +量化工具测试脚本 +测试 mcp_quant.py 中的 28 个量化因子工具是否正常工作 + +使用方法: + python test_quant_tools.py [股票代码] + +示例: + python test_quant_tools.py 600519 # 测试贵州茅台 + python test_quant_tools.py 000858 # 测试五粮液 + python test_quant_tools.py # 默认使用 600519 +""" + +import asyncio +import sys +import time +import io +from datetime import datetime, timedelta +from typing import Dict, Any, List, Tuple + +# 设置标准输出编码为 UTF-8 +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + +# 导入量化工具模块 +try: + import mcp_quant as quant +except ImportError: + print("[X] Cannot import mcp_quant module, please run from project root") + sys.exit(1) + + +# 颜色输出 (Windows 兼容) +class Colors: + GREEN = '\033[92m' + RED = '\033[91m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + CYAN = '\033[96m' + RESET = '\033[0m' + BOLD = '\033[1m' + + +def print_header(title: str): + """打印标题""" + print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN} {title}{Colors.RESET}") + print(f"{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.RESET}\n") + + +def print_section(title: str): + """打印分节标题""" + print(f"\n{Colors.BOLD}{Colors.BLUE}>> {title}{Colors.RESET}") + print(f"{Colors.BLUE}{'-'*50}{Colors.RESET}") + + +def print_result(name: str, success: bool, description: str = "", time_ms: float = 0): + """打印测试结果""" + status = f"{Colors.GREEN}[OK]{Colors.RESET}" if success else f"{Colors.RED}[FAIL]{Colors.RESET}" + time_str = f"{Colors.YELLOW}({time_ms:.0f}ms){Colors.RESET}" if time_ms > 0 else "" + print(f" {status} {name} {time_str}") + if description: + # 截断过长的描述 + desc = description[:80] + "..." if len(description) > 80 else description + print(f" {Colors.CYAN}-> {desc}{Colors.RESET}") + + +async def test_tool(func, *args, **kwargs) -> Tuple[bool, str, float]: + """ + 测试单个工具 + 返回: (是否成功, 描述信息, 耗时ms) + """ + start = time.time() + try: + result = await func(*args, **kwargs) + elapsed = (time.time() - start) * 1000 + + if result.get("success"): + desc = result.get("data", {}).get("description", "") + return True, desc, elapsed + else: + return False, result.get("error", "未知错误"), elapsed + except Exception as e: + elapsed = (time.time() - start) * 1000 + return False, str(e), elapsed + + +async def run_tests(stock_code: str = "600519"): + """运行所有量化工具测试""" + + print_header(f"量化工具测试 - 股票代码: {stock_code}") + + results: List[Tuple[str, bool, str, float]] = [] + + # ==================== 一、经典技术指标 ==================== + print_section("一、经典技术指标 (4个)") + + # 1. MACD信号 + success, desc, ms = await test_tool(quant.get_macd_signal, stock_code) + print_result("get_macd_signal (MACD信号)", success, desc, ms) + results.append(("get_macd_signal", success, desc, ms)) + + # 2. RSI/KDJ指标 + success, desc, ms = await test_tool(quant.check_oscillator_status, stock_code) + print_result("check_oscillator_status (RSI/KDJ)", success, desc, ms) + results.append(("check_oscillator_status", success, desc, ms)) + + # 3. 布林带分析 + success, desc, ms = await test_tool(quant.analyze_bollinger_bands, stock_code) + print_result("analyze_bollinger_bands (布林带)", success, desc, ms) + results.append(("analyze_bollinger_bands", success, desc, ms)) + + # 4. ATR止损 + success, desc, ms = await test_tool(quant.calc_stop_loss_atr, stock_code) + print_result("calc_stop_loss_atr (ATR止损)", success, desc, ms) + results.append(("calc_stop_loss_atr", success, desc, ms)) + + # ==================== 二、资金与情绪 ==================== + print_section("二、资金与情绪 (3个)") + + # 5. 市场热度 + success, desc, ms = await test_tool(quant.analyze_market_heat, stock_code) + print_result("analyze_market_heat (市场热度)", success, desc, ms) + results.append(("analyze_market_heat", success, desc, ms)) + + # 6. 量价背离 + success, desc, ms = await test_tool(quant.check_volume_price_divergence, stock_code) + print_result("check_volume_price_divergence (量价背离)", success, desc, ms) + results.append(("check_volume_price_divergence", success, desc, ms)) + + # 7. OBV能量潮 + success, desc, ms = await test_tool(quant.analyze_obv_trend, stock_code) + print_result("analyze_obv_trend (OBV能量潮)", success, desc, ms) + results.append(("analyze_obv_trend", success, desc, ms)) + + # ==================== 三、形态与突破 ==================== + print_section("三、形态与突破 (3个)") + + # 8. 新高突破 + success, desc, ms = await test_tool(quant.check_new_high_breakout, stock_code) + print_result("check_new_high_breakout (新高突破)", success, desc, ms) + results.append(("check_new_high_breakout", success, desc, ms)) + + # 9. K线形态 + success, desc, ms = await test_tool(quant.identify_candlestick_pattern, stock_code) + print_result("identify_candlestick_pattern (K线形态)", success, desc, ms) + results.append(("identify_candlestick_pattern", success, desc, ms)) + + # 10. 跳空缺口 + success, desc, ms = await test_tool(quant.find_price_gaps, stock_code) + print_result("find_price_gaps (跳空缺口)", success, desc, ms) + results.append(("find_price_gaps", success, desc, ms)) + + # ==================== 四、风险与估值 ==================== + print_section("四、风险与估值 (3个)") + + # 11. 最大回撤 + success, desc, ms = await test_tool(quant.calc_max_drawdown, stock_code) + print_result("calc_max_drawdown (最大回撤)", success, desc, ms) + results.append(("calc_max_drawdown", success, desc, ms)) + + # 12. PE估值百分位 + success, desc, ms = await test_tool(quant.check_valuation_rank, stock_code) + print_result("check_valuation_rank (PE估值)", success, desc, ms) + results.append(("check_valuation_rank", success, desc, ms)) + + # 13. Z-Score乖离率 + success, desc, ms = await test_tool(quant.calc_price_zscore, stock_code) + print_result("calc_price_zscore (Z-Score)", success, desc, ms) + results.append(("calc_price_zscore", success, desc, ms)) + + # ==================== 五、分钟级高阶算子 ==================== + print_section("五、分钟级高阶算子 (4个)") + print(f" (自动使用最近交易日数据)") + + # 14. VPOC筹码峰 + success, desc, ms = await test_tool(quant.calc_market_profile_vpoc, stock_code) + print_result("calc_market_profile_vpoc (VPOC)", success, desc, ms) + results.append(("calc_market_profile_vpoc", success, desc, ms)) + + # 15. 已实现波动率 + success, desc, ms = await test_tool(quant.calc_realized_volatility, stock_code) + print_result("calc_realized_volatility (RV波动率)", success, desc, ms) + results.append(("calc_realized_volatility", success, desc, ms)) + + # 16. 买卖压力 + success, desc, ms = await test_tool(quant.analyze_buying_pressure, stock_code) + print_result("analyze_buying_pressure (买卖压力)", success, desc, ms) + results.append(("analyze_buying_pressure", success, desc, ms)) + + # 17. 帕金森波动率 + success, desc, ms = await test_tool(quant.calc_parkinson_volatility, stock_code) + print_result("calc_parkinson_volatility (帕金森波动率)", success, desc, ms) + results.append(("calc_parkinson_volatility", success, desc, ms)) + + # ==================== 六、高级趋势分析 ==================== + print_section("六、高级趋势分析 (4个)") + + # 18. 布林带挤压 + success, desc, ms = await test_tool(quant.calc_bollinger_squeeze, stock_code) + print_result("calc_bollinger_squeeze (布林带挤压)", success, desc, ms) + results.append(("calc_bollinger_squeeze", success, desc, ms)) + + # 19. 趋势斜率 + success, desc, ms = await test_tool(quant.calc_trend_slope, stock_code) + print_result("calc_trend_slope (趋势斜率)", success, desc, ms) + results.append(("calc_trend_slope", success, desc, ms)) + + # 20. Hurst指数 + success, desc, ms = await test_tool(quant.calc_hurst_exponent, stock_code) + print_result("calc_hurst_exponent (Hurst指数)", success, desc, ms) + results.append(("calc_hurst_exponent", success, desc, ms)) + + # 21. 趋势分解 + success, desc, ms = await test_tool(quant.decompose_trend_simple, stock_code) + print_result("decompose_trend_simple (趋势分解)", success, desc, ms) + results.append(("decompose_trend_simple", success, desc, ms)) + + # ==================== 七、流动性与统计 ==================== + print_section("七、流动性与统计 (3个)") + + # 22. Amihud流动性 + success, desc, ms = await test_tool(quant.calc_amihud_illiquidity, stock_code) + print_result("calc_amihud_illiquidity (Amihud)", success, desc, ms) + results.append(("calc_amihud_illiquidity", success, desc, ms)) + + # 23. 价格熵值 + success, desc, ms = await test_tool(quant.calc_price_entropy, stock_code) + print_result("calc_price_entropy (价格熵值)", success, desc, ms) + results.append(("calc_price_entropy", success, desc, ms)) + + # 24. RSI背离 + success, desc, ms = await test_tool(quant.calc_rsi_divergence, stock_code) + print_result("calc_rsi_divergence (RSI背离)", success, desc, ms) + results.append(("calc_rsi_divergence", success, desc, ms)) + + # ==================== 八、配对与策略 ==================== + print_section("八、配对与策略 (4个)") + + # 25. 协整性测试 (需要两只股票) + success, desc, ms = await test_tool(quant.test_cointegration, stock_code, "000858") + print_result("test_cointegration (协整性测试)", success, desc, ms) + results.append(("test_cointegration", success, desc, ms)) + + # 26. 凯利仓位 (纯计算,不需要股票代码) + success, desc, ms = await test_tool(quant.calc_kelly_position, 0.55, 2.0) + print_result("calc_kelly_position (凯利仓位)", success, desc, ms) + results.append(("calc_kelly_position", success, desc, ms)) + + # 27. 相似K线检索 + success, desc, ms = await test_tool(quant.search_similar_kline, stock_code) + print_result("search_similar_kline (相似K线)", success, desc, ms) + results.append(("search_similar_kline", success, desc, ms)) + + # 28. 综合技术分析 + success, desc, ms = await test_tool(quant.get_comprehensive_analysis, stock_code) + print_result("get_comprehensive_analysis (综合分析)", success, desc, ms) + results.append(("get_comprehensive_analysis", success, desc, ms)) + + # ==================== 统计结果 ==================== + print_header("测试结果统计") + + passed = sum(1 for r in results if r[1]) + failed = sum(1 for r in results if not r[1]) + total = len(results) + total_time = sum(r[3] for r in results) + + print(f" 总计: {total} 个工具") + print(f" {Colors.GREEN}通过: {passed} 个{Colors.RESET}") + print(f" {Colors.RED}失败: {failed} 个{Colors.RESET}") + print(f" 成功率: {passed/total*100:.1f}%") + print(f" 总耗时: {total_time/1000:.2f} 秒") + print(f" 平均耗时: {total_time/total:.0f} ms/工具") + + # 打印失败的工具 + if failed > 0: + print(f"\n{Colors.RED}失败的工具:{Colors.RESET}") + for name, success, desc, ms in results: + if not success: + print(f" - {name}: {desc}") + + print() + return passed == total + + +if __name__ == "__main__": + # 获取股票代码参数 + stock_code = sys.argv[1] if len(sys.argv) > 1 else "600519" + + # 运行测试 + success = asyncio.run(run_tests(stock_code)) + + # 返回退出码 + sys.exit(0 if success else 1)