2781 lines
94 KiB
Python
2781 lines
94 KiB
Python
"""
|
||
MCP Quant - 量化因子计算模块
|
||
基于日线(MySQL ea_trade)和分钟频(ClickHouse stock_minute)数据计算技术指标和量化因子
|
||
|
||
数据源:
|
||
- MySQL ea_trade: OHLCV, 换手率, 涨跌幅, PE等日线数据
|
||
- ClickHouse stock_minute: 分钟级 OHLCV 数据
|
||
|
||
设计原则:
|
||
1. 返回"状态"而非原始数值 - 便于大模型理解
|
||
2. 支持批量计算 - 提高效率
|
||
3. 错误优雅降级 - 数据不足时返回 None 而非报错
|
||
"""
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
from typing import Dict, List, Any, Optional, Tuple, Literal
|
||
from datetime import datetime, timedelta
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
import logging
|
||
import asyncio
|
||
|
||
# 导入数据库模块
|
||
import mcp_database as db
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ==================== 信号枚举定义 ====================
|
||
|
||
class TrendSignal(str, Enum):
|
||
"""趋势信号"""
|
||
STRONG_BULLISH = "强势上涨"
|
||
BULLISH = "上涨"
|
||
NEUTRAL = "震荡"
|
||
BEARISH = "下跌"
|
||
STRONG_BEARISH = "强势下跌"
|
||
|
||
|
||
class MACDSignal(str, Enum):
|
||
"""MACD信号"""
|
||
GOLDEN_CROSS = "金叉"
|
||
DEATH_CROSS = "死叉"
|
||
BULLISH_DIVERGENCE = "底背离"
|
||
BEARISH_DIVERGENCE = "顶背离"
|
||
MOMENTUM_INCREASING = "动能增强"
|
||
MOMENTUM_DECREASING = "动能减弱"
|
||
NEUTRAL = "中性"
|
||
|
||
|
||
class OscillatorZone(str, Enum):
|
||
"""超买超卖区域"""
|
||
OVERBOUGHT = "超买"
|
||
OVERSOLD = "超卖"
|
||
NEUTRAL = "中性"
|
||
|
||
|
||
class BollingerSignal(str, Enum):
|
||
"""布林带信号"""
|
||
ABOVE_UPPER = "触及上轨压力"
|
||
BELOW_LOWER = "触及下轨支撑"
|
||
ABOVE_MIDDLE = "中轨之上强势"
|
||
BELOW_MIDDLE = "中轨之下弱势"
|
||
SQUEEZE = "布林带收窄变盘在即"
|
||
EXPANSION = "布林带扩张趋势加速"
|
||
|
||
|
||
class VolumeSignal(str, Enum):
|
||
"""量能信号"""
|
||
VOLUME_SURGE = "放量"
|
||
VOLUME_SHRINK = "缩量"
|
||
VOLUME_PRICE_DIVERGENCE = "量价背离"
|
||
ACCUMULATION = "主力吸筹"
|
||
DISTRIBUTION = "主力出货"
|
||
NORMAL = "正常"
|
||
|
||
|
||
class BreakoutSignal(str, Enum):
|
||
"""突破信号"""
|
||
NEW_HIGH_BREAKOUT = "突破新高"
|
||
NEW_LOW_BREAKDOWN = "跌破新低"
|
||
RESISTANCE_TEST = "测试压力位"
|
||
SUPPORT_TEST = "测试支撑位"
|
||
NO_SIGNAL = "无信号"
|
||
|
||
|
||
# ==================== 数据类定义 ====================
|
||
|
||
@dataclass
|
||
class MACDResult:
|
||
"""MACD计算结果"""
|
||
signal: MACDSignal
|
||
dif: float
|
||
dea: float
|
||
macd_bar: float
|
||
trend_strength: str # 弱/中/强
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class OscillatorResult:
|
||
"""超买超卖指标结果"""
|
||
zone: OscillatorZone
|
||
rsi_value: float
|
||
kdj_k: float
|
||
kdj_d: float
|
||
kdj_j: float
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class BollingerResult:
|
||
"""布林带分析结果"""
|
||
signal: BollingerSignal
|
||
upper: float
|
||
middle: float
|
||
lower: float
|
||
bandwidth: float # 带宽百分比
|
||
position: str # 价格在通道中的位置描述
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class ATRResult:
|
||
"""ATR计算结果"""
|
||
atr: float
|
||
atr_percent: float # ATR占价格的百分比
|
||
suggested_stop_loss: float # 建议止损价
|
||
suggested_stop_loss_pct: float # 建议止损幅度
|
||
volatility_level: str # 低/中/高
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class VolumeAnalysisResult:
|
||
"""成交量分析结果"""
|
||
signal: VolumeSignal
|
||
turnover_rate: float
|
||
volume_ratio: float # 相对于MA5的量比
|
||
obv_trend: str # OBV趋势
|
||
heat_level: str # 冷门/正常/活跃/火热/极热
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class BreakoutResult:
|
||
"""突破信号结果"""
|
||
signal: BreakoutSignal
|
||
high_20d: float
|
||
low_20d: float
|
||
high_60d: float
|
||
low_60d: float
|
||
distance_to_high: float # 距离20日高点的百分比
|
||
distance_to_low: float # 距离20日低点的百分比
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class RiskMetricsResult:
|
||
"""风险指标结果"""
|
||
max_drawdown: float # 最大回撤
|
||
max_drawdown_period: str # 最大回撤发生期间
|
||
sharpe_ratio: float # 夏普比率
|
||
volatility: float # 波动率
|
||
risk_level: str # 低风险/中风险/高风险
|
||
description: str
|
||
|
||
|
||
@dataclass
|
||
class ValuationResult:
|
||
"""估值分析结果"""
|
||
pe_current: float
|
||
pe_percentile: float # PE历史百分位
|
||
pb_current: Optional[float]
|
||
peg_ratio: Optional[float]
|
||
valuation_level: str # 低估/合理/高估
|
||
description: str
|
||
|
||
|
||
# ==================== 核心计算函数 ====================
|
||
|
||
def _calc_ema(data: np.ndarray, period: int) -> np.ndarray:
|
||
"""计算EMA指数移动平均"""
|
||
ema = np.zeros_like(data)
|
||
ema[0] = data[0]
|
||
multiplier = 2 / (period + 1)
|
||
for i in range(1, len(data)):
|
||
ema[i] = (data[i] - ema[i-1]) * multiplier + ema[i-1]
|
||
return ema
|
||
|
||
|
||
def _calc_sma(data: np.ndarray, period: int) -> np.ndarray:
|
||
"""计算SMA简单移动平均"""
|
||
return pd.Series(data).rolling(window=period, min_periods=1).mean().values
|
||
|
||
|
||
def _calc_std(data: np.ndarray, period: int) -> np.ndarray:
|
||
"""计算滚动标准差"""
|
||
return pd.Series(data).rolling(window=period, min_periods=1).std().values
|
||
|
||
|
||
def _calc_rsi(close: np.ndarray, period: int = 14) -> np.ndarray:
|
||
"""计算RSI相对强弱指标"""
|
||
delta = np.diff(close)
|
||
gain = np.where(delta > 0, delta, 0)
|
||
loss = np.where(delta < 0, -delta, 0)
|
||
|
||
avg_gain = pd.Series(gain).rolling(window=period, min_periods=1).mean().values
|
||
avg_loss = pd.Series(loss).rolling(window=period, min_periods=1).mean().values
|
||
|
||
rs = np.where(avg_loss != 0, avg_gain / avg_loss, 100)
|
||
rsi = 100 - (100 / (1 + rs))
|
||
|
||
# 补齐第一个值
|
||
return np.insert(rsi, 0, 50)
|
||
|
||
|
||
def _calc_kdj(high: np.ndarray, low: np.ndarray, close: np.ndarray,
|
||
n: int = 9, m1: int = 3, m2: int = 3) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""计算KDJ指标"""
|
||
length = len(close)
|
||
rsv = np.zeros(length)
|
||
k = np.zeros(length)
|
||
d = np.zeros(length)
|
||
j = np.zeros(length)
|
||
|
||
for i in range(length):
|
||
start = max(0, i - n + 1)
|
||
high_n = np.max(high[start:i+1])
|
||
low_n = np.min(low[start:i+1])
|
||
|
||
if high_n != low_n:
|
||
rsv[i] = (close[i] - low_n) / (high_n - low_n) * 100
|
||
else:
|
||
rsv[i] = 50
|
||
|
||
if i == 0:
|
||
k[i] = 50
|
||
d[i] = 50
|
||
else:
|
||
k[i] = (m1 - 1) / m1 * k[i-1] + 1 / m1 * rsv[i]
|
||
d[i] = (m2 - 1) / m2 * d[i-1] + 1 / m2 * k[i]
|
||
|
||
j[i] = 3 * k[i] - 2 * d[i]
|
||
|
||
return k, d, j
|
||
|
||
|
||
def _calc_macd(close: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""计算MACD指标"""
|
||
ema_fast = _calc_ema(close, fast)
|
||
ema_slow = _calc_ema(close, slow)
|
||
dif = ema_fast - ema_slow
|
||
dea = _calc_ema(dif, signal)
|
||
macd_bar = 2 * (dif - dea)
|
||
return dif, dea, macd_bar
|
||
|
||
|
||
def _calc_bollinger(close: np.ndarray, period: int = 20, std_dev: float = 2) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""计算布林带"""
|
||
middle = _calc_sma(close, period)
|
||
std = _calc_std(close, period)
|
||
upper = middle + std_dev * std
|
||
lower = middle - std_dev * std
|
||
return upper, middle, lower
|
||
|
||
|
||
def _calc_atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> np.ndarray:
|
||
"""计算ATR真实波幅"""
|
||
tr = np.zeros(len(close))
|
||
tr[0] = high[0] - low[0]
|
||
|
||
for i in range(1, len(close)):
|
||
tr[i] = max(
|
||
high[i] - low[i],
|
||
abs(high[i] - close[i-1]),
|
||
abs(low[i] - close[i-1])
|
||
)
|
||
|
||
atr = _calc_sma(tr, period)
|
||
return atr
|
||
|
||
|
||
def _calc_obv(close: np.ndarray, volume: np.ndarray) -> np.ndarray:
|
||
"""计算OBV能量潮"""
|
||
obv = np.zeros(len(close))
|
||
obv[0] = volume[0]
|
||
|
||
for i in range(1, len(close)):
|
||
if close[i] > close[i-1]:
|
||
obv[i] = obv[i-1] + volume[i]
|
||
elif close[i] < close[i-1]:
|
||
obv[i] = obv[i-1] - volume[i]
|
||
else:
|
||
obv[i] = obv[i-1]
|
||
|
||
return obv
|
||
|
||
|
||
def _detect_divergence(price: np.ndarray, indicator: np.ndarray, lookback: int = 20) -> Optional[str]:
|
||
"""检测背离"""
|
||
if len(price) < lookback:
|
||
return None
|
||
|
||
recent_price = price[-lookback:]
|
||
recent_indicator = indicator[-lookback:]
|
||
|
||
# 找到价格的高点和低点
|
||
price_high_idx = np.argmax(recent_price)
|
||
price_low_idx = np.argmin(recent_price)
|
||
|
||
# 检查顶背离:价格创新高但指标没创新高
|
||
if price_high_idx > lookback // 2: # 高点在后半段
|
||
prev_high = np.max(recent_price[:lookback//2])
|
||
if recent_price[price_high_idx] > prev_high:
|
||
prev_indicator_high = np.max(recent_indicator[:lookback//2])
|
||
if recent_indicator[price_high_idx] < prev_indicator_high:
|
||
return "bearish_divergence"
|
||
|
||
# 检查底背离:价格创新低但指标没创新低
|
||
if price_low_idx > lookback // 2: # 低点在后半段
|
||
prev_low = np.min(recent_price[:lookback//2])
|
||
if recent_price[price_low_idx] < prev_low:
|
||
prev_indicator_low = np.min(recent_indicator[:lookback//2])
|
||
if recent_indicator[price_low_idx] > prev_indicator_low:
|
||
return "bullish_divergence"
|
||
|
||
return None
|
||
|
||
|
||
# ==================== MCP 工具函数 ====================
|
||
|
||
async def get_macd_signal(code: str, days: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
获取MACD趋势判定信号
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数,默认60天
|
||
|
||
Returns:
|
||
MACD信号分析结果
|
||
"""
|
||
try:
|
||
# 获取日线数据
|
||
trade_data = await db.get_stock_trade_data(code, limit=days + 30)
|
||
|
||
if len(trade_data) < 35:
|
||
return {"success": False, "error": "数据不足,需要至少35个交易日"}
|
||
|
||
# 按日期排序(从旧到新)
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算MACD
|
||
dif, dea, macd_bar = _calc_macd(close)
|
||
|
||
# 获取最新值
|
||
current_dif = dif[-1]
|
||
current_dea = dea[-1]
|
||
current_bar = macd_bar[-1]
|
||
prev_bar = macd_bar[-2] if len(macd_bar) > 1 else 0
|
||
|
||
# 判断信号
|
||
signal = MACDSignal.NEUTRAL
|
||
|
||
# 金叉/死叉判断
|
||
if len(dif) >= 2:
|
||
if dif[-1] > dea[-1] and dif[-2] <= dea[-2]:
|
||
signal = MACDSignal.GOLDEN_CROSS
|
||
elif dif[-1] < dea[-1] and dif[-2] >= dea[-2]:
|
||
signal = MACDSignal.DEATH_CROSS
|
||
|
||
# 动能判断
|
||
if signal == MACDSignal.NEUTRAL:
|
||
if current_bar > 0 and current_bar > prev_bar:
|
||
signal = MACDSignal.MOMENTUM_INCREASING
|
||
elif current_bar < 0 and current_bar < prev_bar:
|
||
signal = MACDSignal.MOMENTUM_DECREASING
|
||
|
||
# 背离检测
|
||
divergence = _detect_divergence(close, dif, lookback=20)
|
||
if divergence == "bullish_divergence":
|
||
signal = MACDSignal.BULLISH_DIVERGENCE
|
||
elif divergence == "bearish_divergence":
|
||
signal = MACDSignal.BEARISH_DIVERGENCE
|
||
|
||
# 趋势强度
|
||
bar_abs = abs(current_bar)
|
||
if bar_abs < 0.5:
|
||
trend_strength = "弱"
|
||
elif bar_abs < 1.5:
|
||
trend_strength = "中"
|
||
else:
|
||
trend_strength = "强"
|
||
|
||
# 生成描述
|
||
descriptions = {
|
||
MACDSignal.GOLDEN_CROSS: f"MACD金叉形成,DIF上穿DEA,短期看涨信号,动能{trend_strength}",
|
||
MACDSignal.DEATH_CROSS: f"MACD死叉形成,DIF下穿DEA,短期看跌信号,动能{trend_strength}",
|
||
MACDSignal.BULLISH_DIVERGENCE: "出现底背离,股价创新低但MACD没创新低,可能反转向上",
|
||
MACDSignal.BEARISH_DIVERGENCE: "出现顶背离,股价创新高但MACD没创新高,上涨动能衰竭",
|
||
MACDSignal.MOMENTUM_INCREASING: f"红柱放大,上涨动能增强,趋势强度:{trend_strength}",
|
||
MACDSignal.MOMENTUM_DECREASING: f"绿柱放大,下跌动能增强,趋势强度:{trend_strength}",
|
||
MACDSignal.NEUTRAL: "MACD处于中性状态,无明显信号",
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"signal": signal.value,
|
||
"dif": round(current_dif, 4),
|
||
"dea": round(current_dea, 4),
|
||
"macd_bar": round(current_bar, 4),
|
||
"trend_strength": trend_strength,
|
||
"description": descriptions[signal],
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[MACD] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def check_oscillator_status(code: str, days: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
检查KDJ/RSI超买超卖状态
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
超买超卖状态分析
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
high = np.array([float(d['high_price']) for d in trade_data])
|
||
low = np.array([float(d['low_price']) for d in trade_data])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算RSI
|
||
rsi = _calc_rsi(close, 14)
|
||
current_rsi = rsi[-1]
|
||
|
||
# 计算KDJ
|
||
k, d, j = _calc_kdj(high, low, close)
|
||
current_k = k[-1]
|
||
current_d = d[-1]
|
||
current_j = j[-1]
|
||
|
||
# 判断区域
|
||
zone = OscillatorZone.NEUTRAL
|
||
|
||
# RSI超买超卖判断
|
||
if current_rsi > 80 or current_j > 100:
|
||
zone = OscillatorZone.OVERBOUGHT
|
||
elif current_rsi < 20 or current_j < 0:
|
||
zone = OscillatorZone.OVERSOLD
|
||
|
||
# 生成描述
|
||
if zone == OscillatorZone.OVERBOUGHT:
|
||
desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超买区域,短期回调风险较大"
|
||
elif zone == OscillatorZone.OVERSOLD:
|
||
desc = f"RSI={current_rsi:.1f},KDJ的J值={current_j:.1f},处于超卖区域,可能存在反弹机会"
|
||
else:
|
||
desc = f"RSI={current_rsi:.1f},KDJ(K={current_k:.1f},D={current_d:.1f},J={current_j:.1f}),处于中性区域"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"zone": zone.value,
|
||
"rsi_value": round(current_rsi, 2),
|
||
"kdj_k": round(current_k, 2),
|
||
"kdj_d": round(current_d, 2),
|
||
"kdj_j": round(current_j, 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Oscillator] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def analyze_bollinger_bands(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]:
|
||
"""
|
||
分析布林带通道
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
period: 布林带周期,默认20
|
||
|
||
Returns:
|
||
布林带分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < period + 5:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算布林带
|
||
upper, middle, lower = _calc_bollinger(close, period)
|
||
|
||
current_price = close[-1]
|
||
current_upper = upper[-1]
|
||
current_middle = middle[-1]
|
||
current_lower = lower[-1]
|
||
|
||
# 计算带宽
|
||
bandwidth = (current_upper - current_lower) / current_middle * 100
|
||
prev_bandwidth = (upper[-5] - lower[-5]) / middle[-5] * 100 if len(upper) > 5 else bandwidth
|
||
|
||
# 判断信号
|
||
signal = BollingerSignal.ABOVE_MIDDLE if current_price > current_middle else BollingerSignal.BELOW_MIDDLE
|
||
|
||
# 检查是否触及上下轨
|
||
if current_price >= current_upper * 0.99:
|
||
signal = BollingerSignal.ABOVE_UPPER
|
||
elif current_price <= current_lower * 1.01:
|
||
signal = BollingerSignal.BELOW_LOWER
|
||
|
||
# 检查布林带收窄/扩张
|
||
if bandwidth < prev_bandwidth * 0.7:
|
||
signal = BollingerSignal.SQUEEZE
|
||
elif bandwidth > prev_bandwidth * 1.3:
|
||
signal = BollingerSignal.EXPANSION
|
||
|
||
# 价格位置描述
|
||
position_pct = (current_price - current_lower) / (current_upper - current_lower) * 100
|
||
position = f"价格位于布林带{position_pct:.0f}%位置"
|
||
|
||
# 生成描述
|
||
descriptions = {
|
||
BollingerSignal.ABOVE_UPPER: f"股价触及布林带上轨({current_upper:.2f}),面临短期压力",
|
||
BollingerSignal.BELOW_LOWER: f"股价触及布林带下轨({current_lower:.2f}),可能存在支撑",
|
||
BollingerSignal.ABOVE_MIDDLE: f"股价在中轨({current_middle:.2f})之上运行,整体偏强",
|
||
BollingerSignal.BELOW_MIDDLE: f"股价在中轨({current_middle:.2f})之下运行,整体偏弱",
|
||
BollingerSignal.SQUEEZE: f"布林带收窄(带宽{bandwidth:.1f}%),变盘在即,关注突破方向",
|
||
BollingerSignal.EXPANSION: f"布林带扩张(带宽{bandwidth:.1f}%),趋势加速中",
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"signal": signal.value,
|
||
"upper": round(current_upper, 2),
|
||
"middle": round(current_middle, 2),
|
||
"lower": round(current_lower, 2),
|
||
"bandwidth": round(bandwidth, 2),
|
||
"position": position,
|
||
"description": descriptions[signal],
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Bollinger] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_stop_loss_atr(code: str, days: int = 30, atr_multiplier: float = 2.0) -> Dict[str, Any]:
|
||
"""
|
||
使用ATR计算止损位
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
atr_multiplier: ATR倍数,默认2倍
|
||
|
||
Returns:
|
||
ATR止损建议
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 15:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
high = np.array([float(d['high_price']) for d in trade_data])
|
||
low = np.array([float(d['low_price']) for d in trade_data])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算ATR
|
||
atr = _calc_atr(high, low, close, 14)
|
||
current_atr = atr[-1]
|
||
current_price = close[-1]
|
||
|
||
# 计算ATR百分比
|
||
atr_percent = current_atr / current_price * 100
|
||
|
||
# 计算止损价
|
||
stop_loss = current_price - atr_multiplier * current_atr
|
||
stop_loss_pct = (current_price - stop_loss) / current_price * 100
|
||
|
||
# 波动级别
|
||
if atr_percent < 2:
|
||
volatility_level = "低"
|
||
elif atr_percent < 4:
|
||
volatility_level = "中"
|
||
else:
|
||
volatility_level = "高"
|
||
|
||
desc = (f"ATR={current_atr:.2f}(占价格{atr_percent:.1f}%),波动性{volatility_level}。"
|
||
f"建议止损位:{stop_loss:.2f}({atr_multiplier}倍ATR),止损幅度{stop_loss_pct:.1f}%")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"atr": round(current_atr, 3),
|
||
"atr_percent": round(atr_percent, 2),
|
||
"suggested_stop_loss": round(stop_loss, 2),
|
||
"suggested_stop_loss_pct": round(stop_loss_pct, 2),
|
||
"volatility_level": volatility_level,
|
||
"current_price": round(current_price, 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ATR] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def analyze_market_heat(code: str, days: int = 30) -> Dict[str, Any]:
|
||
"""
|
||
分析换手率活跃度和量能
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
市场热度分析
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 10:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
volume = np.array([float(d['volume']) for d in trade_data])
|
||
turnover_rates = [float(d.get('turnover_rate', 0) or 0) for d in trade_data]
|
||
|
||
current_turnover = turnover_rates[-1]
|
||
avg_turnover_5 = np.mean(turnover_rates[-5:])
|
||
|
||
# 计算量比
|
||
volume_ma5 = _calc_sma(volume, 5)
|
||
volume_ratio = volume[-1] / volume_ma5[-1] if volume_ma5[-1] > 0 else 1
|
||
|
||
# 计算OBV
|
||
obv = _calc_obv(close, volume)
|
||
obv_ma5 = _calc_sma(obv, 5)
|
||
obv_trend = "上升" if obv[-1] > obv_ma5[-1] else "下降"
|
||
|
||
# 换手率分级
|
||
if current_turnover < 1:
|
||
heat_level = "冷门"
|
||
elif current_turnover < 3:
|
||
heat_level = "正常"
|
||
elif current_turnover < 7:
|
||
heat_level = "活跃"
|
||
elif current_turnover < 15:
|
||
heat_level = "火热"
|
||
else:
|
||
heat_level = "极热"
|
||
|
||
# 判断信号
|
||
signal = VolumeSignal.NORMAL
|
||
|
||
if volume_ratio > 2:
|
||
signal = VolumeSignal.VOLUME_SURGE
|
||
elif volume_ratio < 0.5:
|
||
signal = VolumeSignal.VOLUME_SHRINK
|
||
|
||
# 检查量价背离
|
||
price_up = close[-1] > close[-5] if len(close) > 5 else False
|
||
volume_down = volume[-1] < volume[-5] if len(volume) > 5 else False
|
||
|
||
if price_up and volume_down:
|
||
signal = VolumeSignal.VOLUME_PRICE_DIVERGENCE
|
||
|
||
# OBV判断主力动向
|
||
if obv_trend == "上升" and close[-1] <= close[-5]:
|
||
signal = VolumeSignal.ACCUMULATION
|
||
elif obv_trend == "下降" and close[-1] >= close[-5]:
|
||
signal = VolumeSignal.DISTRIBUTION
|
||
|
||
desc = (f"换手率{current_turnover:.2f}%({heat_level}),量比{volume_ratio:.2f},"
|
||
f"OBV趋势{obv_trend}。{signal.value}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"signal": signal.value,
|
||
"turnover_rate": round(current_turnover, 2),
|
||
"avg_turnover_5d": round(avg_turnover_5, 2),
|
||
"volume_ratio": round(volume_ratio, 2),
|
||
"obv_trend": obv_trend,
|
||
"heat_level": heat_level,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[MarketHeat] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def check_new_high_breakout(code: str, days: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
检查唐奇安通道突破(新高/新低突破)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
突破信号分析
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days + 10)
|
||
|
||
if len(trade_data) < 25:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
high = np.array([float(d['high_price']) for d in trade_data])
|
||
low = np.array([float(d['low_price']) for d in trade_data])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
current_price = close[-1]
|
||
|
||
# 计算20日和60日高低点
|
||
high_20d = np.max(high[-21:-1]) if len(high) > 21 else np.max(high[:-1])
|
||
low_20d = np.min(low[-21:-1]) if len(low) > 21 else np.min(low[:-1])
|
||
high_60d = np.max(high[-61:-1]) if len(high) > 61 else np.max(high[:-1])
|
||
low_60d = np.min(low[-61:-1]) if len(low) > 61 else np.min(low[:-1])
|
||
|
||
# 判断突破信号
|
||
signal = BreakoutSignal.NO_SIGNAL
|
||
|
||
if current_price > high_20d:
|
||
signal = BreakoutSignal.NEW_HIGH_BREAKOUT
|
||
elif current_price < low_20d:
|
||
signal = BreakoutSignal.NEW_LOW_BREAKDOWN
|
||
elif current_price > high_20d * 0.97:
|
||
signal = BreakoutSignal.RESISTANCE_TEST
|
||
elif current_price < low_20d * 1.03:
|
||
signal = BreakoutSignal.SUPPORT_TEST
|
||
|
||
# 计算距离
|
||
distance_to_high = (high_20d - current_price) / current_price * 100
|
||
distance_to_low = (current_price - low_20d) / current_price * 100
|
||
|
||
descriptions = {
|
||
BreakoutSignal.NEW_HIGH_BREAKOUT: f"突破20日新高({high_20d:.2f}),海龟交易法则买入信号触发",
|
||
BreakoutSignal.NEW_LOW_BREAKDOWN: f"跌破20日新低({low_20d:.2f}),海龟交易法则卖出信号触发",
|
||
BreakoutSignal.RESISTANCE_TEST: f"接近20日高点({high_20d:.2f}),距离{distance_to_high:.1f}%,关注能否突破",
|
||
BreakoutSignal.SUPPORT_TEST: f"接近20日低点({low_20d:.2f}),距离{distance_to_low:.1f}%,关注能否守住",
|
||
BreakoutSignal.NO_SIGNAL: f"价格在20日通道内运行({low_20d:.2f} - {high_20d:.2f})",
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"signal": signal.value,
|
||
"high_20d": round(high_20d, 2),
|
||
"low_20d": round(low_20d, 2),
|
||
"high_60d": round(high_60d, 2),
|
||
"low_60d": round(low_60d, 2),
|
||
"current_price": round(current_price, 2),
|
||
"distance_to_high_pct": round(distance_to_high, 2),
|
||
"distance_to_low_pct": round(distance_to_low, 2),
|
||
"description": descriptions[signal],
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Breakout] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_max_drawdown(code: str, days: int = 250) -> Dict[str, Any]:
|
||
"""
|
||
计算最大回撤
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数,默认250天(约一年)
|
||
|
||
Returns:
|
||
最大回撤分析
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
dates = [d['TRADEDATE'] for d in trade_data]
|
||
|
||
# 计算累计最大值
|
||
running_max = np.maximum.accumulate(close)
|
||
|
||
# 计算回撤
|
||
drawdown = (running_max - close) / running_max * 100
|
||
|
||
# 找到最大回撤
|
||
max_dd = np.max(drawdown)
|
||
max_dd_idx = np.argmax(drawdown)
|
||
|
||
# 找到回撤开始的位置(峰值)
|
||
peak_idx = np.argmax(close[:max_dd_idx + 1]) if max_dd_idx > 0 else 0
|
||
|
||
# 回撤期间
|
||
period_start = dates[peak_idx] if peak_idx < len(dates) else dates[0]
|
||
period_end = dates[max_dd_idx] if max_dd_idx < len(dates) else dates[-1]
|
||
|
||
# 计算波动率(年化)
|
||
returns = np.diff(close) / close[:-1]
|
||
volatility = np.std(returns) * np.sqrt(252) * 100
|
||
|
||
# 计算夏普比率(假设无风险利率2%)
|
||
avg_return = np.mean(returns) * 252 * 100
|
||
sharpe_ratio = (avg_return - 2) / volatility if volatility > 0 else 0
|
||
|
||
# 风险等级
|
||
if max_dd < 10:
|
||
risk_level = "低风险"
|
||
elif max_dd < 25:
|
||
risk_level = "中风险"
|
||
else:
|
||
risk_level = "高风险"
|
||
|
||
desc = (f"最大回撤{max_dd:.1f}%(从{period_start}到{period_end}),"
|
||
f"年化波动率{volatility:.1f}%,夏普比率{sharpe_ratio:.2f},{risk_level}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"max_drawdown": round(max_dd, 2),
|
||
"max_drawdown_period": f"{period_start} 至 {period_end}",
|
||
"sharpe_ratio": round(sharpe_ratio, 2),
|
||
"volatility": round(volatility, 2),
|
||
"risk_level": risk_level,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": len(trade_data)
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[MaxDrawdown] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def check_valuation_rank(code: str, years: int = 3) -> Dict[str, Any]:
|
||
"""
|
||
检查历史PE/PB百分位估值
|
||
|
||
Args:
|
||
code: 股票代码
|
||
years: 历史年数,默认3年
|
||
|
||
Returns:
|
||
估值分析结果
|
||
"""
|
||
try:
|
||
days = years * 250
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 60:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
# 提取PE数据
|
||
pe_values = [float(d.get('pe_ratio', 0) or 0) for d in trade_data if d.get('pe_ratio')]
|
||
pe_values = [pe for pe in pe_values if 0 < pe < 1000] # 过滤异常值
|
||
|
||
if len(pe_values) < 30:
|
||
return {"success": False, "error": "PE数据不足"}
|
||
|
||
current_pe = pe_values[-1]
|
||
|
||
# 计算百分位
|
||
pe_percentile = (np.sum(np.array(pe_values) < current_pe) / len(pe_values)) * 100
|
||
|
||
# 获取财务数据计算PEG
|
||
peg_ratio = None
|
||
try:
|
||
financial_data = await db.get_stock_financial_index(code, limit=2)
|
||
if len(financial_data) >= 2:
|
||
growth_rate = financial_data[0].get('profit_growth', 0) or 0
|
||
if growth_rate > 0:
|
||
peg_ratio = current_pe / growth_rate
|
||
except:
|
||
pass
|
||
|
||
# 估值水平判断
|
||
if pe_percentile < 20:
|
||
valuation_level = "低估"
|
||
elif pe_percentile < 50:
|
||
valuation_level = "较低"
|
||
elif pe_percentile < 80:
|
||
valuation_level = "合理"
|
||
else:
|
||
valuation_level = "高估"
|
||
|
||
# PEG修正
|
||
peg_desc = ""
|
||
if peg_ratio is not None:
|
||
if peg_ratio < 1:
|
||
peg_desc = f",PEG={peg_ratio:.2f}<1(成长性强)"
|
||
elif peg_ratio < 2:
|
||
peg_desc = f",PEG={peg_ratio:.2f}(合理)"
|
||
else:
|
||
peg_desc = f",PEG={peg_ratio:.2f}(偏贵)"
|
||
|
||
desc = (f"当前PE={current_pe:.1f},处于近{years}年{pe_percentile:.0f}%分位,"
|
||
f"估值{valuation_level}{peg_desc}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"pe_current": round(current_pe, 2),
|
||
"pe_percentile": round(pe_percentile, 1),
|
||
"pe_min": round(min(pe_values), 2),
|
||
"pe_max": round(max(pe_values), 2),
|
||
"pe_median": round(np.median(pe_values), 2),
|
||
"peg_ratio": round(peg_ratio, 2) if peg_ratio else None,
|
||
"valuation_level": valuation_level,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_years": years
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Valuation] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_price_zscore(code: str, period: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
计算价格Z-Score(乖离率标准化)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
period: 均线周期,默认60日
|
||
|
||
Returns:
|
||
Z-Score分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=period + 10)
|
||
|
||
if len(trade_data) < period:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算均线和标准差
|
||
ma = _calc_sma(close, period)
|
||
std = _calc_std(close, period)
|
||
|
||
current_price = close[-1]
|
||
current_ma = ma[-1]
|
||
current_std = std[-1]
|
||
|
||
# 计算Z-Score
|
||
zscore = (current_price - current_ma) / current_std if current_std > 0 else 0
|
||
|
||
# 计算乖离率
|
||
bias = (current_price - current_ma) / current_ma * 100
|
||
|
||
# 判断状态
|
||
if zscore > 2:
|
||
status = "极度偏高"
|
||
probability = "历史上此位置回落概率约95%"
|
||
elif zscore > 1:
|
||
status = "偏高"
|
||
probability = "历史上此位置回落概率约68%"
|
||
elif zscore < -2:
|
||
status = "极度偏低"
|
||
probability = "历史上此位置反弹概率约95%"
|
||
elif zscore < -1:
|
||
status = "偏低"
|
||
probability = "历史上此位置反弹概率约68%"
|
||
else:
|
||
status = "正常"
|
||
probability = "价格在正常波动范围内"
|
||
|
||
desc = f"Z-Score={zscore:.2f},乖离率{bias:+.1f}%,{status}。{probability}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"zscore": round(zscore, 3),
|
||
"bias": round(bias, 2),
|
||
"ma_period": period,
|
||
"current_ma": round(current_ma, 2),
|
||
"current_price": round(current_price, 2),
|
||
"status": status,
|
||
"mean_reversion_hint": probability,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ZScore] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
# ==================== 分钟级高阶算子(基于ClickHouse) ====================
|
||
|
||
async def calc_market_profile_vpoc(code: str, date: str = None) -> Dict[str, Any]:
|
||
"""
|
||
计算市场轮廓VPOC(成交量最大的价格档位)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
date: 日期,格式 YYYY-MM-DD(可选,默认使用最近交易日)
|
||
|
||
Returns:
|
||
VPOC分析结果
|
||
"""
|
||
try:
|
||
# 获取分钟数据(如果未指定日期,获取最近的数据)
|
||
if date:
|
||
minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500)
|
||
else:
|
||
# 获取最近的分钟数据
|
||
minute_data = await db.get_stock_minute_data(code, limit=500)
|
||
|
||
if len(minute_data) < 10:
|
||
return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"}
|
||
|
||
# 从数据中提取实际日期
|
||
if not date and minute_data:
|
||
date = minute_data[0].get('timestamp', '')[:10]
|
||
|
||
# 按价格区间统计成交量
|
||
prices = np.array([float(d['close']) for d in minute_data])
|
||
volumes = np.array([float(d['volume']) for d in minute_data])
|
||
|
||
# 创建价格档位(将价格四舍五入到0.01)
|
||
price_bins = np.round(prices, 2)
|
||
|
||
# 统计每个价格档位的成交量
|
||
unique_prices, indices = np.unique(price_bins, return_inverse=True)
|
||
volume_by_price = np.zeros(len(unique_prices))
|
||
for i, idx in enumerate(indices):
|
||
volume_by_price[idx] += volumes[i]
|
||
|
||
# 找到VPOC
|
||
vpoc_idx = np.argmax(volume_by_price)
|
||
vpoc_price = unique_prices[vpoc_idx]
|
||
vpoc_volume = volume_by_price[vpoc_idx]
|
||
|
||
# 当前价格
|
||
current_price = prices[-1]
|
||
|
||
# 判断位置关系
|
||
if current_price > vpoc_price * 1.01:
|
||
position = "价格在VPOC之上,短期支撑位"
|
||
elif current_price < vpoc_price * 0.99:
|
||
position = "价格在VPOC之下,短期压力位"
|
||
else:
|
||
position = "价格在VPOC附近,成交密集区"
|
||
|
||
desc = f"VPOC价格={vpoc_price:.2f},成交量占比{vpoc_volume/np.sum(volumes)*100:.1f}%。{position}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"vpoc_price": round(vpoc_price, 2),
|
||
"vpoc_volume": int(vpoc_volume),
|
||
"vpoc_volume_pct": round(vpoc_volume / np.sum(volumes) * 100, 1),
|
||
"current_price": round(current_price, 2),
|
||
"position": position,
|
||
"description": desc,
|
||
"code": code,
|
||
"date": date
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[VPOC] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_realized_volatility(code: str, date: str = None) -> Dict[str, Any]:
|
||
"""
|
||
计算已实现波动率(Realized Volatility)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
date: 日期(可选,默认使用最近交易日)
|
||
|
||
Returns:
|
||
RV分析结果
|
||
"""
|
||
try:
|
||
# 获取分钟数据
|
||
if date:
|
||
minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500)
|
||
else:
|
||
minute_data = await db.get_stock_minute_data(code, limit=500)
|
||
|
||
if len(minute_data) < 30:
|
||
return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"}
|
||
|
||
# 从数据中提取实际日期
|
||
if not date and minute_data:
|
||
date = minute_data[0].get('timestamp', '')[:10]
|
||
|
||
# 按时间排序
|
||
minute_data = sorted(minute_data, key=lambda x: x['timestamp'])
|
||
|
||
close = np.array([float(d['close']) for d in minute_data])
|
||
|
||
# 计算分钟收益率
|
||
returns = np.diff(np.log(close))
|
||
|
||
# 计算已实现波动率(日内)
|
||
rv_intraday = np.sqrt(np.sum(returns ** 2)) * 100
|
||
|
||
# 年化波动率(假设一天240分钟,一年250天)
|
||
rv_annual = rv_intraday * np.sqrt(250) * 100
|
||
|
||
# 波动率水平
|
||
if rv_intraday < 1:
|
||
vol_level = "低波动"
|
||
elif rv_intraday < 3:
|
||
vol_level = "中波动"
|
||
else:
|
||
vol_level = "高波动"
|
||
|
||
desc = f"日内已实现波动率{rv_intraday:.2f}%(年化约{rv_annual:.0f}%),{vol_level}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"rv_intraday": round(rv_intraday, 3),
|
||
"rv_annualized": round(rv_annual, 1),
|
||
"volatility_level": vol_level,
|
||
"description": desc,
|
||
"code": code,
|
||
"date": date
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[RV] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def analyze_buying_pressure(code: str, date: str = None) -> Dict[str, Any]:
|
||
"""
|
||
分析买卖压力失衡
|
||
|
||
Args:
|
||
code: 股票代码
|
||
date: 日期(可选,默认使用最近交易日)
|
||
|
||
Returns:
|
||
买卖压力分析
|
||
"""
|
||
try:
|
||
# 获取分钟数据
|
||
if date:
|
||
minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500)
|
||
else:
|
||
minute_data = await db.get_stock_minute_data(code, limit=500)
|
||
|
||
if len(minute_data) < 30:
|
||
return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"}
|
||
|
||
# 从数据中提取实际日期
|
||
if not date and minute_data:
|
||
date = minute_data[0].get('timestamp', '')[:10]
|
||
|
||
minute_data = sorted(minute_data, key=lambda x: x['timestamp'])
|
||
|
||
high = np.array([float(d['high']) for d in minute_data])
|
||
low = np.array([float(d['low']) for d in minute_data])
|
||
close = np.array([float(d['close']) for d in minute_data])
|
||
volume = np.array([float(d['volume']) for d in minute_data])
|
||
|
||
# 计算买卖压力指标
|
||
# (Close - Low) - (High - Close),正值表示买压,负值表示卖压
|
||
pressure = (close - low) - (high - close)
|
||
|
||
# 成交量加权
|
||
weighted_pressure = np.sum(pressure * volume) / np.sum(volume)
|
||
|
||
# 计算买压占比
|
||
total_range = high - low
|
||
buying_pct = np.where(total_range > 0, (close - low) / total_range, 0.5)
|
||
avg_buying_pct = np.mean(buying_pct) * 100
|
||
|
||
# 判断信号
|
||
if weighted_pressure > 0.1:
|
||
signal = "强买压"
|
||
desc = f"盘中主力抢筹明显,买方占优,收盘偏向日内高点"
|
||
elif weighted_pressure < -0.1:
|
||
signal = "强卖压"
|
||
desc = f"盘中抛压沉重,卖方占优,收盘偏向日内低点"
|
||
else:
|
||
signal = "均衡"
|
||
desc = f"买卖双方博弈均衡,无明显方向"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"pressure_signal": signal,
|
||
"weighted_pressure": round(weighted_pressure, 4),
|
||
"avg_buying_pct": round(avg_buying_pct, 1),
|
||
"description": desc,
|
||
"code": code,
|
||
"date": date
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[BuyingPressure] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def check_volume_price_divergence(code: str, days: int = 20) -> Dict[str, Any]:
|
||
"""
|
||
检测量价背离
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
量价背离分析
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 10:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
volume = np.array([float(d['volume']) for d in trade_data])
|
||
|
||
# 检查价格趋势
|
||
price_trend = "up" if close[-1] > close[-5] else "down"
|
||
price_change = (close[-1] - close[-5]) / close[-5] * 100
|
||
|
||
# 检查成交量趋势
|
||
vol_avg_recent = np.mean(volume[-5:])
|
||
vol_avg_prev = np.mean(volume[-10:-5]) if len(volume) >= 10 else np.mean(volume[:-5])
|
||
vol_trend = "up" if vol_avg_recent > vol_avg_prev else "down"
|
||
vol_change = (vol_avg_recent - vol_avg_prev) / vol_avg_prev * 100 if vol_avg_prev > 0 else 0
|
||
|
||
# 判断背离
|
||
has_divergence = False
|
||
divergence_type = None
|
||
|
||
if price_trend == "up" and vol_trend == "down":
|
||
has_divergence = True
|
||
divergence_type = "顶背离"
|
||
desc = f"股价上涨{price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,上涨动能不足,警惕回调"
|
||
elif price_trend == "down" and vol_trend == "down":
|
||
has_divergence = True
|
||
divergence_type = "底背离"
|
||
desc = f"股价下跌{-price_change:.1f}%但成交量萎缩{-vol_change:.1f}%,抛压减弱,可能企稳"
|
||
else:
|
||
desc = f"量价配合正常,价格{price_trend},成交量{vol_trend}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"has_divergence": has_divergence,
|
||
"divergence_type": divergence_type,
|
||
"price_change_5d": round(price_change, 2),
|
||
"volume_change_5d": round(vol_change, 2),
|
||
"signal": "量价背离" if has_divergence else "量价正常",
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[VolPriceDivergence] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def identify_candlestick_pattern(code: str, days: int = 10) -> Dict[str, Any]:
|
||
"""
|
||
识别K线组合形态
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
K线形态识别结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 3:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
patterns = []
|
||
|
||
# 获取最近3天的OHLC
|
||
for i in range(len(trade_data) - 2, len(trade_data)):
|
||
if i < 0:
|
||
continue
|
||
d = trade_data[i]
|
||
open_p = float(d['open_price'])
|
||
high = float(d['high_price'])
|
||
low = float(d['low_price'])
|
||
close = float(d['close_price'])
|
||
|
||
body = close - open_p
|
||
upper_shadow = high - max(open_p, close)
|
||
lower_shadow = min(open_p, close) - low
|
||
body_size = abs(body)
|
||
|
||
# 十字星
|
||
if body_size < (high - low) * 0.1:
|
||
patterns.append(("十字星", "犹豫信号,可能变盘"))
|
||
|
||
# 锤子线
|
||
if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5:
|
||
patterns.append(("锤子线", "底部反转信号"))
|
||
|
||
# 上吊线
|
||
if lower_shadow > body_size * 2 and upper_shadow < body_size * 0.5 and body < 0:
|
||
patterns.append(("上吊线", "顶部反转信号"))
|
||
|
||
# 检查多日形态(需要至少3天数据)
|
||
if len(trade_data) >= 3:
|
||
d1, d2, d3 = trade_data[-3], trade_data[-2], trade_data[-1]
|
||
|
||
close1 = float(d1['close_price'])
|
||
open1 = float(d1['open_price'])
|
||
close2 = float(d2['close_price'])
|
||
open2 = float(d2['open_price'])
|
||
close3 = float(d3['close_price'])
|
||
open3 = float(d3['open_price'])
|
||
|
||
# 红三兵
|
||
if (close1 > open1 and close2 > open2 and close3 > open3 and
|
||
close2 > close1 and close3 > close2):
|
||
patterns.append(("红三兵", "强势上涨信号"))
|
||
|
||
# 三只乌鸦
|
||
if (close1 < open1 and close2 < open2 and close3 < open3 and
|
||
close2 < close1 and close3 < close2):
|
||
patterns.append(("三只乌鸦", "强势下跌信号"))
|
||
|
||
# 早晨之星
|
||
body1 = abs(close1 - open1)
|
||
body2 = abs(close2 - open2)
|
||
body3 = abs(close3 - open3)
|
||
|
||
if (close1 < open1 and body2 < body1 * 0.3 and close3 > open3 and
|
||
close3 > (open1 + close1) / 2):
|
||
patterns.append(("早晨之星", "底部反转强信号"))
|
||
|
||
# 黄昏之星
|
||
if (close1 > open1 and body2 < body1 * 0.3 and close3 < open3 and
|
||
close3 < (open1 + close1) / 2):
|
||
patterns.append(("黄昏之星", "顶部反转强信号"))
|
||
|
||
# 生成描述
|
||
if patterns:
|
||
pattern_names = [f"{p[0]}({p[1]})" for p in patterns]
|
||
desc = f"识别到以下K线形态:" + "、".join(pattern_names)
|
||
else:
|
||
desc = "未识别到明显的K线形态"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"patterns": [{"name": p[0], "meaning": p[1]} for p in patterns],
|
||
"pattern_count": len(patterns),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[CandlestickPattern] 识别失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def find_price_gaps(code: str, days: int = 30) -> Dict[str, Any]:
|
||
"""
|
||
寻找跳空缺口
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
缺口分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 5:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
gaps = []
|
||
current_price = float(trade_data[-1]['close_price'])
|
||
|
||
for i in range(1, len(trade_data)):
|
||
prev = trade_data[i-1]
|
||
curr = trade_data[i]
|
||
|
||
prev_high = float(prev['high_price'])
|
||
prev_low = float(prev['low_price'])
|
||
curr_high = float(curr['high_price'])
|
||
curr_low = float(curr['low_price'])
|
||
|
||
# 上涨缺口
|
||
if curr_low > prev_high:
|
||
gap_size = (curr_low - prev_high) / prev_high * 100
|
||
is_filled = current_price <= curr_low
|
||
gaps.append({
|
||
"type": "上涨缺口",
|
||
"date": curr['TRADEDATE'],
|
||
"gap_low": prev_high,
|
||
"gap_high": curr_low,
|
||
"gap_size_pct": round(gap_size, 2),
|
||
"is_filled": is_filled
|
||
})
|
||
|
||
# 下跌缺口
|
||
if curr_high < prev_low:
|
||
gap_size = (prev_low - curr_high) / prev_low * 100
|
||
is_filled = current_price >= curr_high
|
||
gaps.append({
|
||
"type": "下跌缺口",
|
||
"date": curr['TRADEDATE'],
|
||
"gap_low": curr_high,
|
||
"gap_high": prev_low,
|
||
"gap_size_pct": round(gap_size, 2),
|
||
"is_filled": is_filled
|
||
})
|
||
|
||
# 只保留未回补的缺口
|
||
unfilled_gaps = [g for g in gaps if not g['is_filled']]
|
||
|
||
if unfilled_gaps:
|
||
gap_desc = [f"{g['type']}({g['date']},{g['gap_size_pct']}%)" for g in unfilled_gaps[:3]]
|
||
desc = f"发现{len(unfilled_gaps)}个未回补缺口:" + "、".join(gap_desc)
|
||
else:
|
||
desc = "近期无未回补缺口"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"total_gaps": len(gaps),
|
||
"unfilled_gaps": unfilled_gaps,
|
||
"unfilled_count": len(unfilled_gaps),
|
||
"description": desc,
|
||
"code": code
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[PriceGaps] 分析失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def get_comprehensive_analysis(code: str) -> Dict[str, Any]:
|
||
"""
|
||
综合技术分析(一次性返回多个指标)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
|
||
Returns:
|
||
综合分析结果
|
||
"""
|
||
try:
|
||
# 并行执行多个分析
|
||
results = await asyncio.gather(
|
||
get_macd_signal(code),
|
||
check_oscillator_status(code),
|
||
analyze_bollinger_bands(code),
|
||
analyze_market_heat(code),
|
||
check_new_high_breakout(code),
|
||
check_volume_price_divergence(code),
|
||
identify_candlestick_pattern(code),
|
||
return_exceptions=True
|
||
)
|
||
|
||
analysis = {}
|
||
indicators = [
|
||
("macd", "MACD趋势"),
|
||
("oscillator", "超买超卖"),
|
||
("bollinger", "布林带"),
|
||
("market_heat", "市场热度"),
|
||
("breakout", "突破信号"),
|
||
("volume_price", "量价分析"),
|
||
("candlestick", "K线形态")
|
||
]
|
||
|
||
signals = []
|
||
for i, (key, name) in enumerate(indicators):
|
||
if isinstance(results[i], dict) and results[i].get("success"):
|
||
analysis[key] = results[i]["data"]
|
||
signals.append(f"【{name}】{results[i]['data'].get('description', '')}")
|
||
|
||
# 生成综合评估
|
||
bullish_count = 0
|
||
bearish_count = 0
|
||
|
||
for result in results:
|
||
if isinstance(result, dict) and result.get("success"):
|
||
data = result.get("data", {})
|
||
signal = data.get("signal", "")
|
||
|
||
if any(word in signal for word in ["金叉", "底背离", "超卖", "支撑", "突破新高", "红三兵"]):
|
||
bullish_count += 1
|
||
elif any(word in signal for word in ["死叉", "顶背离", "超买", "压力", "跌破", "乌鸦"]):
|
||
bearish_count += 1
|
||
|
||
if bullish_count > bearish_count + 2:
|
||
overall = "多方占优,短期看涨"
|
||
elif bearish_count > bullish_count + 2:
|
||
overall = "空方占优,短期看跌"
|
||
else:
|
||
overall = "多空胶着,观望为主"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"code": code,
|
||
"analysis": analysis,
|
||
"signals_summary": signals,
|
||
"bullish_signals": bullish_count,
|
||
"bearish_signals": bearish_count,
|
||
"overall_assessment": overall
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[ComprehensiveAnalysis] 分析失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
# ==================== 新增因子(12个) ====================
|
||
|
||
async def calc_rsi_divergence(code: str, days: int = 60, rsi_period: int = 14) -> Dict[str, Any]:
|
||
"""
|
||
RSI背离检测(独立于MACD的RSI专项背离分析)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
rsi_period: RSI周期,默认14
|
||
|
||
Returns:
|
||
RSI背离分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < rsi_period + 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算RSI
|
||
rsi = _calc_rsi(close, rsi_period)
|
||
|
||
# 检测背离
|
||
divergence = _detect_divergence(close, rsi, lookback=20)
|
||
|
||
current_rsi = rsi[-1]
|
||
|
||
# 判断RSI位置
|
||
if current_rsi > 70:
|
||
rsi_zone = "超买区"
|
||
elif current_rsi < 30:
|
||
rsi_zone = "超卖区"
|
||
else:
|
||
rsi_zone = "中性区"
|
||
|
||
# 生成描述
|
||
if divergence == "bullish_divergence":
|
||
signal = "底背离"
|
||
desc = f"RSI底背离形成:股价创新低但RSI没有创新低,当前RSI={current_rsi:.1f},位于{rsi_zone},反弹概率增大"
|
||
elif divergence == "bearish_divergence":
|
||
signal = "顶背离"
|
||
desc = f"RSI顶背离形成:股价创新高但RSI没有创新高,当前RSI={current_rsi:.1f},位于{rsi_zone},回调风险加大"
|
||
else:
|
||
signal = "无背离"
|
||
desc = f"RSI当前值={current_rsi:.1f},位于{rsi_zone},未检测到明显背离"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"signal": signal,
|
||
"rsi_value": round(current_rsi, 2),
|
||
"rsi_zone": rsi_zone,
|
||
"has_divergence": divergence is not None,
|
||
"divergence_type": divergence,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[RSI Divergence] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_bollinger_squeeze(code: str, days: int = 60, period: int = 20) -> Dict[str, Any]:
|
||
"""
|
||
布林带挤压分析(Bollinger Squeeze)
|
||
当布林带收窄到一定程度时,通常预示着变盘
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
period: 布林带周期
|
||
|
||
Returns:
|
||
布林带挤压分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < period + 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算布林带
|
||
upper, middle, lower = _calc_bollinger(close, period)
|
||
|
||
# 计算带宽序列
|
||
bandwidths = (upper - lower) / middle * 100
|
||
|
||
current_bandwidth = bandwidths[-1]
|
||
|
||
# 计算历史带宽百分位
|
||
bandwidth_percentile = (np.sum(bandwidths < current_bandwidth) / len(bandwidths)) * 100
|
||
|
||
# 计算带宽变化趋势
|
||
bandwidth_ma5 = np.mean(bandwidths[-5:])
|
||
bandwidth_ma20 = np.mean(bandwidths[-20:])
|
||
bandwidth_trend = "收窄" if bandwidth_ma5 < bandwidth_ma20 else "扩张"
|
||
|
||
# 判断挤压状态
|
||
if bandwidth_percentile < 20:
|
||
squeeze_status = "强挤压"
|
||
signal = "变盘在即"
|
||
desc = f"布林带处于强挤压状态,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),变盘信号强烈"
|
||
elif bandwidth_percentile < 40:
|
||
squeeze_status = "中度挤压"
|
||
signal = "关注突破"
|
||
desc = f"布林带中度挤压,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),关注突破方向"
|
||
else:
|
||
squeeze_status = "无挤压"
|
||
signal = "正常波动"
|
||
desc = f"布林带正常,带宽{current_bandwidth:.2f}%(历史{bandwidth_percentile:.0f}%分位),趋势{bandwidth_trend}中"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"squeeze_status": squeeze_status,
|
||
"signal": signal,
|
||
"bandwidth": round(current_bandwidth, 2),
|
||
"bandwidth_percentile": round(bandwidth_percentile, 1),
|
||
"bandwidth_trend": bandwidth_trend,
|
||
"upper": round(upper[-1], 2),
|
||
"middle": round(middle[-1], 2),
|
||
"lower": round(lower[-1], 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Bollinger Squeeze] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def analyze_obv_trend(code: str, days: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
OBV能量潮独立分析
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
OBV趋势分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
volume = np.array([float(d['volume']) for d in trade_data])
|
||
|
||
# 计算OBV
|
||
obv = _calc_obv(close, volume)
|
||
|
||
# 计算OBV均线
|
||
obv_ma5 = _calc_sma(obv, 5)
|
||
obv_ma20 = _calc_sma(obv, 20)
|
||
|
||
current_obv = obv[-1]
|
||
current_obv_ma5 = obv_ma5[-1]
|
||
current_obv_ma20 = obv_ma20[-1]
|
||
|
||
# 判断OBV趋势
|
||
if current_obv > current_obv_ma5 > current_obv_ma20:
|
||
obv_trend = "强势上升"
|
||
signal = "资金持续流入"
|
||
elif current_obv > current_obv_ma5:
|
||
obv_trend = "短期上升"
|
||
signal = "资金开始流入"
|
||
elif current_obv < current_obv_ma5 < current_obv_ma20:
|
||
obv_trend = "强势下降"
|
||
signal = "资金持续流出"
|
||
elif current_obv < current_obv_ma5:
|
||
obv_trend = "短期下降"
|
||
signal = "资金开始流出"
|
||
else:
|
||
obv_trend = "震荡"
|
||
signal = "资金观望"
|
||
|
||
# 检测OBV与价格的背离
|
||
price_change_20d = (close[-1] - close[-20]) / close[-20] * 100 if len(close) >= 20 else 0
|
||
obv_change_20d = (obv[-1] - obv[-20]) / abs(obv[-20]) * 100 if len(obv) >= 20 and obv[-20] != 0 else 0
|
||
|
||
obv_divergence = None
|
||
if price_change_20d > 5 and obv_change_20d < -5:
|
||
obv_divergence = "顶背离"
|
||
elif price_change_20d < -5 and obv_change_20d > 5:
|
||
obv_divergence = "底背离"
|
||
|
||
desc = f"OBV趋势:{obv_trend},{signal}。"
|
||
if obv_divergence:
|
||
desc += f"警告:出现OBV{obv_divergence}!"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"obv_trend": obv_trend,
|
||
"signal": signal,
|
||
"current_obv": int(current_obv),
|
||
"obv_ma5": int(current_obv_ma5),
|
||
"obv_ma20": int(current_obv_ma20),
|
||
"obv_divergence": obv_divergence,
|
||
"price_change_20d": round(price_change_20d, 2),
|
||
"obv_change_20d": round(obv_change_20d, 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_date": trade_data[-1]['TRADEDATE']
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[OBV Trend] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_amihud_illiquidity(code: str, days: int = 20) -> Dict[str, Any]:
|
||
"""
|
||
计算Amihud非流动性因子
|
||
Amihud = |收益率| / 成交额
|
||
值越大表示流动性越差(价格对成交额的敏感度越高)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
Amihud非流动性分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 5:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
turnover = np.array([float(d.get('turnover', 0) or d.get('amount', 0) or 0) for d in trade_data])
|
||
|
||
# 过滤掉成交额为0的数据
|
||
valid_mask = turnover > 0
|
||
if np.sum(valid_mask) < 5:
|
||
return {"success": False, "error": "有效成交额数据不足"}
|
||
|
||
# 计算日收益率
|
||
returns = np.zeros(len(close))
|
||
returns[1:] = np.abs(np.diff(close) / close[:-1])
|
||
|
||
# 计算Amihud因子(单位:每百万成交额导致的价格变动百分比)
|
||
amihud_daily = np.where(turnover > 0, returns / (turnover / 1e6), 0)
|
||
|
||
# 平均Amihud值
|
||
avg_amihud = np.mean(amihud_daily[valid_mask]) * 1e6 # 放大以便阅读
|
||
|
||
# 流动性评级
|
||
if avg_amihud < 0.1:
|
||
liquidity_level = "极高流动性"
|
||
signal = "大单冲击成本低"
|
||
elif avg_amihud < 0.5:
|
||
liquidity_level = "高流动性"
|
||
signal = "交易成本可控"
|
||
elif avg_amihud < 2:
|
||
liquidity_level = "中等流动性"
|
||
signal = "注意交易量"
|
||
elif avg_amihud < 10:
|
||
liquidity_level = "低流动性"
|
||
signal = "大单需分批"
|
||
else:
|
||
liquidity_level = "极低流动性"
|
||
signal = "谨慎交易"
|
||
|
||
desc = f"Amihud非流动性指标={avg_amihud:.4f},{liquidity_level},{signal}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"amihud_value": round(avg_amihud, 6),
|
||
"liquidity_level": liquidity_level,
|
||
"signal": signal,
|
||
"avg_daily_turnover": round(np.mean(turnover[valid_mask]) / 1e4, 2), # 万元
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": days
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Amihud] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_parkinson_volatility(code: str, date: str = None) -> Dict[str, Any]:
|
||
"""
|
||
计算帕金森波动率(基于分钟级High/Low数据)
|
||
帕金森波动率只使用最高价和最低价,比传统波动率更准确
|
||
|
||
Args:
|
||
code: 股票代码
|
||
date: 日期(可选,默认使用最近交易日)
|
||
|
||
Returns:
|
||
帕金森波动率结果
|
||
"""
|
||
try:
|
||
# 获取分钟数据
|
||
if date:
|
||
minute_data = await db.get_stock_minute_data(code, start_time=date, end_time=date + " 23:59:59", limit=500)
|
||
else:
|
||
minute_data = await db.get_stock_minute_data(code, limit=500)
|
||
|
||
if len(minute_data) < 30:
|
||
return {"success": False, "error": "分钟数据不足,可能该股票无分钟级数据或非交易时段"}
|
||
|
||
# 从数据中提取实际日期
|
||
if not date and minute_data:
|
||
date = minute_data[0].get('timestamp', '')[:10]
|
||
|
||
high = np.array([float(d['high']) for d in minute_data])
|
||
low = np.array([float(d['low']) for d in minute_data])
|
||
|
||
# 帕金森波动率公式
|
||
# σ = sqrt(1/(4*ln(2)) * mean((ln(H/L))^2))
|
||
log_hl = np.log(high / low)
|
||
parkinson_var = np.mean(log_hl ** 2) / (4 * np.log(2))
|
||
parkinson_vol = np.sqrt(parkinson_var) * 100 # 转为百分比
|
||
|
||
# 年化
|
||
parkinson_vol_annual = parkinson_vol * np.sqrt(252 * 240) # 240分钟/天
|
||
|
||
# 与传统波动率对比(使用收盘价)
|
||
close = np.array([float(d['close']) for d in minute_data])
|
||
returns = np.diff(np.log(close))
|
||
traditional_vol = np.std(returns) * 100
|
||
|
||
# 波动率评级
|
||
if parkinson_vol < 0.5:
|
||
vol_level = "低波动"
|
||
elif parkinson_vol < 1.5:
|
||
vol_level = "中波动"
|
||
else:
|
||
vol_level = "高波动"
|
||
|
||
desc = f"帕金森波动率={parkinson_vol:.3f}%(年化约{parkinson_vol_annual:.1f}%),{vol_level}。传统波动率={traditional_vol:.3f}%"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"parkinson_vol": round(parkinson_vol, 4),
|
||
"parkinson_vol_annual": round(parkinson_vol_annual, 2),
|
||
"traditional_vol": round(traditional_vol, 4),
|
||
"vol_level": vol_level,
|
||
"description": desc,
|
||
"code": code,
|
||
"date": date
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Parkinson Vol] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_trend_slope(code: str, days: int = 20) -> Dict[str, Any]:
|
||
"""
|
||
计算趋势线性回归斜率
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
趋势斜率分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 10:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 线性回归
|
||
x = np.arange(len(close))
|
||
slope, intercept = np.polyfit(x, close, 1)
|
||
|
||
# 计算R²
|
||
y_pred = slope * x + intercept
|
||
ss_res = np.sum((close - y_pred) ** 2)
|
||
ss_tot = np.sum((close - np.mean(close)) ** 2)
|
||
r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
|
||
|
||
# 将斜率转化为日均涨幅百分比
|
||
slope_pct = slope / close[0] * 100
|
||
|
||
# 判断趋势
|
||
if slope_pct > 0.5 and r_squared > 0.7:
|
||
trend = "强上升趋势"
|
||
signal = "顺势做多"
|
||
elif slope_pct > 0.2 and r_squared > 0.5:
|
||
trend = "弱上升趋势"
|
||
signal = "关注回调买点"
|
||
elif slope_pct < -0.5 and r_squared > 0.7:
|
||
trend = "强下降趋势"
|
||
signal = "顺势做空或观望"
|
||
elif slope_pct < -0.2 and r_squared > 0.5:
|
||
trend = "弱下降趋势"
|
||
signal = "关注反弹卖点"
|
||
else:
|
||
trend = "震荡无趋势"
|
||
signal = "区间交易"
|
||
|
||
desc = f"近{days}日趋势斜率={slope_pct:.3f}%/日,R²={r_squared:.2f},{trend},建议{signal}"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"slope": round(slope, 4),
|
||
"slope_pct_daily": round(slope_pct, 3),
|
||
"r_squared": round(r_squared, 3),
|
||
"trend": trend,
|
||
"signal": signal,
|
||
"intercept": round(intercept, 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": days
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Trend Slope] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_hurst_exponent(code: str, days: int = 100) -> Dict[str, Any]:
|
||
"""
|
||
计算Hurst指数(用于判断市场是趋势还是均值回归)
|
||
H > 0.5: 趋势市场(序列具有长期记忆)
|
||
H = 0.5: 随机游走
|
||
H < 0.5: 均值回归市场
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数(建议100天以上)
|
||
|
||
Returns:
|
||
Hurst指数分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 50:
|
||
return {"success": False, "error": "数据不足,Hurst指数需要至少50个数据点"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# R/S分析法计算Hurst指数
|
||
n = len(close)
|
||
max_k = min(n // 2, 50) # 最大分组数
|
||
|
||
rs_list = []
|
||
n_list = []
|
||
|
||
for k in range(10, max_k + 1):
|
||
# 将序列分成多个子序列
|
||
rs_values = []
|
||
for start in range(0, n - k + 1, k):
|
||
segment = close[start:start + k]
|
||
|
||
# 计算累积离差
|
||
mean_seg = np.mean(segment)
|
||
cumdev = np.cumsum(segment - mean_seg)
|
||
|
||
# R = max - min
|
||
r = np.max(cumdev) - np.min(cumdev)
|
||
|
||
# S = 标准差
|
||
s = np.std(segment)
|
||
|
||
if s > 0:
|
||
rs_values.append(r / s)
|
||
|
||
if rs_values:
|
||
rs_list.append(np.mean(rs_values))
|
||
n_list.append(k)
|
||
|
||
if len(rs_list) < 3:
|
||
return {"success": False, "error": "无法计算Hurst指数"}
|
||
|
||
# 线性回归计算H
|
||
log_n = np.log(n_list)
|
||
log_rs = np.log(rs_list)
|
||
hurst, _ = np.polyfit(log_n, log_rs, 1)
|
||
|
||
# 解读Hurst指数
|
||
if hurst > 0.6:
|
||
market_type = "趋势市场"
|
||
signal = "趋势策略有效"
|
||
desc = f"Hurst指数={hurst:.3f}>0.6,市场呈现明显趋势特征,适合趋势跟踪策略"
|
||
elif hurst > 0.5:
|
||
market_type = "弱趋势市场"
|
||
signal = "混合策略"
|
||
desc = f"Hurst指数={hurst:.3f},接近随机游走,趋势与均值回归策略均可尝试"
|
||
elif hurst > 0.4:
|
||
market_type = "弱均值回归"
|
||
signal = "关注反转"
|
||
desc = f"Hurst指数={hurst:.3f},市场有轻微均值回归倾向"
|
||
else:
|
||
market_type = "强均值回归"
|
||
signal = "反转策略有效"
|
||
desc = f"Hurst指数={hurst:.3f}<0.4,市场呈现强均值回归特征,适合反转策略"
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"hurst_exponent": round(hurst, 3),
|
||
"market_type": market_type,
|
||
"signal": signal,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": len(trade_data)
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Hurst] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def test_cointegration(code1: str, code2: str, days: int = 250) -> Dict[str, Any]:
|
||
"""
|
||
协整性测试(用于配对交易)
|
||
|
||
Args:
|
||
code1: 股票代码1
|
||
code2: 股票代码2
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
协整性测试结果
|
||
"""
|
||
try:
|
||
# 获取两只股票的数据
|
||
data1 = await db.get_stock_trade_data(code1, limit=days)
|
||
data2 = await db.get_stock_trade_data(code2, limit=days)
|
||
|
||
if len(data1) < 60 or len(data2) < 60:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
# 按日期排序
|
||
data1 = sorted(data1, key=lambda x: x['TRADEDATE'])
|
||
data2 = sorted(data2, key=lambda x: x['TRADEDATE'])
|
||
|
||
# 对齐日期
|
||
dates1 = {d['TRADEDATE']: float(d['close_price']) for d in data1}
|
||
dates2 = {d['TRADEDATE']: float(d['close_price']) for d in data2}
|
||
|
||
common_dates = sorted(set(dates1.keys()) & set(dates2.keys()))
|
||
|
||
if len(common_dates) < 60:
|
||
return {"success": False, "error": "共同交易日不足"}
|
||
|
||
price1 = np.array([dates1[d] for d in common_dates])
|
||
price2 = np.array([dates2[d] for d in common_dates])
|
||
|
||
# 计算对数价格
|
||
log_price1 = np.log(price1)
|
||
log_price2 = np.log(price2)
|
||
|
||
# 线性回归找对冲比率
|
||
hedge_ratio, intercept = np.polyfit(log_price2, log_price1, 1)
|
||
|
||
# 计算价差(spread)
|
||
spread = log_price1 - hedge_ratio * log_price2
|
||
|
||
# ADF检验的简化版本(计算价差的平稳性指标)
|
||
spread_diff = np.diff(spread)
|
||
spread_lag = spread[:-1]
|
||
|
||
# 计算回归系数
|
||
if np.var(spread_lag) > 0:
|
||
rho = np.cov(spread_diff, spread_lag)[0, 1] / np.var(spread_lag)
|
||
else:
|
||
rho = 0
|
||
|
||
# rho接近-1表示强平稳性
|
||
stationarity_score = -rho
|
||
|
||
# 计算价差的均值回归半衰期
|
||
if rho < 0:
|
||
half_life = -np.log(2) / np.log(1 + rho)
|
||
else:
|
||
half_life = float('inf')
|
||
|
||
# 当前价差Z-score
|
||
spread_mean = np.mean(spread)
|
||
spread_std = np.std(spread)
|
||
current_zscore = (spread[-1] - spread_mean) / spread_std if spread_std > 0 else 0
|
||
|
||
# 判断协整性
|
||
if stationarity_score > 0.3 and half_life < 30:
|
||
cointegration_level = "强协整"
|
||
signal = "适合配对交易"
|
||
elif stationarity_score > 0.1 and half_life < 60:
|
||
cointegration_level = "中等协整"
|
||
signal = "可尝试配对"
|
||
else:
|
||
cointegration_level = "弱协整或无协整"
|
||
signal = "不建议配对"
|
||
|
||
# 交易建议
|
||
if abs(current_zscore) > 2:
|
||
if current_zscore > 0:
|
||
trade_signal = f"做空价差(卖{code1}买{code2})"
|
||
else:
|
||
trade_signal = f"做多价差(买{code1}卖{code2})"
|
||
elif abs(current_zscore) > 1:
|
||
trade_signal = "关注价差回归机会"
|
||
else:
|
||
trade_signal = "价差在正常范围"
|
||
|
||
desc = (f"{code1}与{code2}{cointegration_level},对冲比率={hedge_ratio:.3f},"
|
||
f"半衰期={half_life:.1f}天,当前价差Z-score={current_zscore:.2f},{signal}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"code1": code1,
|
||
"code2": code2,
|
||
"cointegration_level": cointegration_level,
|
||
"hedge_ratio": round(hedge_ratio, 4),
|
||
"half_life_days": round(half_life, 1) if half_life < 1000 else "N/A",
|
||
"current_spread_zscore": round(current_zscore, 3),
|
||
"stationarity_score": round(stationarity_score, 3),
|
||
"trade_signal": trade_signal,
|
||
"signal": signal,
|
||
"description": desc,
|
||
"common_days": len(common_dates)
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Cointegration] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_kelly_position(win_rate: float, win_loss_ratio: float, max_position: float = 0.25) -> Dict[str, Any]:
|
||
"""
|
||
凯利公式计算最优仓位
|
||
Kelly% = W - (1-W)/R
|
||
其中 W=胜率,R=盈亏比
|
||
|
||
Args:
|
||
win_rate: 胜率(0-1之间)
|
||
win_loss_ratio: 盈亏比(平均盈利/平均亏损)
|
||
max_position: 最大允许仓位(默认25%)
|
||
|
||
Returns:
|
||
凯利仓位计算结果
|
||
"""
|
||
try:
|
||
if not 0 < win_rate < 1:
|
||
return {"success": False, "error": "胜率必须在0-1之间"}
|
||
|
||
if win_loss_ratio <= 0:
|
||
return {"success": False, "error": "盈亏比必须大于0"}
|
||
|
||
# 计算凯利比例
|
||
kelly_pct = win_rate - (1 - win_rate) / win_loss_ratio
|
||
|
||
# 半凯利(更保守)
|
||
half_kelly = kelly_pct / 2
|
||
|
||
# 限制最大仓位
|
||
recommended_position = min(max(kelly_pct, 0), max_position)
|
||
conservative_position = min(max(half_kelly, 0), max_position)
|
||
|
||
# 判断策略质量
|
||
if kelly_pct >= 0.2:
|
||
strategy_quality = "优秀"
|
||
signal = "可以较大仓位"
|
||
elif kelly_pct >= 0.1:
|
||
strategy_quality = "良好"
|
||
signal = "适中仓位"
|
||
elif kelly_pct > 0:
|
||
strategy_quality = "一般"
|
||
signal = "小仓位试探"
|
||
else:
|
||
strategy_quality = "不可行"
|
||
signal = "不建议交易"
|
||
|
||
desc = (f"胜率{win_rate*100:.1f}%,盈亏比{win_loss_ratio:.2f},"
|
||
f"凯利比例={kelly_pct*100:.1f}%,策略{strategy_quality}。"
|
||
f"建议仓位:激进{recommended_position*100:.1f}%,保守{conservative_position*100:.1f}%")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"kelly_pct": round(kelly_pct * 100, 2),
|
||
"half_kelly_pct": round(half_kelly * 100, 2),
|
||
"recommended_position": round(recommended_position * 100, 2),
|
||
"conservative_position": round(conservative_position * 100, 2),
|
||
"strategy_quality": strategy_quality,
|
||
"signal": signal,
|
||
"win_rate": round(win_rate * 100, 1),
|
||
"win_loss_ratio": round(win_loss_ratio, 2),
|
||
"description": desc
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Kelly] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def search_similar_kline(code: str, lookback: int = 10, top_n: int = 5) -> Dict[str, Any]:
|
||
"""
|
||
相似K线检索(基于形态相似度)
|
||
|
||
Args:
|
||
code: 股票代码
|
||
lookback: 匹配窗口大小(默认10天)
|
||
top_n: 返回最相似的N个历史片段
|
||
|
||
Returns:
|
||
相似K线检索结果
|
||
"""
|
||
try:
|
||
# 获取足够的历史数据
|
||
trade_data = await db.get_stock_trade_data(code, limit=500)
|
||
|
||
if len(trade_data) < lookback * 3:
|
||
return {"success": False, "error": "历史数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
dates = [d['TRADEDATE'] for d in trade_data]
|
||
|
||
# 当前形态(标准化)
|
||
current_pattern = close[-lookback:]
|
||
current_norm = (current_pattern - np.mean(current_pattern)) / np.std(current_pattern)
|
||
|
||
# 在历史中搜索相似形态
|
||
similar_patterns = []
|
||
|
||
for i in range(len(close) - lookback * 2): # 不包括最近的数据
|
||
historical_pattern = close[i:i + lookback]
|
||
|
||
# 标准化
|
||
if np.std(historical_pattern) > 0:
|
||
hist_norm = (historical_pattern - np.mean(historical_pattern)) / np.std(historical_pattern)
|
||
|
||
# 计算相似度(使用相关系数)
|
||
correlation = np.corrcoef(current_norm, hist_norm)[0, 1]
|
||
|
||
# 计算后续N天的收益
|
||
future_return = 0
|
||
if i + lookback + 5 < len(close):
|
||
future_return = (close[i + lookback + 5] - close[i + lookback - 1]) / close[i + lookback - 1] * 100
|
||
|
||
if correlation > 0.8: # 只保留高相似度的
|
||
similar_patterns.append({
|
||
"start_date": dates[i],
|
||
"end_date": dates[i + lookback - 1],
|
||
"correlation": correlation,
|
||
"future_return_5d": future_return
|
||
})
|
||
|
||
# 按相似度排序,取前N个
|
||
similar_patterns = sorted(similar_patterns, key=lambda x: x['correlation'], reverse=True)[:top_n]
|
||
|
||
if not similar_patterns:
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"message": "未找到高度相似的历史形态",
|
||
"code": code,
|
||
"lookback": lookback
|
||
}
|
||
}
|
||
|
||
# 统计历史相似形态后的走势
|
||
future_returns = [p['future_return_5d'] for p in similar_patterns if p['future_return_5d'] != 0]
|
||
|
||
if future_returns:
|
||
avg_future_return = np.mean(future_returns)
|
||
up_probability = sum(1 for r in future_returns if r > 0) / len(future_returns) * 100
|
||
else:
|
||
avg_future_return = 0
|
||
up_probability = 50
|
||
|
||
# 预测信号
|
||
if avg_future_return > 2 and up_probability > 60:
|
||
prediction = "历史形态后多数上涨"
|
||
elif avg_future_return < -2 and up_probability < 40:
|
||
prediction = "历史形态后多数下跌"
|
||
else:
|
||
prediction = "历史走势分化,谨慎参考"
|
||
|
||
desc = (f"找到{len(similar_patterns)}个相似历史形态,"
|
||
f"历史5日平均收益{avg_future_return:.1f}%,上涨概率{up_probability:.0f}%,{prediction}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"similar_patterns": similar_patterns,
|
||
"pattern_count": len(similar_patterns),
|
||
"avg_future_return_5d": round(avg_future_return, 2),
|
||
"up_probability": round(up_probability, 1),
|
||
"prediction": prediction,
|
||
"description": desc,
|
||
"code": code,
|
||
"lookback": lookback
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Similar KLine] 检索失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def decompose_trend_simple(code: str, days: int = 120) -> Dict[str, Any]:
|
||
"""
|
||
简化版趋势分解(不依赖Prophet)
|
||
将价格序列分解为:趋势 + 周期 + 残差
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
趋势分解结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 60:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
dates = [d['TRADEDATE'] for d in trade_data]
|
||
|
||
# 1. 提取趋势(使用长周期移动平均)
|
||
trend = _calc_sma(close, min(60, len(close) // 2))
|
||
|
||
# 2. 去趋势
|
||
detrended = close - trend
|
||
|
||
# 3. 提取周期成分(使用FFT的简化方法)
|
||
# 计算周期性强度
|
||
fft_result = np.fft.fft(detrended)
|
||
fft_freq = np.fft.fftfreq(len(detrended))
|
||
|
||
# 找到主要周期
|
||
positive_freq_idx = fft_freq > 0
|
||
amplitudes = np.abs(fft_result[positive_freq_idx])
|
||
frequencies = fft_freq[positive_freq_idx]
|
||
|
||
# 找最强的周期
|
||
if len(amplitudes) > 0:
|
||
dominant_idx = np.argmax(amplitudes)
|
||
dominant_period = 1 / frequencies[dominant_idx] if frequencies[dominant_idx] > 0 else 0
|
||
periodicity_strength = amplitudes[dominant_idx] / np.sum(amplitudes) * 100
|
||
else:
|
||
dominant_period = 0
|
||
periodicity_strength = 0
|
||
|
||
# 4. 残差 = 原始 - 趋势 - 周期
|
||
residual = detrended # 简化处理,这里残差就是去趋势后的数据
|
||
|
||
# 分析当前趋势方向
|
||
trend_direction = "上升" if trend[-1] > trend[-20] else "下降"
|
||
trend_strength = abs(trend[-1] - trend[-20]) / trend[-20] * 100
|
||
|
||
# 当前位置相对于趋势
|
||
current_deviation = (close[-1] - trend[-1]) / trend[-1] * 100
|
||
|
||
if current_deviation > 3:
|
||
position = "高于趋势线"
|
||
signal = "短期或有回调"
|
||
elif current_deviation < -3:
|
||
position = "低于趋势线"
|
||
signal = "短期或有反弹"
|
||
else:
|
||
position = "贴近趋势线"
|
||
signal = "跟随趋势"
|
||
|
||
desc = (f"趋势{trend_direction}(强度{trend_strength:.1f}%),"
|
||
f"主周期约{dominant_period:.0f}天,周期性强度{periodicity_strength:.1f}%,"
|
||
f"当前{position}(偏离{current_deviation:+.1f}%),{signal}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"trend_direction": trend_direction,
|
||
"trend_strength_pct": round(trend_strength, 2),
|
||
"dominant_period_days": round(dominant_period, 0),
|
||
"periodicity_strength": round(periodicity_strength, 1),
|
||
"current_deviation_pct": round(current_deviation, 2),
|
||
"position": position,
|
||
"signal": signal,
|
||
"current_price": round(close[-1], 2),
|
||
"current_trend": round(trend[-1], 2),
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": len(trade_data)
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Trend Decompose] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
async def calc_price_entropy(code: str, days: int = 60) -> Dict[str, Any]:
|
||
"""
|
||
计算价格信息熵(衡量市场混乱程度)
|
||
熵值越高表示市场越混乱/随机,越低表示趋势越明显
|
||
|
||
Args:
|
||
code: 股票代码
|
||
days: 分析天数
|
||
|
||
Returns:
|
||
价格熵值分析结果
|
||
"""
|
||
try:
|
||
trade_data = await db.get_stock_trade_data(code, limit=days)
|
||
|
||
if len(trade_data) < 20:
|
||
return {"success": False, "error": "数据不足"}
|
||
|
||
trade_data = sorted(trade_data, key=lambda x: x['TRADEDATE'])
|
||
close = np.array([float(d['close_price']) for d in trade_data])
|
||
|
||
# 计算日收益率
|
||
returns = np.diff(close) / close[:-1]
|
||
|
||
# 将收益率离散化为bins
|
||
n_bins = 10
|
||
hist, bin_edges = np.histogram(returns, bins=n_bins)
|
||
|
||
# 计算概率分布
|
||
prob = hist / np.sum(hist)
|
||
prob = prob[prob > 0] # 去掉0概率
|
||
|
||
# 计算香农熵
|
||
entropy = -np.sum(prob * np.log2(prob))
|
||
|
||
# 最大熵(均匀分布)
|
||
max_entropy = np.log2(n_bins)
|
||
|
||
# 标准化熵值(0-1之间)
|
||
normalized_entropy = entropy / max_entropy
|
||
|
||
# 近期熵值变化
|
||
recent_returns = returns[-10:]
|
||
recent_hist, _ = np.histogram(recent_returns, bins=5)
|
||
recent_prob = recent_hist / np.sum(recent_hist)
|
||
recent_prob = recent_prob[recent_prob > 0]
|
||
recent_entropy = -np.sum(recent_prob * np.log2(recent_prob))
|
||
recent_max_entropy = np.log2(5)
|
||
recent_normalized = recent_entropy / recent_max_entropy
|
||
|
||
# 判断市场状态
|
||
if normalized_entropy > 0.85:
|
||
market_state = "高度混乱"
|
||
signal = "随机性强,难以预测"
|
||
elif normalized_entropy > 0.7:
|
||
market_state = "中度混乱"
|
||
signal = "市场分歧大"
|
||
elif normalized_entropy > 0.5:
|
||
market_state = "适度有序"
|
||
signal = "有一定规律可循"
|
||
else:
|
||
market_state = "高度有序"
|
||
signal = "趋势明显,易于预测"
|
||
|
||
# 熵值变化趋势
|
||
entropy_trend = "增加" if recent_normalized > normalized_entropy else "减少"
|
||
|
||
desc = (f"价格熵值={entropy:.2f}(标准化{normalized_entropy:.2f}),"
|
||
f"{market_state},{signal}。近期熵值{entropy_trend}")
|
||
|
||
return {
|
||
"success": True,
|
||
"data": {
|
||
"entropy": round(entropy, 3),
|
||
"normalized_entropy": round(normalized_entropy, 3),
|
||
"recent_entropy": round(recent_normalized, 3),
|
||
"entropy_trend": entropy_trend,
|
||
"market_state": market_state,
|
||
"signal": signal,
|
||
"description": desc,
|
||
"code": code,
|
||
"analysis_days": days
|
||
}
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Entropy] 计算失败: {e}", exc_info=True)
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
# ==================== 工具注册(供MCP使用) ====================
|
||
|
||
# 包装函数:将 args 字典解构为函数参数
|
||
# MCP 调用时传入的是 args: Dict[str, Any],需要解构后调用实际函数
|
||
|
||
async def _wrap_get_macd_signal(args: Dict[str, Any]):
|
||
return await get_macd_signal(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_check_oscillator_status(args: Dict[str, Any]):
|
||
return await check_oscillator_status(args.get("code"), args.get("days", 30))
|
||
|
||
async def _wrap_analyze_bollinger_bands(args: Dict[str, Any]):
|
||
return await analyze_bollinger_bands(args.get("code"), args.get("days", 60), args.get("period", 20))
|
||
|
||
async def _wrap_calc_stop_loss_atr(args: Dict[str, Any]):
|
||
return await calc_stop_loss_atr(args.get("code"), args.get("days", 30), args.get("multiplier", 2.0))
|
||
|
||
async def _wrap_analyze_market_heat(args: Dict[str, Any]):
|
||
return await analyze_market_heat(args.get("code"), args.get("days", 20))
|
||
|
||
async def _wrap_check_volume_price_divergence(args: Dict[str, Any]):
|
||
return await check_volume_price_divergence(args.get("code"), args.get("days", 30))
|
||
|
||
async def _wrap_check_new_high_breakout(args: Dict[str, Any]):
|
||
return await check_new_high_breakout(args.get("code"), args.get("days", 250))
|
||
|
||
async def _wrap_identify_candlestick_pattern(args: Dict[str, Any]):
|
||
return await identify_candlestick_pattern(args.get("code"), args.get("days", 10))
|
||
|
||
async def _wrap_find_price_gaps(args: Dict[str, Any]):
|
||
return await find_price_gaps(args.get("code"), args.get("days", 60), args.get("min_gap_pct", 2.0))
|
||
|
||
async def _wrap_calc_max_drawdown(args: Dict[str, Any]):
|
||
return await calc_max_drawdown(args.get("code"), args.get("days", 250))
|
||
|
||
async def _wrap_check_valuation_rank(args: Dict[str, Any]):
|
||
return await check_valuation_rank(args.get("code"), args.get("days", 250))
|
||
|
||
async def _wrap_calc_price_zscore(args: Dict[str, Any]):
|
||
return await calc_price_zscore(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_calc_market_profile_vpoc(args: Dict[str, Any]):
|
||
return await calc_market_profile_vpoc(args.get("code"), args.get("date"))
|
||
|
||
async def _wrap_calc_realized_volatility(args: Dict[str, Any]):
|
||
return await calc_realized_volatility(args.get("code"), args.get("date"))
|
||
|
||
async def _wrap_analyze_buying_pressure(args: Dict[str, Any]):
|
||
return await analyze_buying_pressure(args.get("code"), args.get("date"))
|
||
|
||
async def _wrap_get_comprehensive_analysis(args: Dict[str, Any]):
|
||
return await get_comprehensive_analysis(args.get("code"))
|
||
|
||
async def _wrap_calc_rsi_divergence(args: Dict[str, Any]):
|
||
return await calc_rsi_divergence(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_calc_bollinger_squeeze(args: Dict[str, Any]):
|
||
return await calc_bollinger_squeeze(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_analyze_obv_trend(args: Dict[str, Any]):
|
||
return await analyze_obv_trend(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_calc_amihud_illiquidity(args: Dict[str, Any]):
|
||
return await calc_amihud_illiquidity(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_calc_parkinson_volatility(args: Dict[str, Any]):
|
||
return await calc_parkinson_volatility(args.get("code"), args.get("date"))
|
||
|
||
async def _wrap_calc_trend_slope(args: Dict[str, Any]):
|
||
return await calc_trend_slope(args.get("code"), args.get("days", 60))
|
||
|
||
async def _wrap_calc_hurst_exponent(args: Dict[str, Any]):
|
||
return await calc_hurst_exponent(args.get("code"), args.get("days", 100))
|
||
|
||
async def _wrap_test_cointegration(args: Dict[str, Any]):
|
||
return await test_cointegration(args.get("code1"), args.get("code2"), args.get("days", 120))
|
||
|
||
async def _wrap_calc_kelly_position(args: Dict[str, Any]):
|
||
return await calc_kelly_position(args.get("win_rate"), args.get("win_loss_ratio"), args.get("max_position", 0.25))
|
||
|
||
async def _wrap_search_similar_kline(args: Dict[str, Any]):
|
||
return await search_similar_kline(args.get("code"), args.get("lookback", 5), args.get("search_range", 250))
|
||
|
||
async def _wrap_decompose_trend_simple(args: Dict[str, Any]):
|
||
return await decompose_trend_simple(args.get("code"), args.get("days", 120))
|
||
|
||
async def _wrap_calc_price_entropy(args: Dict[str, Any]):
|
||
return await calc_price_entropy(args.get("code"), args.get("days", 60), args.get("bins", 10))
|
||
|
||
|
||
QUANT_TOOLS = {
|
||
# 经典技术指标
|
||
"get_macd_signal": _wrap_get_macd_signal,
|
||
"check_oscillator_status": _wrap_check_oscillator_status,
|
||
"analyze_bollinger_bands": _wrap_analyze_bollinger_bands,
|
||
"calc_stop_loss_atr": _wrap_calc_stop_loss_atr,
|
||
|
||
# 资金与情绪
|
||
"analyze_market_heat": _wrap_analyze_market_heat,
|
||
"check_volume_price_divergence": _wrap_check_volume_price_divergence,
|
||
|
||
# 形态与突破
|
||
"check_new_high_breakout": _wrap_check_new_high_breakout,
|
||
"identify_candlestick_pattern": _wrap_identify_candlestick_pattern,
|
||
"find_price_gaps": _wrap_find_price_gaps,
|
||
|
||
# 风险与估值
|
||
"calc_max_drawdown": _wrap_calc_max_drawdown,
|
||
"check_valuation_rank": _wrap_check_valuation_rank,
|
||
"calc_price_zscore": _wrap_calc_price_zscore,
|
||
|
||
# 分钟级高阶算子
|
||
"calc_market_profile_vpoc": _wrap_calc_market_profile_vpoc,
|
||
"calc_realized_volatility": _wrap_calc_realized_volatility,
|
||
"analyze_buying_pressure": _wrap_analyze_buying_pressure,
|
||
|
||
# 综合分析
|
||
"get_comprehensive_analysis": _wrap_get_comprehensive_analysis,
|
||
|
||
# ==================== 新增12个因子 ====================
|
||
|
||
# RSI背离检测
|
||
"calc_rsi_divergence": _wrap_calc_rsi_divergence,
|
||
|
||
# 布林带挤压
|
||
"calc_bollinger_squeeze": _wrap_calc_bollinger_squeeze,
|
||
|
||
# OBV能量潮独立分析
|
||
"analyze_obv_trend": _wrap_analyze_obv_trend,
|
||
|
||
# Amihud非流动性因子
|
||
"calc_amihud_illiquidity": _wrap_calc_amihud_illiquidity,
|
||
|
||
# 帕金森波动率
|
||
"calc_parkinson_volatility": _wrap_calc_parkinson_volatility,
|
||
|
||
# 趋势线性回归斜率
|
||
"calc_trend_slope": _wrap_calc_trend_slope,
|
||
|
||
# Hurst指数
|
||
"calc_hurst_exponent": _wrap_calc_hurst_exponent,
|
||
|
||
# 协整性测试
|
||
"test_cointegration": _wrap_test_cointegration,
|
||
|
||
# 凯利公式仓位
|
||
"calc_kelly_position": _wrap_calc_kelly_position,
|
||
|
||
# 相似K线检索
|
||
"search_similar_kline": _wrap_search_similar_kline,
|
||
|
||
# 趋势分解(简化版,不依赖Prophet)
|
||
"decompose_trend_simple": _wrap_decompose_trend_simple,
|
||
|
||
# 价格熵值计算
|
||
"calc_price_entropy": _wrap_calc_price_entropy,
|
||
}
|