"""
MCP Server for Financial Data Search
基于FastAPI的MCP服务端,整合多个金融数据搜索API
支持LLM调用和Web聊天功能
"""
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Literal, AsyncGenerator
from datetime import datetime, date
import logging
import httpx
import time
from enum import Enum
import mcp_database as db
import mcp_quant as quant # 量化因子计算模块
from openai import OpenAI
import json
import asyncio
import uuid
from mcp_elasticsearch import es_client
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建FastAPI应用
app = FastAPI(
title="Financial Data MCP Server",
description="Model Context Protocol server for financial data search and analysis",
version="1.0.0"
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== 配置 ====================
class ServiceEndpoints:
"""API服务端点配置"""
NEWS_API = "http://222.128.1.157:21891" # 新闻API
ROADSHOW_API = "http://222.128.1.157:19800" # 路演API
CONCEPT_API = "http://222.128.1.157:16801" # 概念API V2(concept_api_v2.py,端口16801)
STOCK_ANALYSIS_API = "http://222.128.1.157:8811" # 涨停分析+研报API
MAIN_APP_API = "http://127.0.0.1:5001" # 主应用API(自选股、自选事件等)
# HTTP客户端配置
HTTP_CLIENT = httpx.AsyncClient(timeout=60.0)
# ==================== Agent系统配置 ====================
# ==================== 多模型配置 ====================
# 模型配置字典(支持动态切换)
MODEL_CONFIGS = {
"deepseek": {
"api_key": "sk-7363bdb28d7d4bf0aa68eb9449f8f063",
"base_url": "https://api.deepseek.com",
"model": "deepseek-chat", # 默认模型
"max_tokens": 8192, # DeepSeek 限制为 8192
},
"kimi-k2": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "moonshot-v1-8k", # 快速模型
"max_tokens": 8192, # moonshot-v1-8k 限制为 8k
},
"kimi-k2-thinking": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "kimi-k2-thinking", # 深度思考模型
"max_tokens": 32768, # Kimi 思考模型支持更大
},
"glm-4.6": {
"api_key": "", # 需要配置智谱AI密钥
"base_url": "https://open.bigmodel.cn/api/paas/v4",
"model": "glm-4",
"max_tokens": 8192,
},
"deepmoney": {
"api_key": "", # 空值
"base_url": "http://111.62.35.50:8000/v1",
"model": "deepmoney",
"max_tokens": 32768, # DeepMoney 本地托管,上下文 65536,输出限制 32768
},
"gemini-3": {
"api_key": "", # 需要配置Google API密钥
"base_url": "https://generativelanguage.googleapis.com/v1",
"model": "gemini-pro",
"max_tokens": 8192,
},
}
# 默认 LLM 配置(使用 deepmoney,本地托管,上下文长)
LLM_CONFIG = MODEL_CONFIGS["deepmoney"]
DEEPMONEY_CONFIG = MODEL_CONFIGS["deepmoney"]
# ==================== MCP协议数据模型 ====================
class ToolParameter(BaseModel):
"""工具参数定义"""
type: str
description: str
enum: Optional[List[str]] = None
default: Optional[Any] = None
class ToolDefinition(BaseModel):
"""工具定义"""
name: str
description: str
parameters: Dict[str, Any] # 支持完整的 JSON Schema 格式
class ToolCallRequest(BaseModel):
"""工具调用请求"""
tool: str
arguments: Dict[str, Any] = {}
class ToolCallResponse(BaseModel):
"""工具调用响应"""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
# ==================== Agent系统数据模型 ====================
class ToolCall(BaseModel):
"""工具调用"""
tool: str
arguments: Dict[str, Any]
reason: str
class ExecutionPlan(BaseModel):
"""执行计划"""
goal: str
steps: List[ToolCall]
reasoning: str
class StepResult(BaseModel):
"""单步执行结果"""
step_index: int
tool: str
arguments: Dict[str, Any]
status: Literal["success", "failed", "skipped"]
result: Optional[Any] = None
error: Optional[str] = None
execution_time: float = 0
class AgentResponse(BaseModel):
"""Agent响应"""
success: bool
message: str
plan: Optional[ExecutionPlan] = None
step_results: List[StepResult] = []
final_summary: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class ConversationMessage(BaseModel):
"""对话历史消息"""
isUser: bool
content: str
class AgentChatRequest(BaseModel):
"""聊天请求"""
message: str
conversation_history: List[ConversationMessage] = []
user_id: Optional[str] = None # 用户ID
user_nickname: Optional[str] = None # 用户昵称
user_avatar: Optional[str] = None # 用户头像URL
subscription_type: Optional[str] = None # 用户订阅类型(free/pro/max)
session_id: Optional[str] = None # 会话ID(如果为空则创建新会话)
model: Optional[str] = "deepmoney" # 选择的模型(deepmoney, deepseek, kimi-k2, kimi-k2-thinking, glm-4.6, gemini-3)
tools: Optional[List[str]] = None # 选择的工具列表(工具名称数组,如果为None则使用全部工具)
# ==================== MCP工具定义 ====================
TOOLS: List[ToolDefinition] = [
ToolDefinition(
name="search_news",
description="搜索全球新闻,支持关键词搜索和日期过滤。适用于查找国际新闻、行业动态等。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,例如:'人工智能'、'新能源汽车'"
},
"source": {
"type": "string",
"description": "新闻来源筛选,可选"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量,默认20",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_china_news",
description="搜索中国新闻,使用KNN语义搜索。支持精确匹配模式,适合查找股票、公司相关新闻。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"exact_match": {
"type": "boolean",
"description": "是否精确匹配(用于股票代码、公司名称等),默认false",
"default": False
},
"source": {
"type": "string",
"description": "新闻来源筛选"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量,默认20",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_medical_news",
description="搜索医疗健康类新闻,包括医药、医疗设备、生物技术等领域。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"source": {
"type": "string",
"description": "新闻来源"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_roadshows",
description="搜索上市公司路演、投资者交流活动记录。可按公司代码、日期范围搜索。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,可以是公司名称、主题等"
},
"company_code": {
"type": "string",
"description": "公司股票代码,例如:'600519.SH'"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
},
"size": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_concepts",
description="搜索股票概念板块,返回概念详情及相关股票列表。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,例如:'新能源'、'人工智能'、'商业航天'"
},
"trade_date": {
"type": "string",
"description": "交易日期,格式:YYYY-MM-DD,不传则使用今天"
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_concept_details",
description="根据概念ID获取详细信息,包括描述、相关股票、涨跌幅数据等。",
parameters={
"type": "object",
"properties": {
"concept_id": {
"type": "string",
"description": "概念ID"
},
"trade_date": {
"type": "string",
"description": "交易日期,格式:YYYY-MM-DD"
}
},
"required": ["concept_id"]
}
),
ToolDefinition(
name="get_stock_concepts",
description="查询指定股票的所有相关概念板块,包括涨跌幅信息。",
parameters={
"type": "object",
"properties": {
"stock_code": {
"type": "string",
"description": "股票代码或名称"
},
"size": {
"type": "integer",
"description": "返回概念数量",
"default": 50
},
"sort_by": {
"type": "string",
"description": "排序方式",
"enum": ["stock_count", "concept_name", "recent"],
"default": "stock_count"
},
"trade_date": {
"type": "string",
"description": "交易日期,格式:YYYY-MM-DD"
}
},
"required": ["stock_code"]
}
),
ToolDefinition(
name="get_concept_statistics",
description="获取概念板块统计数据,包括涨幅榜、跌幅榜、活跃榜、波动榜、连涨榜。",
parameters={
"type": "object",
"properties": {
"days": {
"type": "integer",
"description": "统计天数(与start_date/end_date互斥)"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"min_stock_count": {
"type": "integer",
"description": "最少股票数量过滤",
"default": 3
}
},
"required": []
}
),
ToolDefinition(
name="search_limit_up_stocks",
description="搜索涨停股票,支持按日期、关键词、板块等条件搜索。包括混合语义搜索。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词(涨停原因、公司名称等)"
},
"date": {
"type": "string",
"description": "日期,格式:YYYYMMDD"
},
"mode": {
"type": "string",
"description": "搜索模式",
"enum": ["hybrid", "text", "vector"],
"default": "hybrid"
},
"sectors": {
"type": "array",
"items": {"type": "string"},
"description": "板块筛选"
},
"page_size": {
"type": "integer",
"description": "每页结果数",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_daily_stock_analysis",
description="获取指定日期的涨停股票分析,包括板块分析、词云、趋势图表等。",
parameters={
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "日期,格式:YYYYMMDD"
}
},
"required": ["date"]
}
),
ToolDefinition(
name="search_research_reports",
description="搜索研究报告,支持文本和语义混合搜索。可按作者、证券、日期等筛选。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"mode": {
"type": "string",
"description": "搜索模式",
"enum": ["hybrid", "text", "vector"],
"default": "hybrid"
},
"exact_match": {
"type": "string",
"description": "是否精确匹配:0=模糊,1=精确",
"enum": ["0", "1"],
"default": "0"
},
"security_code": {
"type": "string",
"description": "证券代码筛选"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"size": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_stock_code_by_name",
description="根据股票名称查询股票代码,支持模糊匹配。当只知道股票名称不知道代码时使用。",
parameters={
"type": "object",
"properties": {
"stock_name": {
"type": "string",
"description": "股票名称,例如:'贵州茅台'、'舒泰神'、'比亚迪'"
}
},
"required": ["stock_name"]
}
),
ToolDefinition(
name="get_stock_basic_info",
description="获取股票基本信息,包括公司名称、行业、地址、主营业务、高管等基础数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码,例如:600519"
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_financial_index",
description="获取股票财务指标,包括每股收益、净资产收益率、营收增长率等关键财务数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数,默认10",
"default": 10
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_trade_data",
description="获取股票交易数据,包括价格、成交量、涨跌幅、换手率等日线行情数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数,默认30",
"default": 30
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_balance_sheet",
description="获取股票资产负债表,包括资产、负债、所有者权益等财务状况数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数,默认8",
"default": 8
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_cashflow",
description="获取股票现金流量表,包括经营、投资、筹资活动现金流数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期,格式:YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期,格式:YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数,默认8",
"default": 8
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="search_stocks_by_criteria",
description="按条件搜索股票,支持按行业、地区、市值等条件筛选股票列表。",
parameters={
"type": "object",
"properties": {
"industry": {
"type": "string",
"description": "行业名称,支持模糊匹配"
},
"province": {
"type": "string",
"description": "省份名称"
},
"min_market_cap": {
"type": "number",
"description": "最小市值(亿元)"
},
"max_market_cap": {
"type": "number",
"description": "最大市值(亿元)"
},
"limit": {
"type": "integer",
"description": "返回条数,默认50",
"default": 50
}
},
"required": []
}
),
ToolDefinition(
name="get_stock_comparison",
description="股票对比分析,支持多只股票的财务指标或交易数据对比。",
parameters={
"type": "object",
"properties": {
"seccodes": {
"type": "array",
"items": {"type": "string"},
"description": "股票代码列表,至少2个"
},
"metric": {
"type": "string",
"description": "对比指标类型",
"enum": ["financial", "trade"],
"default": "financial"
}
},
"required": ["seccodes"]
}
),
ToolDefinition(
name="get_user_watchlist",
description="获取用户的自选股列表及实时行情数据。返回用户关注的股票及其当前价格、涨跌幅等信息。",
parameters={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "用户ID(可选,如果不提供则使用当前会话用户)"
}
},
"required": []
}
),
ToolDefinition(
name="get_user_following_events",
description="获取用户关注的事件列表。返回用户关注的热点事件及其基本信息(标题、类型、热度、关注人数等)。",
parameters={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "用户ID(可选,如果不提供则使用当前会话用户)"
}
},
"required": []
}
),
# ==================== 分钟频数据工具 ====================
ToolDefinition(
name="get_stock_minute_data",
description="获取股票分钟频K线数据。适用于分析日内走势、寻找交易时机、技术分析等场景。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码,例如:'600519' 或 '600519.SH'"
},
"start_time": {
"type": "string",
"description": "开始时间,格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
},
"end_time": {
"type": "string",
"description": "结束时间,格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数,默认240(约一个交易日)",
"default": 240
}
},
"required": ["code"]
}
),
ToolDefinition(
name="get_stock_minute_aggregation",
description="获取股票分钟频数据的聚合K线(5分钟、15分钟、30分钟等周期)。适用于中短期技术分析。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "交易日期,格式:YYYY-MM-DD"
},
"interval": {
"type": "integer",
"description": "聚合间隔(分钟),可选:5、15、30、60",
"default": 5
}
},
"required": ["code", "date"]
}
),
ToolDefinition(
name="get_stock_intraday_statistics",
description="获取股票日内统计数据,包括开高低收、成交量、成交额、日内波动率等汇总指标。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "交易日期,格式:YYYY-MM-DD"
}
},
"required": ["code", "date"]
}
),
# ==================== 量化因子工具 ====================
ToolDefinition(
name="get_macd_signal",
description="获取MACD趋势判定信号,包括金叉/死叉、动能增减、顶底背离等状态。适用于判断股票短期趋势方向。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60天",
"default": 60
}
},
"required": ["code"]
}
),
ToolDefinition(
name="check_oscillator_status",
description="检查KDJ/RSI超买超卖状态,判断股票是否处于超买区(风险积聚)或超卖区(可能反弹)。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60天",
"default": 60
}
},
"required": ["code"]
}
),
ToolDefinition(
name="analyze_bollinger_bands",
description="分析布林带通道,判断股价是在中轨之上(强势)、触及上轨(压力)、触及下轨(支撑)或布林带收窄(变盘在即)。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60天",
"default": 60
},
"period": {
"type": "integer",
"description": "布林带周期,默认20",
"default": 20
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_stop_loss_atr",
description="使用ATR真实波幅计算止损位。告诉用户\"如果买入,止损点应该设在当前价格减去N倍ATR的位置\"。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认30天",
"default": 30
},
"atr_multiplier": {
"type": "number",
"description": "ATR倍数,默认2倍",
"default": 2.0
}
},
"required": ["code"]
}
),
ToolDefinition(
name="analyze_market_heat",
description="分析换手率活跃度和量能,判断股票是冷门股、活跃股还是妖股,以及主力是在吸筹还是出货。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认30天",
"default": 30
}
},
"required": ["code"]
}
),
ToolDefinition(
name="check_new_high_breakout",
description="检查唐奇安通道突破(海龟交易法则),判断是否突破20日/60日新高或新低。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60天",
"default": 60
}
},
"required": ["code"]
}
),
ToolDefinition(
name="identify_candlestick_pattern",
description="识别K线组合形态,如早晨之星(反转信号)、红三兵(上涨信号)、穿头破脚(吞没形态)等经典形态。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认10天",
"default": 10
}
},
"required": ["code"]
}
),
ToolDefinition(
name="find_price_gaps",
description="寻找跳空缺口,筛选出近期有未回补缺口的情况。缺口往往代表主力资金的强势突破意图或恐慌抛售。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认30天",
"default": 30
}
},
"required": ["code"]
}
),
ToolDefinition(
name="check_volume_price_divergence",
description="检测量价背离。股价创新高但成交量萎缩(量价背离),预警信号,提示上涨动力不足。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认20天",
"default": 20
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_max_drawdown",
description="计算最大回撤和夏普比率。用于评估\"买这只票最坏情况会亏多少\"以及风险调整后收益。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认250天(约一年)",
"default": 250
}
},
"required": ["code"]
}
),
ToolDefinition(
name="check_valuation_rank",
description="检查历史PE/PB百分位估值。计算当前PE处于过去N年的什么位置(例如:比过去90%的时间都便宜)。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"years": {
"type": "integer",
"description": "历史年数,默认3年",
"default": 3
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_price_zscore",
description="计算价格Z-Score(乖离率标准化),判断均值回归概率。当Z-Score过大时,统计回调概率。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"period": {
"type": "integer",
"description": "均线周期,默认60日",
"default": 60
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_market_profile_vpoc",
description="计算市场轮廓VPOC(成交量最大的价格档位),基于分钟级数据。VPOC是当日极强的支撑线或阻力线。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "日期,格式:YYYY-MM-DD"
}
},
"required": ["code", "date"]
}
),
ToolDefinition(
name="calc_realized_volatility",
description="计算已实现波动率(RV),基于分钟级数据。比日线波动率更精准,用于判断趋势动能是否耗尽。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "日期,格式:YYYY-MM-DD"
}
},
"required": ["code", "date"]
}
),
ToolDefinition(
name="analyze_buying_pressure",
description="分析买卖压力失衡,基于分钟级数据。捕捉盘中主力资金的\"抢筹\"或\"砸盘\"意图。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "日期,格式:YYYY-MM-DD"
}
},
"required": ["code", "date"]
}
),
ToolDefinition(
name="get_comprehensive_analysis",
description="综合技术分析,一次性返回MACD、KDJ/RSI、布林带、量能、突破、K线形态等多个指标,并给出多空信号总结。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
}
},
"required": ["code"]
}
),
# ==================== 新增量化因子工具(12个) ====================
ToolDefinition(
name="calc_rsi_divergence",
description="RSI背离检测,独立分析RSI指标的顶背离和底背离信号,判断反转概率。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60",
"default": 60
},
"rsi_period": {
"type": "integer",
"description": "RSI周期,默认14",
"default": 14
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_bollinger_squeeze",
description="布林带挤压分析,检测布林带收窄程度,预判变盘时机。当带宽处于历史低位时发出变盘预警。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60",
"default": 60
},
"period": {
"type": "integer",
"description": "布林带周期,默认20",
"default": 20
}
},
"required": ["code"]
}
),
ToolDefinition(
name="analyze_obv_trend",
description="OBV能量潮独立分析,追踪资金流向,检测OBV与价格的背离,判断主力动向。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60",
"default": 60
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_amihud_illiquidity",
description="计算Amihud非流动性因子,衡量股票流动性。值越大表示流动性越差,大单交易冲击成本越高。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认20",
"default": 20
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_parkinson_volatility",
description="计算帕金森波动率(基于分钟级高低价),比传统波动率更准确,适用于日内波动分析。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"date": {
"type": "string",
"description": "日期,格式YYYY-MM-DD"
}
},
"required": ["code", "date"]
}
),
ToolDefinition(
name="calc_trend_slope",
description="计算趋势线性回归斜率,量化趋势强度和方向。返回斜率、R²拟合度和趋势判断。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认20",
"default": 20
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_hurst_exponent",
description="计算Hurst指数,判断市场是趋势型(H>0.5)还是均值回归型(H<0.5),指导策略选择。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,建议100以上",
"default": 100
}
},
"required": ["code"]
}
),
ToolDefinition(
name="test_cointegration",
description="协整性测试,用于配对交易。检测两只股票是否存在长期均衡关系,计算对冲比率和价差。",
parameters={
"type": "object",
"properties": {
"code1": {
"type": "string",
"description": "股票代码1"
},
"code2": {
"type": "string",
"description": "股票代码2"
},
"days": {
"type": "integer",
"description": "分析天数,默认250",
"default": 250
}
},
"required": ["code1", "code2"]
}
),
ToolDefinition(
name="calc_kelly_position",
description="凯利公式计算最优仓位。根据胜率和盈亏比计算理论最优仓位,并提供保守建议。",
parameters={
"type": "object",
"properties": {
"win_rate": {
"type": "number",
"description": "胜率(0-1之间,如0.6表示60%)"
},
"win_loss_ratio": {
"type": "number",
"description": "盈亏比(平均盈利/平均亏损)"
},
"max_position": {
"type": "number",
"description": "最大允许仓位,默认0.25",
"default": 0.25
}
},
"required": ["win_rate", "win_loss_ratio"]
}
),
ToolDefinition(
name="search_similar_kline",
description="相似K线检索,在历史中搜索与当前形态相似的K线组合,统计历史后续走势作为参考。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"lookback": {
"type": "integer",
"description": "匹配窗口大小,默认10天",
"default": 10
},
"top_n": {
"type": "integer",
"description": "返回最相似的N个历史片段,默认5",
"default": 5
}
},
"required": ["code"]
}
),
ToolDefinition(
name="decompose_trend_simple",
description="趋势分解分析,将价格序列分解为趋势+周期+残差,识别主周期和趋势方向。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认120",
"default": 120
}
},
"required": ["code"]
}
),
ToolDefinition(
name="calc_price_entropy",
description="计算价格信息熵,衡量市场混乱程度。熵值越低表示趋势越明显,越高表示随机性越强。",
parameters={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "股票代码"
},
"days": {
"type": "integer",
"description": "分析天数,默认60",
"default": 60
}
},
"required": ["code"]
}
),
]
# ==================== MCP协议端点 ====================
@app.get("/")
async def root():
"""服务根端点"""
return {
"name": "Financial Data MCP Server",
"version": "1.0.0",
"protocol": "MCP",
"description": "Model Context Protocol server for financial data search and analysis"
}
@app.get("/tools")
async def list_tools():
"""列出所有可用工具"""
return {
"tools": [tool.dict() for tool in TOOLS]
}
@app.get("/tools/{tool_name}")
async def get_tool(tool_name: str):
"""获取特定工具的定义"""
tool = next((t for t in TOOLS if t.name == tool_name), None)
if not tool:
raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
return tool.dict()
@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
"""调用工具"""
logger.info(f"Tool call: {request.tool} with args: {request.arguments}")
try:
# 路由到对应的工具处理函数
handler = TOOL_HANDLERS.get(request.tool)
if not handler:
raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")
result = await handler(request.arguments)
return ToolCallResponse(
success=True,
data=result,
metadata={
"tool": request.tool,
"timestamp": datetime.now().isoformat()
}
)
except Exception as e:
logger.error(f"Tool call error: {str(e)}", exc_info=True)
return ToolCallResponse(
success=False,
error=str(e),
metadata={
"tool": request.tool,
"timestamp": datetime.now().isoformat()
}
)
# ==================== 工具处理函数 ====================
async def handle_search_news(args: Dict[str, Any]) -> Any:
"""处理新闻搜索"""
params = {
"query": args.get("query"),
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 20)
}
# 移除None值
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_china_news(args: Dict[str, Any]) -> Any:
"""处理中国新闻搜索"""
params = {
"query": args.get("query"),
"exact_match": args.get("exact_match", False),
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 20)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_china_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_medical_news(args: Dict[str, Any]) -> Any:
"""处理医疗新闻搜索"""
params = {
"query": args["query"],
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 10)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_medical_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_roadshows(args: Dict[str, Any]) -> Any:
"""处理路演搜索"""
params = {
"query": args["query"],
"company_code": args.get("company_code"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"size": args.get("size", 10)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.ROADSHOW_API}/search", params=params)
response.raise_for_status()
return response.json()
async def handle_search_concepts(args: Dict[str, Any]) -> Any:
"""处理概念搜索
参数写死:size=12, page=1, sort_by="_score"
trade_date 如果没传则使用今天的日期
"""
from datetime import date
# trade_date 默认今天
trade_date = args.get("trade_date") or date.today().strftime("%Y-%m-%d")
payload = {
"query": args["query"],
"size": 12, # 写死
"page": 1, # 写死
"sort_by": "_score", # 写死,按相关度排序
"trade_date": trade_date,
"search_size": 100,
"use_knn": True
}
logger.info(f"[search_concepts] 请求参数: {payload}")
response = await HTTP_CLIENT.post(f"{ServiceEndpoints.CONCEPT_API}/search", json=payload)
response.raise_for_status()
return response.json()
async def handle_get_concept_details(args: Dict[str, Any]) -> Any:
"""处理概念详情获取"""
concept_id = args["concept_id"]
params = {}
if args.get("trade_date"):
params["trade_date"] = args["trade_date"]
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.CONCEPT_API}/concept/{concept_id}",
params=params
)
response.raise_for_status()
return response.json()
async def handle_get_stock_concepts(args: Dict[str, Any]) -> Any:
"""处理股票概念获取"""
# 兼容不同的参数名: stock_code, seccode, code
stock_code = args.get("stock_code") or args.get("seccode") or args.get("code")
if not stock_code:
raise ValueError("缺少股票代码参数 (stock_code/seccode/code)")
params = {
"size": args.get("size", 50),
"sort_by": args.get("sort_by", "stock_count"),
"include_description": True
}
if args.get("trade_date"):
params["trade_date"] = args["trade_date"]
logger.info(f"[get_stock_concepts] 查询股票 {stock_code} 的概念")
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.CONCEPT_API}/stock/{stock_code}/concepts",
params=params
)
response.raise_for_status()
return response.json()
async def handle_get_concept_statistics(args: Dict[str, Any]) -> Any:
"""处理概念统计获取"""
params = {}
if args.get("days"):
params["days"] = args["days"]
if args.get("start_date"):
params["start_date"] = args["start_date"]
if args.get("end_date"):
params["end_date"] = args["end_date"]
if args.get("min_stock_count"):
params["min_stock_count"] = args["min_stock_count"]
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/statistics", params=params)
response.raise_for_status()
return response.json()
async def handle_search_limit_up_stocks(args: Dict[str, Any]) -> Any:
"""处理涨停股票搜索"""
payload = {
"query": args["query"],
"mode": args.get("mode", "hybrid"),
"page_size": args.get("page_size", 20)
}
if args.get("date"):
payload["date"] = args["date"]
if args.get("sectors"):
payload["sectors"] = args["sectors"]
response = await HTTP_CLIENT.post(
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/stocks/search/hybrid",
json=payload
)
response.raise_for_status()
return response.json()
async def handle_get_daily_stock_analysis(args: Dict[str, Any]) -> Any:
"""处理每日股票分析获取"""
date = args["date"]
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/analysis/daily/{date}"
)
response.raise_for_status()
return response.json()
async def handle_search_research_reports(args: Dict[str, Any]) -> Any:
"""处理研报搜索"""
params = {
"query": args["query"],
"mode": args.get("mode", "hybrid"),
"exact_match": args.get("exact_match", "0"),
"size": args.get("size", 10)
}
if args.get("security_code"):
params["security_code"] = args["security_code"]
if args.get("start_date"):
params["start_date"] = args["start_date"]
if args.get("end_date"):
params["end_date"] = args["end_date"]
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/search", params=params)
response.raise_for_status()
return response.json()
async def handle_get_stock_code_by_name(args: Dict[str, Any]) -> Any:
"""根据股票名称查询股票代码"""
# 兼容不同的参数名: stock_name, name
stock_name = args.get("stock_name") or args.get("name")
if not stock_name:
return {"success": False, "error": "缺少股票名称参数 (stock_name/name)"}
logger.info(f"[get_stock_code_by_name] 查询股票名称: {stock_name}")
result = await db.get_stock_code_by_name(stock_name)
return result
async def handle_get_stock_basic_info(args: Dict[str, Any]) -> Any:
"""处理股票基本信息查询"""
# 兼容不同的参数名: seccode, stock_code, code
seccode = args.get("seccode") or args.get("stock_code") or args.get("code")
if not seccode:
return {"success": False, "error": "缺少股票代码参数 (seccode/stock_code/code)"}
result = await db.get_stock_basic_info(seccode)
if result:
return {"success": True, "data": result}
else:
return {"success": False, "error": f"未找到股票代码 {seccode} 的信息"}
async def handle_get_stock_financial_index(args: Dict[str, Any]) -> Any:
"""处理股票财务指标查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 10)
result = await db.get_stock_financial_index(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_trade_data(args: Dict[str, Any]) -> Any:
"""处理股票交易数据查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 30)
result = await db.get_stock_trade_data(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_balance_sheet(args: Dict[str, Any]) -> Any:
"""处理资产负债表查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 8)
result = await db.get_stock_balance_sheet(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_cashflow(args: Dict[str, Any]) -> Any:
"""处理现金流量表查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 8)
result = await db.get_stock_cashflow(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_search_stocks_by_criteria(args: Dict[str, Any]) -> Any:
"""处理按条件搜索股票"""
industry = args.get("industry")
province = args.get("province")
min_market_cap = args.get("min_market_cap")
max_market_cap = args.get("max_market_cap")
limit = args.get("limit", 50)
result = await db.search_stocks_by_criteria(
industry, province, min_market_cap, max_market_cap, limit
)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_comparison(args: Dict[str, Any]) -> Any:
"""处理股票对比分析"""
seccodes = args["seccodes"]
metric = args.get("metric", "financial")
result = await db.get_stock_comparison(seccodes, metric)
return {
"success": True,
"data": result
}
async def handle_get_user_watchlist(args: Dict[str, Any]) -> Any:
"""获取用户自选股列表及实时行情"""
try:
# 从 agent 实例获取 cookies(如果可用)
cookies = getattr(agent, 'cookies', {})
# 调用主应用的自选股API
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.MAIN_APP_API}/api/account/watchlist/realtime",
headers={
"Content-Type": "application/json"
},
cookies=cookies # 传递用户的 session cookie
)
if response.status_code == 200:
data = response.json()
logger.info(f"[Watchlist] 成功获取 {len(data.get('data', []))} 只自选股")
return data
elif response.status_code == 401:
logger.warning("[Watchlist] 未登录或会话已过期")
return {
"success": False,
"error": "未登录或会话已过期",
"data": []
}
else:
logger.error(f"[Watchlist] 获取失败: {response.status_code}")
return {
"success": False,
"error": f"获取自选股失败: {response.status_code}",
"data": []
}
except Exception as e:
logger.error(f"[Watchlist] 获取用户自选股失败: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"data": []
}
async def handle_get_user_following_events(args: Dict[str, Any]) -> Any:
"""获取用户关注的事件列表"""
try:
# 从 agent 实例获取 cookies(如果可用)
cookies = getattr(agent, 'cookies', {})
# 调用主应用的关注事件API
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.MAIN_APP_API}/api/account/events/following",
headers={
"Content-Type": "application/json"
},
cookies=cookies # 传递用户的 session cookie
)
if response.status_code == 200:
data = response.json()
logger.info(f"[FollowingEvents] 成功获取 {len(data.get('data', []))} 个关注事件")
return data
elif response.status_code == 401:
logger.warning("[FollowingEvents] 未登录或会话已过期")
return {
"success": False,
"error": "未登录或会话已过期",
"data": []
}
else:
logger.error(f"[FollowingEvents] 获取失败: {response.status_code}")
return {
"success": False,
"error": f"获取关注事件失败: {response.status_code}",
"data": []
}
except Exception as e:
logger.error(f"[FollowingEvents] 获取用户关注事件失败: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"data": []
}
# ==================== 分钟频数据处理函数 ====================
async def handle_get_stock_minute_data(args: Dict[str, Any]) -> Any:
"""处理股票分钟频数据查询"""
code = args["code"]
start_time = args.get("start_time")
end_time = args.get("end_time")
limit = args.get("limit", 240)
result = await db.get_stock_minute_data(code, start_time, end_time, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_minute_aggregation(args: Dict[str, Any]) -> Any:
"""处理股票分钟频数据聚合查询"""
code = args["code"]
date = args["date"]
interval = args.get("interval", 5)
result = await db.get_stock_minute_aggregation(code, date, interval)
return {
"success": True,
"data": result,
"count": len(result),
"interval": f"{interval}分钟"
}
async def handle_get_stock_intraday_statistics(args: Dict[str, Any]) -> Any:
"""处理股票日内统计数据查询"""
code = args["code"]
date = args["date"]
result = await db.get_stock_intraday_statistics(code, date)
return result
# 工具处理函数映射
TOOL_HANDLERS = {
"search_news": handle_search_news,
"search_china_news": handle_search_china_news,
"search_medical_news": handle_search_medical_news,
"search_roadshows": handle_search_roadshows,
"search_concepts": handle_search_concepts,
"get_concept_details": handle_get_concept_details,
"get_stock_concepts": handle_get_stock_concepts,
"get_concept_statistics": handle_get_concept_statistics,
"search_limit_up_stocks": handle_search_limit_up_stocks,
"get_daily_stock_analysis": handle_get_daily_stock_analysis,
"search_research_reports": handle_search_research_reports,
"get_stock_code_by_name": handle_get_stock_code_by_name,
"get_stock_basic_info": handle_get_stock_basic_info,
"get_stock_financial_index": handle_get_stock_financial_index,
"get_stock_trade_data": handle_get_stock_trade_data,
"get_stock_balance_sheet": handle_get_stock_balance_sheet,
"get_stock_cashflow": handle_get_stock_cashflow,
"search_stocks_by_criteria": handle_search_stocks_by_criteria,
"get_stock_comparison": handle_get_stock_comparison,
"get_user_watchlist": handle_get_user_watchlist,
"get_user_following_events": handle_get_user_following_events,
# 分钟频数据工具
"get_stock_minute_data": handle_get_stock_minute_data,
"get_stock_minute_aggregation": handle_get_stock_minute_aggregation,
"get_stock_intraday_statistics": handle_get_stock_intraday_statistics,
# 量化因子工具(从 mcp_quant 模块导入)
**quant.QUANT_TOOLS,
}
# ==================== Agent系统实现 ====================
class MCPAgentIntegrated:
"""集成版 MCP Agent - 使用 LLM 进行计划制定和总结"""
def __init__(self):
# 初始化主 LLM 客户端(计划制定 + 总结)
self.llm_client = OpenAI(
api_key=LLM_CONFIG["api_key"],
base_url=LLM_CONFIG["base_url"],
)
self.llm_model = LLM_CONFIG["model"]
self.llm_max_tokens = LLM_CONFIG.get("max_tokens", 8192)
# 保持 DeepMoney 客户端作为备用
self.deepmoney_client = OpenAI(
api_key=DEEPMONEY_CONFIG["api_key"],
base_url=DEEPMONEY_CONFIG["base_url"],
)
self.deepmoney_model = DEEPMONEY_CONFIG["model"]
def get_planning_prompt(self, tools: List[dict]) -> str:
"""获取计划制定的系统提示词"""
tools_desc = "\n\n".join([
f"**{tool['name']}**\n"
f"描述:{tool['description']}\n"
f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
for tool in tools
])
# 获取当前时间信息
from datetime import datetime
now = datetime.now()
current_time_info = f"""## 当前时间
- **日期**: {now.strftime('%Y年%m月%d日')}
- **时间**: {now.strftime('%H:%M:%S')}
- **星期**: {['周一', '周二', '周三', '周四', '周五', '周六', '周日'][now.weekday()]}
- **A股交易时间**: 上午 9:30-11:30,下午 13:00-15:00
- **当前是否交易时段**: {'是' if (now.weekday() < 5 and ((now.hour == 9 and now.minute >= 30) or (10 <= now.hour < 11) or (now.hour == 11 and now.minute <= 30) or (13 <= now.hour < 15))) else '否'}
**时间语义理解**:
- "今天/当天" = {now.strftime('%Y-%m-%d')}
- "最近/近期" = 最近 5-10 个交易日
- "短线" = 5-20 个交易日
- "中线" = 1-3 个月
- "长线" = 6 个月以上
"""
return f"""你是"价小前",北京价值前沿科技公司的AI投研聊天助手。
## 你的人格特征
- **名字**: 价小前
- **身份**: 北京价值前沿科技公司的专业AI投研助手
- **专业领域**: 股票投资研究、市场分析、新闻解读、财务分析
- **性格**: 专业、严谨、友好,擅长用简洁的语言解释复杂的金融概念
- **服务宗旨**: 帮助投资者做出更明智的投资决策,提供数据驱动的研究支持
{current_time_info}
## 可用工具
{tools_desc}
## 特殊工具
- **summarize_news**: 使用 DeepMoney 模型总结新闻数据
- 参数: {{"data": "新闻列表JSON", "focus": "关注点"}}
- 适用场景: 当需要总结新闻、研报等文本数据时
## 重要知识
- 贵州茅台: 600519
- 涨停: 涨幅约10%
- 概念板块: 相同题材股票分类
## 任务
分析用户问题,制定详细的执行计划。返回 JSON:
```json
{{
"goal": "用户目标",
"reasoning": "分析思路",
"steps": [
{{
"tool": "工具名",
"arguments": {{"参数": "值"}},
"reason": "原因"
}}
]
}}
```
## 规划原则
1. **先收集数据,再分析总结**
2. **使用 summarize_news 总结新闻类数据**
3. **根据问题复杂度灵活规划步骤数**:
- 简单问题(如查询单只股票):2-3 步
- 中等复杂度(如对比分析):3-5 步
- 复杂问题(如多维度深度分析):5-8 步
- 避免过度拆分简单任务
4. **每个步骤应有明确目的,避免冗余**
5. **最后通常需要总结步骤**(除非用户只要原始数据)
## 示例
**示例 1: 概念板块查询(1-2 步)**
用户:"商业航天有哪些股票"
```json
{{
"goal": "查询商业航天概念板块的相关股票",
"reasoning": "使用 search_concepts 搜索概念板块,会返回概念详情和相关股票列表",
"steps": [
{{"tool": "search_concepts", "arguments": {{"query": "商业航天", "size": 10}}, "reason": "搜索商业航天概念,获取相关股票"}}
]
}}
```
**示例 2: 新闻查询(2 步)**
用户:"贵州茅台最近有什么新闻"
```json
{{
"goal": "查询并总结贵州茅台最新新闻",
"reasoning": "简单的新闻查询,只需搜索和总结两步",
"steps": [
{{"tool": "search_china_news", "arguments": {{"query": "贵州茅台", "top_k": 10}}, "reason": "搜索新闻"}},
{{"tool": "summarize_news", "arguments": {{"data": "新闻数据", "focus": "重要动态"}}, "reason": "总结要点"}}
]
}}
```
**示例 3: 股票对比分析(4 步)**
用户:"对比分析贵州茅台和五粮液的投资价值"
```json
{{
"goal": "对比分析两只股票的投资价值",
"reasoning": "需要分别获取两只股票的数据,然后对比分析",
"steps": [
{{"tool": "get_stock_basic_info", "arguments": {{"stock_code": "600519"}}, "reason": "获取茅台基本信息"}},
{{"tool": "get_stock_basic_info", "arguments": {{"stock_code": "000858"}}, "reason": "获取五粮液基本信息"}},
{{"tool": "search_china_news", "arguments": {{"query": "茅台 五粮液 对比", "top_k": 5}}, "reason": "搜索对比分析文章"}},
{{"tool": "summarize_news", "arguments": {{"data": "新闻", "focus": "投资价值对比"}}, "reason": "总结对比结论"}}
]
}}
```
**示例 4: 概念板块深度分析(4-5 步)**
用户:"分析人工智能概念板块的投资机会"
```json
{{
"goal": "深度分析人工智能板块的投资机会",
"reasoning": "先搜索概念获取成分股,再搜索涨停数据和新闻,最后总结",
"steps": [
{{"tool": "search_concepts", "arguments": {{"query": "人工智能", "size": 10, "sort_by": "change_pct"}}, "reason": "搜索人工智能概念,获取成分股和涨跌情况"}},
{{"tool": "search_limit_up_stocks", "arguments": {{"query": "人工智能"}}, "reason": "查看AI相关涨停股"}},
{{"tool": "search_china_news", "arguments": {{"query": "人工智能概念股", "top_k": 10}}, "reason": "搜索最新新闻动态"}},
{{"tool": "summarize_news", "arguments": {{"data": "所有数据", "focus": "投资机会和风险"}}, "reason": "综合分析总结"}}
]
}}
```
**重要提示**:
- 简单问题不要硬凑步骤,2-3 步足够
- 复杂问题可以拆分到 6-8 步,但每步必须有实际价值
- 避免重复调用相同工具(除非参数不同)
只返回JSON,不要其他内容。"""
async def create_plan(self, user_query: str, tools: List[dict], chat_history: List[dict] = None) -> ExecutionPlan:
"""阶段1: 使用 LLM 创建执行计划(带思考过程和历史上下文)"""
logger.info(f"[Planning] LLM开始制定计划: {user_query}")
messages = [
{"role": "system", "content": self.get_planning_prompt(tools)},
]
# 添加会话历史(多轮对话上下文)
if chat_history:
# 限制历史消息数量,避免 context 过长
recent_history = chat_history[-10:] # 最近10条消息
for msg in recent_history:
role = "user" if msg.get("message_type") == "user" else "assistant"
content = msg.get("message", "")
# 截断过长的历史消息
if len(content) > 500:
content = content[:500] + "..."
messages.append({"role": role, "content": content})
logger.info(f"[Planning] 添加了 {len(recent_history)} 条历史消息到上下文")
# 添加当前用户问题
messages.append({"role": "user", "content": user_query})
# 使用配置的 LLM 模型
response = self.llm_client.chat.completions.create(
model=self.llm_model,
messages=messages,
temperature=1.0,
max_tokens=self.llm_max_tokens,
)
choice = response.choices[0]
message = choice.message
# 提取思考过程
reasoning_content = ""
if hasattr(message, "reasoning_content"):
reasoning_content = getattr(message, "reasoning_content")
logger.info(f"[Planning] LLM思考过程: {reasoning_content[:200]}...")
# 提取计划内容
plan_json = message.content.strip()
# 清理可能的代码块标记
if "```json" in plan_json:
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
elif "```" in plan_json:
plan_json = plan_json.split("```")[1].split("```")[0].strip()
plan_data = json.loads(plan_json)
plan = ExecutionPlan(
goal=plan_data["goal"],
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
steps=[ToolCall(**step) for step in plan_data["steps"]],
)
logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步")
return plan
async def execute_tool(
self,
tool_name: str,
arguments: Dict[str, Any],
tool_handlers: Dict[str, Any],
) -> Dict[str, Any]:
"""执行单个工具"""
# 详细日志:打印工具名和参数
logger.info(f"[Tool Call] ========== 工具调用开始 ==========")
logger.info(f"[Tool Call] 工具名: {tool_name}")
logger.info(f"[Tool Call] 参数类型: {type(arguments)}")
logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False, indent=2)}")
# 特殊工具:summarize_news(使用 DeepMoney)
if tool_name == "summarize_news":
return await self.summarize_news_with_deepmoney(
data=arguments.get("data", ""),
focus=arguments.get("focus", "关键信息"),
)
# 调用 MCP 工具
handler = tool_handlers.get(tool_name)
if not handler:
logger.error(f"[Tool Call] 工具 '{tool_name}' 未找到!可用工具: {list(tool_handlers.keys())}")
raise ValueError(f"Tool '{tool_name}' not found")
logger.info(f"[Tool Call] 调用 handler: {handler.__name__}")
result = await handler(arguments)
logger.info(f"[Tool Call] 返回结果类型: {type(result)}")
logger.info(f"[Tool Call] ========== 工具调用结束 ==========")
return result
async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str:
"""使用 DeepMoney 模型总结新闻"""
logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}")
messages = [
{
"role": "system",
"content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。"
},
{
"role": "user",
"content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}"
},
]
try:
response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192),
)
summary = response.choices[0].message.content
logger.info(f"[DeepMoney] 总结完成")
return summary
except Exception as e:
logger.error(f"[DeepMoney] 总结失败: {str(e)}")
# 降级:返回简化摘要
return f"新闻总结失败,原始数据:{data[:500]}..."
async def execute_plan(
self,
plan: ExecutionPlan,
tool_handlers: Dict[str, Any],
) -> List[StepResult]:
"""阶段2: 执行计划"""
logger.info(f"[Execution] 开始执行: {len(plan.steps)} 步")
results = []
collected_data = {}
for i, step in enumerate(plan.steps):
logger.info(f"[Execution] 步骤 {i+1}/{len(plan.steps)}: {step.tool}")
start_time = datetime.now()
try:
# 替换占位符
arguments = step.arguments.copy()
# 如果参数值是 "前面的新闻数据" 或 "前面收集的所有数据"
if step.tool == "summarize_news":
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
# 将收集的数据传递
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(step.tool, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
results.append(step_result)
# 收集数据
collected_data[f"step_{i+1}_{step.tool}"] = result
logger.info(f"[Execution] 步骤 {i+1} 完成: {execution_time:.2f}s")
except Exception as e:
logger.error(f"[Execution] 步骤 {i+1} 失败: {str(e)}")
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=step.arguments,
status="failed",
error=str(e),
execution_time=execution_time,
)
results.append(step_result)
# 继续执行其他步骤
continue
logger.info(f"[Execution] 执行完成")
return results
async def generate_final_summary(
self,
user_query: str,
plan: ExecutionPlan,
step_results: List[StepResult],
) -> str:
"""阶段3: 使用 LLM 生成最终总结"""
logger.info("[Summary] LLM生成最终总结")
# 收集成功的结果
successful_results = [r for r in step_results if r.status == "success"]
if not successful_results:
return "很抱歉,所有步骤都执行失败,无法生成分析报告。"
def safe_truncate_result(result, max_length=600):
"""安全截取结果,避免截断 JSON 到不完整状态"""
result_str = str(result)
if len(result_str) <= max_length:
return result_str
# 截取到 max_length
truncated = result_str[:max_length]
# 尝试找到最后一个完整的 JSON 边界(}, ] 或 ,)
# 从后往前找一个安全的截断点
safe_endings = ['},', '},\n', '}\n', '],', '],\n', ']\n', '",', '",\n', '"\n']
best_pos = -1
for ending in safe_endings:
pos = truncated.rfind(ending)
if pos > best_pos:
best_pos = pos
if best_pos > max_length // 2: # 只有找到的位置超过一半时才使用
truncated = truncated[:best_pos + 1]
# 如果结果看起来像 JSON,添加省略提示
if truncated.strip().startswith('{') or truncated.strip().startswith('['):
return truncated + "\n...(数据已截断)"
else:
return truncated + "..."
# 构建结果文本(精简版,安全截取)
results_text = "\n\n".join([
f"**步骤 {r.step_index + 1}: {r.tool}**\n"
f"结果: {safe_truncate_result(r.result)}"
for r in successful_results[:3] # 只取前3个,避免超长
])
messages = [
{
"role": "system",
"content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。
## 数据可视化能力
如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。
支持的图表类型:
- 折线图(line):适合时间序列数据(如股价走势、财务指标趋势)
- 柱状图(bar):适合对比数据(如不同年份的收入、利润对比)
- 饼图(pie):适合占比数据(如业务结构、资产分布)
### 图表格式(使用 Markdown 代码块)
在报告中插入图表时,使用以下格式:
```echarts
{
"title": {"text": "图表标题"},
"tooltip": {},
"xAxis": {"type": "category", "data": ["类别1", "类别2"]},
"yAxis": {"type": "value"},
"series": [{"name": "数据系列", "type": "line", "data": [100, 200]}]
}
```
### 示例
如果有股价数据,可以这样呈现:
**股价走势分析**
近30日股价呈现上涨趋势,最高达到1850元。
```echarts
{
"title": {"text": "近30日股价走势", "left": "center"},
"tooltip": {"trigger": "axis"},
"xAxis": {"type": "category", "data": ["2024-01-01", "2024-01-02", "2024-01-03"]},
"yAxis": {"type": "value", "name": "股价(元)"},
"series": [{"name": "收盘价", "type": "line", "data": [1800, 1820, 1850], "smooth": true}]
}
```
**重要提示**:
- ECharts 配置必须是合法的 JSON 格式
- 只在有明确数值数据时才生成图表
- 不要凭空捏造数据"""
},
{
"role": "user",
"content": f"""用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请生成专业的分析报告(500字以内)。如果结果中包含数值型数据,请使用 ECharts 图表进行可视化展示。"""
},
]
try:
response = self.llm_client.chat.completions.create(
model=self.llm_model,
messages=messages,
temperature=0.7,
max_tokens=self.llm_max_tokens,
)
summary = response.choices[0].message.content
logger.info("[Summary] 总结完成")
return summary
except Exception as e:
logger.error(f"[Summary] 总结失败: {str(e)}")
# 降级:返回最后一步的结果
if successful_results:
last_result = successful_results[-1]
if isinstance(last_result.result, str):
return last_result.result
else:
return json.dumps(last_result.result, ensure_ascii=False, indent=2)
return "总结生成失败"
async def process_query(
self,
user_query: str,
tools: List[dict],
tool_handlers: Dict[str, Any],
chat_history: List[dict] = None,
) -> AgentResponse:
"""主流程(非流式)"""
logger.info(f"[Agent] 处理查询: {user_query}")
if chat_history:
logger.info(f"[Agent] 带有 {len(chat_history)} 条历史消息")
try:
# 阶段1: LLM 制定计划(带历史上下文)
plan = await self.create_plan(user_query, tools, chat_history)
# 阶段2: 执行工具
step_results = await self.execute_plan(plan, tool_handlers)
# 阶段3: LLM 生成总结
final_summary = await self.generate_final_summary(
user_query, plan, step_results
)
return AgentResponse(
success=True,
message=final_summary,
plan=plan,
step_results=step_results,
final_summary=final_summary,
metadata={
"total_steps": len(plan.steps),
"successful_steps": len([r for r in step_results if r.status == "success"]),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
"model_used": {
"planning": self.llm_model,
"summarization": self.llm_model,
"news_summary": self.deepmoney_model,
},
},
)
except Exception as e:
logger.error(f"[Agent] 错误: {str(e)}", exc_info=True)
return AgentResponse(
success=False,
message=f"处理失败: {str(e)}",
)
async def generate_session_title(self, user_message: str, assistant_response: str) -> str:
"""生成会话标题(简短概述),使用 DeepMoney 模型"""
try:
messages = [
{
"role": "system",
"content": "你是一个标题生成器。根据用户问题和AI回复,生成一个简短的会话标题(10-20个字)。只返回标题文本,不要任何其他内容。"
},
{
"role": "user",
"content": f"用户问题:{user_message[:200]}\n\nAI回复:{assistant_response[:500]}\n\n请生成一个简短的会话标题:"
}
]
# 使用 DeepMoney 模型(本地托管,支持长上下文)
response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.3,
max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192),
)
title = response.choices[0].message.content.strip()
# 处理 DeepMoney 的 ... 标签,只保留 之后的内容
if "" in title:
title = title.split("")[-1].strip()
# 清理可能的引号
title = title.strip('"\'')
# 限制长度
if len(title) > 30:
title = title[:27] + "..."
return title
except Exception as e:
logger.error(f"[Title] 生成标题失败: {e}")
# 降级:使用用户消息的前20个字符
return user_message[:20] + "..." if len(user_message) > 20 else user_message
async def process_query_stream(
self,
user_query: str,
tools: List[dict],
tool_handlers: Dict[str, Any],
session_id: str = None,
user_id: str = None,
user_nickname: str = None,
user_avatar: str = None,
cookies: dict = None,
model_config: dict = None, # 新增:动态模型配置
chat_history: List[dict] = None, # 新增:历史对话记录
is_new_session: bool = False, # 新增:是否是新会话(用于生成标题)
) -> AsyncGenerator[str, None]:
"""
主流程(流式输出)- 使用原生 OpenAI Tool Calling API
支持 vLLM 的 --enable-auto-tool-choice --tool-call-parser qwen3_coder
"""
logger.info(f"[Agent Stream] 处理查询: {user_query}")
if chat_history:
logger.info(f"[Agent Stream] 带有 {len(chat_history)} 条历史消息")
if is_new_session:
logger.info(f"[Agent Stream] 这是新会话,将在完成后生成标题")
# 将 cookies 存储为实例属性,供工具调用时使用
self.cookies = cookies or {}
# 如果传入了自定义模型配置,使用自定义配置,否则使用默认 LLM
if model_config:
llm_client = OpenAI(
api_key=model_config["api_key"],
base_url=model_config["base_url"],
)
llm_model = model_config["model"]
llm_max_tokens = model_config.get("max_tokens", 8192)
logger.info(f"[Agent Stream] 使用自定义模型: {llm_model}")
else:
llm_client = self.llm_client
llm_model = self.llm_model
llm_max_tokens = self.llm_max_tokens
logger.info(f"[Agent Stream] 使用默认模型: {llm_model}")
# 将工具列表转换为 OpenAI tools 格式
openai_tools = []
for tool in tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["parameters"]
}
})
logger.info(f"[Agent Stream] 已加载 {len(openai_tools)} 个工具")
# 获取当前时间信息
now = datetime.now()
current_time_info = f"""当前时间: {now.strftime('%Y年%m月%d日 %H:%M:%S')} {['周一', '周二', '周三', '周四', '周五', '周六', '周日'][now.weekday()]}
A股交易时间: 上午 9:30-11:30,下午 13:00-15:00
时间语义: "今天"={now.strftime('%Y-%m-%d')}, "最近"=最近5-10个交易日"""
# 构建系统提示词(适用于原生 tool calling)
system_prompt = f"""你是"价小前",北京价值前沿科技公司的AI投研聊天助手。
## 你的能力
- 专业领域: 股票投资研究、市场分析、新闻解读、财务分析
- 你可以调用各种工具来查询股票数据、新闻、概念板块等信息
- 根据用户问题,智能选择并调用合适的工具
{current_time_info}
## 重要知识
- 贵州茅台: 600519
- 涨停: 涨幅约10%
- 概念板块: 相同题材股票分类
## 工具使用原则
1. 根据用户问题,选择最合适的工具
2. 可以多次调用工具来完成复杂任务
3. 获取数据后,给出专业的分析和总结
4. 如果需要总结新闻类数据,使用 summarize_news 工具
## 输出要求
- 使用 Markdown 格式,结构清晰
- 重要数据用 **加粗** 标注
- 如有数值数据,可使用 ECharts 图表展示(使用 ```echarts 代码块)"""
try:
# 发送开始事件
yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"})
# 构建消息列表
messages = [
{"role": "system", "content": system_prompt},
]
# 添加历史对话(最近 10 轮)
if chat_history:
recent_history = chat_history[-10:]
for msg in recent_history:
role = "user" if msg.get("message_type") == "user" else "assistant"
content = msg.get("message", "")
if len(content) > 500:
content = content[:500] + "..."
messages.append({"role": role, "content": content})
logger.info(f"[Agent Stream] 已添加 {len(recent_history)} 条历史消息到上下文")
# 添加当前用户查询
messages.append({"role": "user", "content": user_query})
# 用于收集执行结果
step_results = []
collected_data = {}
plan_steps = [] # 记录执行的步骤,用于前端显示
step_index = 0
max_tool_calls = 10 # 最大工具调用次数,防止无限循环
yield self._format_sse("status", {"stage": "thinking", "message": "正在分析问题..."})
# 循环处理,直到模型不再调用工具
while step_index < max_tool_calls:
logger.info(f"[Agent Stream] 第 {step_index + 1} 轮 LLM 调用")
# 使用原生 tool calling(非流式,因为需要获取 tool_calls)
try:
response = llm_client.chat.completions.create(
model=llm_model,
messages=messages,
tools=openai_tools,
tool_choice="auto",
temperature=0.7,
max_tokens=llm_max_tokens,
stream=False, # 工具调用需要非流式
)
except Exception as e:
logger.error(f"[Agent Stream] LLM 调用失败: {e}")
raise
assistant_message = response.choices[0].message
logger.info(f"[Agent Stream] LLM 响应: finish_reason={response.choices[0].finish_reason}")
# 获取工具调用(优先使用原生 tool_calls,其次解析文本格式)
native_tool_calls = assistant_message.tool_calls or []
text_tool_calls = []
# 如果没有原生工具调用,尝试从文本内容中解析
if not native_tool_calls and assistant_message.content:
content = assistant_message.content
# 检查是否包含工具调用标记(包括 DSML 格式)
has_tool_markers = (
'' in content or
'```tool_call' in content or
'"tool":' in content or
'DSML' in content or # DeepSeek DSML 格式
'|DSML|' in content # 全角竖线版本
)
if has_tool_markers:
logger.info(f"[Agent Stream] 尝试从文本内容解析工具调用")
logger.info(f"[Agent Stream] 内容预览: {content[:500]}")
text_tool_calls = self._parse_text_tool_calls(content)
# 检查是否有工具调用(原生或文本格式)
if native_tool_calls:
logger.info(f"[Agent Stream] 检测到 {len(native_tool_calls)} 个原生工具调用")
# 将 assistant 消息添加到历史(包含 tool_calls)
messages.append(assistant_message)
# 如果是第一次工具调用,发送计划事件
if step_index == 0:
# 构建计划数据
plan_data = {
"goal": f"分析用户问题:{user_query[:50]}...",
"reasoning": "使用工具获取相关数据进行分析",
"steps": []
}
for tc in native_tool_calls:
try:
args = json.loads(tc.function.arguments) if tc.function.arguments else {}
except:
args = {}
plan_data["steps"].append({
"tool": tc.function.name,
"arguments": args,
"reason": f"调用 {tc.function.name}"
})
yield self._format_sse("plan", plan_data)
yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(native_tool_calls)} 个工具调用"})
# 执行每个工具调用
for tool_call in native_tool_calls:
tool_name = tool_call.function.name
tool_call_id = tool_call.id
try:
arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
arguments = {}
logger.warning(f"[Agent Stream] 工具参数解析失败: {tool_call.function.arguments}")
logger.info(f"[Tool Call] ========== 工具调用开始 ==========")
logger.info(f"[Tool Call] 工具名: {tool_name}")
logger.info(f"[Tool Call] tool_call_id: {tool_call_id}")
logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False)}")
# 发送步骤开始事件
yield self._format_sse("step_start", {
"step_index": step_index,
"tool": tool_name,
"arguments": arguments,
"reason": f"调用 {tool_name}",
})
start_time = datetime.now()
try:
# 特殊处理 summarize_news
if tool_name == "summarize_news":
data_arg = arguments.get("data", "")
if data_arg in ["前面的新闻数据", "前面收集的所有数据", ""]:
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(tool_name, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
# 记录结果
step_result = StepResult(
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
step_results.append(step_result)
collected_data[f"step_{step_index+1}_{tool_name}"] = result
plan_steps.append({"tool": tool_name, "arguments": arguments, "reason": f"调用 {tool_name}"})
# 发送步骤完成事件
yield self._format_sse("step_complete", {
"step_index": step_index,
"tool": tool_name,
"status": "success",
"result": result,
"execution_time": execution_time,
})
# 将工具结果添加到消息历史
result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result)
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_str[:5000] # 截断过长的结果
})
logger.info(f"[Tool Call] 执行成功,耗时 {execution_time:.2f}s")
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
step_result = StepResult(
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="failed",
error=error_msg,
execution_time=execution_time,
)
step_results.append(step_result)
# 发送步骤失败事件
yield self._format_sse("step_complete", {
"step_index": step_index,
"tool": tool_name,
"status": "failed",
"error": error_msg,
"execution_time": execution_time,
})
# 将错误信息添加到消息历史
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": f"工具执行失败: {error_msg}"
})
logger.error(f"[Tool Call] 执行失败: {error_msg}")
logger.info(f"[Tool Call] ========== 工具调用结束 ==========")
step_index += 1
elif text_tool_calls:
# 处理文本格式的工具调用
logger.info(f"[Agent Stream] 检测到 {len(text_tool_calls)} 个文本格式工具调用")
# 将 assistant 消息添加到历史
messages.append({"role": "assistant", "content": assistant_message.content})
# 如果是第一次工具调用,发送计划事件
if step_index == 0:
plan_data = {
"goal": f"分析用户问题:{user_query[:50]}...",
"reasoning": "使用工具获取相关数据进行分析",
"steps": [
{"tool": tc["name"], "arguments": tc["arguments"], "reason": f"调用 {tc['name']}"}
for tc in text_tool_calls
]
}
yield self._format_sse("plan", plan_data)
yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(text_tool_calls)} 个工具调用"})
# 执行每个工具调用
for tc in text_tool_calls:
tool_name = tc["name"]
arguments = tc["arguments"]
tool_call_id = f"text_call_{step_index}_{tool_name}"
logger.info(f"[Tool Call] ========== 文本工具调用开始 ==========")
logger.info(f"[Tool Call] 工具名: {tool_name}")
logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False)}")
# 发送步骤开始事件
yield self._format_sse("step_start", {
"step_index": step_index,
"tool": tool_name,
"arguments": arguments,
"reason": f"调用 {tool_name}",
})
start_time = datetime.now()
try:
# 特殊处理 summarize_news
if tool_name == "summarize_news":
data_arg = arguments.get("data", "")
if data_arg in ["前面的新闻数据", "前面收集的所有数据", ""]:
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(tool_name, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
# 记录结果
step_result = StepResult(
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
step_results.append(step_result)
collected_data[f"step_{step_index+1}_{tool_name}"] = result
plan_steps.append({"tool": tool_name, "arguments": arguments, "reason": f"调用 {tool_name}"})
# 发送步骤完成事件
yield self._format_sse("step_complete", {
"step_index": step_index,
"tool": tool_name,
"status": "success",
"result": result,
"execution_time": execution_time,
})
# 将工具结果添加到消息历史(简化格式,因为模型可能不支持标准 tool 消息)
result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result)
messages.append({
"role": "user",
"content": f"[工具调用结果] {tool_name}: {result_str[:3000]}"
})
logger.info(f"[Tool Call] 执行成功,耗时 {execution_time:.2f}s")
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
step_result = StepResult(
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="failed",
error=error_msg,
execution_time=execution_time,
)
step_results.append(step_result)
yield self._format_sse("step_complete", {
"step_index": step_index,
"tool": tool_name,
"status": "failed",
"error": error_msg,
"execution_time": execution_time,
})
messages.append({
"role": "user",
"content": f"[工具调用失败] {tool_name}: {error_msg}"
})
logger.error(f"[Tool Call] 执行失败: {error_msg}")
logger.info(f"[Tool Call] ========== 文本工具调用结束 ==========")
step_index += 1
else:
# 没有工具调用,模型生成了最终回复
logger.info(f"[Agent Stream] 模型生成最终回复")
break
# 构建 plan 对象(用于保存到 ES)
plan = ExecutionPlan(
goal=f"分析用户问题:{user_query[:50]}...",
reasoning="使用工具获取相关数据进行分析",
steps=[ToolCall(tool=s["tool"], arguments=s["arguments"], reason=s["reason"]) for s in plan_steps],
)
# 阶段3: 生成最终总结
yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."})
# 收集成功的结果
successful_results = [r for r in step_results if r.status == "success"]
# 初始化 final_summary
final_summary = ""
if not successful_results and step_index == 0:
# 如果没有执行任何工具(模型直接回复),使用模型的回复
if assistant_message and assistant_message.content:
final_summary = assistant_message.content
# 流式发送(虽然已经是完整的,但保持前端兼容)
yield self._format_sse("summary_chunk", {"content": final_summary})
else:
final_summary = "抱歉,我无法处理您的请求。"
yield self._format_sse("summary_chunk", {"content": final_summary})
elif not successful_results:
# 所有步骤都失败
final_summary = "很抱歉,所有步骤都执行失败,无法生成分析报告。"
yield self._format_sse("summary_chunk", {"content": final_summary})
else:
# 有成功的工具调用,使用流式 API 生成最终回复
try:
# 使用流式 API 生成最终回复(不再传入 tools,让模型生成文本回复)
summary_stream = llm_client.chat.completions.create(
model=llm_model,
messages=messages, # messages 已包含所有工具调用历史
temperature=0.7,
max_tokens=llm_max_tokens,
stream=True, # 启用流式输出
)
# 逐块发送总结内容
for chunk in summary_stream:
if chunk.choices and chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
final_summary += content_chunk
# 发送总结片段
yield self._format_sse("summary_chunk", {
"content": content_chunk
})
logger.info("[Summary] 流式总结完成")
except Exception as llm_error:
logger.error(f"[Summary] 流式总结失败: {llm_error}")
# 降级:使用工具调用结果的简单拼接
results_text = "\n\n".join([
f"**{r.tool}**: {str(r.result)[:500]}..."
for r in successful_results[:5]
])
final_summary = f"根据查询结果:\n\n{results_text}"
yield self._format_sse("summary_chunk", {"content": final_summary})
logger.warning("[Summary] 使用降级方案")
# 发送完整的总结和元数据
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps) if plan_steps else 0,
"successful_steps": len(successful_results),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results) if step_results else 0,
},
})
# 保存 Agent 回复到 ES(如果提供了 session_id)
if session_id and user_id:
try:
# 将执行步骤转换为 JSON 字符串
steps_json = json.dumps(
[{"tool": step.tool, "status": step.status, "result": step.result} for step in step_results],
ensure_ascii=False
)
# 将 plan 转换为 JSON 字符串(ES 中 plan 字段是 text 类型)
plan_json = json.dumps({
"goal": plan.goal,
"reasoning": plan.reasoning,
"steps": [{"tool": step.tool, "arguments": step.arguments, "reason": step.reason} for step in plan.steps]
}, ensure_ascii=False)
# 如果是新会话,生成会话标题
session_title = None
if is_new_session:
try:
session_title = await self.generate_session_title(user_query, final_summary)
logger.info(f"[Title] 新会话标题: {session_title}")
except Exception as title_error:
logger.error(f"[Title] 生成标题失败: {title_error}")
# 降级:使用用户消息的前 20 个字符
session_title = user_query[:20] + "..." if len(user_query) > 20 else user_query
es_client.save_chat_message(
session_id=session_id,
user_id=user_id,
user_nickname=user_nickname or "匿名用户",
user_avatar=user_avatar or "",
message_type="assistant",
message=final_summary,
plan=plan_json,
steps=steps_json,
session_title=session_title, # 新会话时保存标题
)
logger.info(f"[ES] Agent 回复已保存到会话 {session_id}")
# 如果生成了标题,通过 SSE 发送给前端
if session_title:
yield self._format_sse("session_title", {"title": session_title})
except Exception as e:
logger.error(f"[ES] 保存 Agent 回复失败: {e}", exc_info=True)
# 发送完成事件(包含 session_id)
yield self._format_sse("done", {"message": "处理完成", "session_id": session_id})
except Exception as e:
logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True)
yield self._format_sse("error", {"message": f"处理失败: {str(e)}"})
def _format_sse(self, event: str, data: dict) -> str:
"""格式化 SSE 消息"""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
def _parse_text_tool_calls(self, content: str) -> List[Dict[str, Any]]:
"""
解析文本格式的工具调用
支持的格式:
1. value
2. ```tool_call\n{"name": "xxx", "arguments": {...}}\n```
3. DeepSeek DSML 格式: <|DSML|function_calls> <|DSML|invoke name="xxx"> <|DSML|parameter name="yyy" string="true">value|DSML|parameter> |DSML|invoke> |DSML|function_calls>
返回: [{"name": "tool_name", "arguments": {...}}, ...]
"""
import re
tool_calls = []
# 格式1: 标签格式
# 例如: 300274
pattern1 = r'\s*(.*?)\s*'
matches1 = re.findall(pattern1, content, re.DOTALL)
for func_name, params_str in matches1:
arguments = {}
# 解析参数: value
param_pattern = r'\s*(.*?)\s*'
param_matches = re.findall(param_pattern, params_str, re.DOTALL)
for param_name, param_value in param_matches:
# 尝试解析 JSON 值,否则作为字符串
param_value = param_value.strip()
try:
arguments[param_name] = json.loads(param_value)
except:
arguments[param_name] = param_value
tool_calls.append({
"name": func_name,
"arguments": arguments
})
# 格式2: ```tool_call 代码块格式
pattern2 = r'```tool_call\s*\n?(.*?)\n?```'
matches2 = re.findall(pattern2, content, re.DOTALL)
for match in matches2:
try:
data = json.loads(match.strip())
if isinstance(data, dict) and "name" in data:
tool_calls.append({
"name": data["name"],
"arguments": data.get("arguments", {})
})
except:
pass
# 格式3: 直接 JSON 格式 {"tool": "xxx", "arguments": {...}}
pattern3 = r'\{\s*"tool"\s*:\s*"(\w+)"\s*,\s*"arguments"\s*:\s*(\{[^}]*\})\s*\}'
matches3 = re.findall(pattern3, content)
for tool_name, args_str in matches3:
try:
arguments = json.loads(args_str)
tool_calls.append({
"name": tool_name,
"arguments": arguments
})
except:
pass
# 格式4: DeepSeek DSML 格式(使用全角竖线 |)
# <|DSML|function_calls> <|DSML|invoke name="search_research_reports"> <|DSML|parameter name="query" string="true">AI概念股|DSML|parameter> |DSML|invoke> |DSML|function_calls>
# 注意:| 是全角字符
dsml_pattern = r'<[|\|]DSML[|\|]function_calls>(.*?)[|\|]DSML[|\|]function_calls>'
dsml_matches = re.findall(dsml_pattern, content, re.DOTALL)
for dsml_content in dsml_matches:
# 解析 invoke 标签
invoke_pattern = r'<[|\|]DSML[|\|]invoke\s+name="(\w+)">(.*?)[|\|]DSML[|\|]invoke>'
invoke_matches = re.findall(invoke_pattern, dsml_content, re.DOTALL)
for func_name, params_str in invoke_matches:
arguments = {}
# 解析参数: <|DSML|parameter name="xxx" string="true/false">value|DSML|parameter>
param_pattern = r'<[|\|]DSML[|\|]parameter\s+name="(\w+)"\s+string="(true|false)">(.*?)[|\|]DSML[|\|]parameter>'
param_matches = re.findall(param_pattern, params_str, re.DOTALL)
for param_name, is_string, param_value in param_matches:
param_value = param_value.strip()
if is_string == "false":
# 不是字符串,尝试解析为数字或 JSON
try:
arguments[param_name] = json.loads(param_value)
except:
# 尝试转为整数或浮点数
try:
arguments[param_name] = int(param_value)
except:
try:
arguments[param_name] = float(param_value)
except:
arguments[param_name] = param_value
else:
# 是字符串
arguments[param_name] = param_value
tool_calls.append({
"name": func_name,
"arguments": arguments
})
logger.info(f"[Text Tool Call] 解析到 {len(tool_calls)} 个工具调用: {tool_calls}")
return tool_calls
# 创建 Agent 实例(全局)
agent = MCPAgentIntegrated()
# ==================== Web聊天接口 ====================
class ChatMessage(BaseModel):
"""聊天消息"""
role: Literal["user", "assistant", "system"]
content: str
class ChatRequest(BaseModel):
"""聊天请求"""
messages: List[ChatMessage]
stream: bool = False
@app.post("/chat")
async def chat(request: ChatRequest):
"""
Web聊天接口
这是一个简化的接口,实际应该集成LLM API(如OpenAI、Claude等)
这里只是演示如何使用工具
"""
# TODO: 集成实际的LLM API
# 1. 将消息发送给LLM
# 2. LLM返回需要调用的工具
# 3. 调用工具并获取结果
# 4. 将工具结果返回给LLM
# 5. LLM生成最终回复
return {
"message": "Chat endpoint placeholder - integrate with your LLM provider",
"available_tools": len(TOOLS),
"hint": "Use POST /tools/call to invoke tools"
}
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(request: AgentChatRequest):
"""智能代理对话端点(非流式)"""
logger.info(f"Agent chat: {request.message} (user: {request.user_id})")
# ==================== 权限检查 ====================
# 订阅等级判断函数(与 app.py 保持一致)
def subscription_level(sub_type: str) -> int:
"""将订阅类型映射到等级数值,free=0, pro=1, max=2"""
mapping = {'free': 0, 'pro': 1, 'max': 2}
return mapping.get((sub_type or 'free').lower(), 0)
# 获取用户订阅类型(默认为 free)
user_subscription = (request.subscription_type or 'free').lower()
required_level = 'max'
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
if not has_access:
logger.warning(
f"权限检查失败 - user_id: {request.user_id}, "
f"nickname: {request.user_nickname}, "
f"subscription_type: {user_subscription}, "
f"required: {required_level}"
)
raise HTTPException(
status_code=403,
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
)
logger.info(
f"权限检查通过 - user_id: {request.user_id}, "
f"nickname: {request.user_nickname}, "
f"subscription_type: {user_subscription}"
)
# ==================== 会话管理 ====================
# 如果没有提供 session_id,创建新会话
session_id = request.session_id or str(uuid.uuid4())
is_new_session = not request.session_id
# 获取会话历史(用于多轮对话)
chat_history = []
if not is_new_session:
try:
history = es_client.get_chat_history(session_id, limit=20) # 最近20条
chat_history = history
logger.info(f"加载会话历史: {len(chat_history)} 条消息")
except Exception as e:
logger.error(f"获取会话历史失败: {e}")
# 保存用户消息到 ES
try:
es_client.save_chat_message(
session_id=session_id,
user_id=request.user_id or "anonymous",
user_nickname=request.user_nickname or "匿名用户",
user_avatar=request.user_avatar or "",
message_type="user",
message=request.message,
is_first_message=is_new_session,
)
except Exception as e:
logger.error(f"保存用户消息失败: {e}")
# 获取工具列表(根据前端选择过滤)
if request.tools and len(request.tools) > 0:
# 用户指定了工具列表,按名称过滤
selected_tool_names = set(request.tools)
tools = [tool.dict() for tool in TOOLS if tool.name in selected_tool_names]
logger.info(f"使用用户选择的 {len(tools)} 个工具: {request.tools[:10]}...")
else:
# 用户未指定,使用全部工具
tools = [tool.dict() for tool in TOOLS]
logger.info(f"使用全部 {len(tools)} 个工具")
# 添加特殊工具:summarize_news(始终可用)
tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据(JSON格式)"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响'、'投资机会'等"
}
},
"required": ["data"]
}
})
# 处理查询(传入会话历史实现多轮对话)
response = await agent.process_query(
user_query=request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
chat_history=chat_history,
)
# 保存 Agent 回复到 ES
session_title = None
try:
# 将执行步骤转换为JSON字符串
steps_json = json.dumps(
[{"tool": step.tool, "status": step.status, "result": step.result} for step in response.step_results],
ensure_ascii=False
)
# 将 plan 转换为 JSON 字符串(ES 中 plan 字段是 text 类型)
plan_json = json.dumps(response.plan.dict(), ensure_ascii=False) if response.plan else None
# 如果是新会话,生成会话标题
if is_new_session:
session_title = await agent.generate_session_title(request.message, response.final_summary)
logger.info(f"生成会话标题: {session_title}")
es_client.save_chat_message(
session_id=session_id,
user_id=request.user_id or "anonymous",
user_nickname=request.user_nickname or "匿名用户",
user_avatar=request.user_avatar or "",
message_type="assistant",
message=response.final_summary, # 使用 final_summary 而不是 final_answer
plan=plan_json, # 传递 JSON 字符串而不是字典
steps=steps_json,
session_title=session_title,
)
except Exception as e:
logger.error(f"保存 Agent 回复失败: {e}", exc_info=True)
# 在响应中返回 session_id 和 title
response_dict = response.dict()
response_dict["session_id"] = session_id
response_dict["session_title"] = session_title
return response_dict
@app.post("/agent/chat/stream")
async def agent_chat_stream(chat_request: AgentChatRequest, request: Request):
"""智能代理对话端点(流式 SSE)"""
logger.info(f"Agent chat stream: {chat_request.message}")
# 获取请求的 cookies(用于转发到需要认证的 API)
cookies = request.cookies
# ==================== 权限检查 ====================
# 订阅等级判断函数(与 app.py 保持一致)
def subscription_level(sub_type: str) -> int:
"""将订阅类型映射到等级数值,free=0, pro=1, max=2"""
mapping = {'free': 0, 'pro': 1, 'max': 2}
return mapping.get((sub_type or 'free').lower(), 0)
# 获取用户订阅类型(默认为 free)
user_subscription = (chat_request.subscription_type or 'free').lower()
required_level = 'max'
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
if not has_access:
logger.warning(
f"[Stream] 权限检查失败 - user_id: {chat_request.user_id}, "
f"nickname: {chat_request.user_nickname}, "
f"subscription_type: {user_subscription}, "
f"required: {required_level}"
)
raise HTTPException(
status_code=403,
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
)
logger.info(
f"[Stream] 权限检查通过 - user_id: {chat_request.user_id}, "
f"nickname: {chat_request.user_nickname}, "
f"subscription_type: {user_subscription}"
)
# 判断是否是新会话
is_new_session = not chat_request.session_id
session_id = chat_request.session_id or str(uuid.uuid4())
# ==================== 加载历史对话(多轮对话记忆)====================
chat_history = []
if not is_new_session:
try:
# 加载该会话的历史消息(最近 20 条)
history = es_client.get_chat_history(session_id, limit=20)
chat_history = history
logger.info(f"[Stream] 已加载 {len(chat_history)} 条历史消息")
except Exception as e:
logger.error(f"[Stream] 加载历史消息失败: {e}")
# 保存用户消息到 ES
try:
es_client.save_chat_message(
session_id=session_id,
user_id=chat_request.user_id or "anonymous",
user_nickname=chat_request.user_nickname or "匿名用户",
user_avatar=chat_request.user_avatar or "",
message_type="user",
message=chat_request.message,
is_first_message=is_new_session, # 标记是否为首条消息
)
logger.info(f"[ES] 用户消息已保存到会话 {session_id}")
except Exception as e:
logger.error(f"[ES] 保存用户消息失败: {e}")
# ==================== 动态工具过滤 ====================
# 获取所有可用工具
all_tools = [tool.dict() for tool in TOOLS]
# 添加特殊工具:summarize_news
all_tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据(JSON格式)"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响'、'投资机会'等"
}
},
"required": ["data"]
}
})
# 如果用户指定了工具列表,则进行过滤
if chat_request.tools is not None and len(chat_request.tools) > 0:
selected_tool_names = set(chat_request.tools)
tools = [tool for tool in all_tools if tool["name"] in selected_tool_names]
logger.info(f"[工具过滤] 用户选择了 {len(tools)}/{len(all_tools)} 个工具: {selected_tool_names}")
else:
# 默认使用全部工具
tools = all_tools
logger.info(f"[工具过滤] 使用全部 {len(tools)} 个工具")
# ==================== 动态模型选择 ====================
selected_model = chat_request.model or "deepseek"
model_config = MODEL_CONFIGS.get(selected_model, MODEL_CONFIGS["deepseek"])
logger.info(f"[模型选择] 使用模型: {selected_model} ({model_config['model']})")
# 返回流式响应
return StreamingResponse(
agent.process_query_stream(
user_query=chat_request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
session_id=session_id,
user_id=chat_request.user_id,
user_nickname=chat_request.user_nickname,
user_avatar=chat_request.user_avatar,
cookies=cookies, # 传递 cookies 用于认证 API 调用
model_config=model_config, # 传递选中的模型配置
chat_history=chat_history, # 传递历史对话(多轮对话记忆)
is_new_session=is_new_session, # 传递是否是新会话(用于生成标题)
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
# ==================== 聊天记录管理 API ====================
@app.get("/agent/sessions")
async def get_chat_sessions(user_id: str, limit: int = 50):
"""
获取用户的聊天会话列表
Args:
user_id: 用户ID
limit: 返回数量(默认50)
Returns:
会话列表
"""
try:
sessions = es_client.get_chat_sessions(user_id, limit)
return {
"success": True,
"data": sessions,
"count": len(sessions)
}
except Exception as e:
logger.error(f"获取会话列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/agent/history/{session_id}")
async def get_chat_history(session_id: str, limit: int = 100):
"""
获取指定会话的聊天历史
Args:
session_id: 会话ID
limit: 返回数量(默认100)
Returns:
聊天记录列表
"""
try:
messages = es_client.get_chat_history(session_id, limit)
return {
"success": True,
"data": messages,
"count": len(messages)
}
except Exception as e:
logger.error(f"获取聊天历史失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/agent/search")
async def search_chat_history(user_id: str, query: str, top_k: int = 10):
"""
向量搜索聊天历史
Args:
user_id: 用户ID
query: 查询文本
top_k: 返回数量(默认10)
Returns:
相关聊天记录列表
"""
try:
results = es_client.search_chat_history(user_id, query, top_k)
return {
"success": True,
"data": results,
"count": len(results)
}
except Exception as e:
logger.error(f"向量搜索失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== 投研会议室系统 (V2 - 流式+工具调用) ====================
import random
# 投研会议室专用模型配置
MEETING_MODEL_CONFIGS = {
"kimi-k2-thinking": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "kimi-k2-thinking",
},
"deepseek": {
"api_key": "sk-7363bdb28d7d4bf0aa68eb9449f8f063",
"base_url": "https://api.deepseek.com",
"model": "deepseek-chat",
},
"deepmoney": {
"api_key": "",
"base_url": "http://111.62.35.50:8000/v1",
"model": "deepmoney",
},
}
def clean_deepseek_tool_markers(content: str) -> str:
"""
清理 DeepSeek 模型输出中的工具调用标记
DeepSeek 有时会以文本形式输出工具调用,格式如:
<|tool▁calls▁begin|><|tool▁call▁begin|>tool_name<|tool▁sep|>{"args": "value"}<|tool▁call▁end|><|tool▁calls▁end|>
"""
import re
if not content:
return content
# 清理 DeepSeek 工具调用标记
# 匹配 <|tool▁calls▁begin|> ... <|tool▁calls▁end|> 整个块
pattern = r'<|tool▁calls▁begin|>.*?<|tool▁calls▁end|>'
cleaned = re.sub(pattern, '', content, flags=re.DOTALL)
# 也清理可能残留的单个标记
markers = [
'<|tool▁calls▁begin|>',
'<|tool▁calls▁end|>',
'<|tool▁call▁begin|>',
'<|tool▁call▁end|>',
'<|tool▁sep|>',
]
for marker in markers:
cleaned = cleaned.replace(marker, '')
return cleaned.strip()
# 每个角色可用的工具列表
ROLE_TOOLS = {
"buffett": ["search_china_news", "search_research_reports", "get_stock_basic_info", "get_stock_financial_index"],
"big_short": ["search_china_news", "get_stock_financial_index", "get_stock_balance_sheet", "get_stock_cashflow"],
"simons": [
# 基础数据
"get_stock_trade_data", "search_limit_up_stocks", "get_concept_statistics",
# 经典技术指标
"get_macd_signal", "check_oscillator_status", "analyze_bollinger_bands", "calc_stop_loss_atr",
# 资金与情绪
"analyze_market_heat", "check_volume_price_divergence", "analyze_obv_trend",
# 形态与突破
"check_new_high_breakout", "identify_candlestick_pattern", "find_price_gaps",
# 风险与估值
"calc_max_drawdown", "check_valuation_rank", "calc_price_zscore",
# 分钟级高阶算子
"calc_market_profile_vpoc", "calc_realized_volatility", "analyze_buying_pressure", "calc_parkinson_volatility",
# 高级趋势分析
"calc_bollinger_squeeze", "calc_trend_slope", "calc_hurst_exponent", "decompose_trend_simple",
# 流动性与统计
"calc_amihud_illiquidity", "calc_price_entropy", "calc_rsi_divergence",
# 配对与策略
"test_cointegration", "calc_kelly_position", "search_similar_kline",
# 综合分析
"get_comprehensive_analysis",
],
"leek": [], # 韭菜不用工具
"fund_manager": ["search_china_news", "search_research_reports", "get_stock_basic_info"],
}
# 投研会议室角色配置
MEETING_ROLES = {
"buffett": {
"id": "buffett",
"name": "巴菲特",
"nickname": "唱多者",
"role_type": "bull",
"avatar": "/images/agent/巴菲特.png",
"model": "kimi-k2-thinking",
"color": "#10B981",
"description": "主观多头,善于分析事件的潜在利好和长期价值",
"tools": ROLE_TOOLS["buffett"],
"system_prompt": """你是"巴菲特",一位资深的价值投资者,以长期持有优质公司著称。你在投研会议中担任「看多分析师」角色。
## 你的投资哲学
- **护城河理论**:寻找具有持久竞争优势的公司(品牌、成本优势、网络效应、转换成本)
- **安全边际**:以低于内在价值的价格买入,为错误留有余地
- **长期主义**:关注企业的长期盈利能力,而非短期波动
- **能力圈**:只投资自己能理解的业务
## 分析框架(请按此思维链分析)
### 第一步:收集数据
必须先调用工具获取事实依据,不要凭空臆断:
- `search_china_news`: 搜索该标的/事件的最新新闻动态
- `search_research_reports`: 获取券商研报的专业观点
- `get_stock_basic_info`: 了解公司基本面(主营业务、行业地位)
- `get_stock_financial_index`: 获取关键财务指标(ROE、毛利率、营收增速)
### 第二步:价值分析维度
基于获取的数据,从以下维度寻找看多逻辑:
1. **商业模式**:盈利模式是否清晰?是否有复购/粘性?
2. **竞争优势**:护城河是什么?能持续多久?
3. **成长空间**:行业天花板多高?市占率提升空间?
4. **管理层**:管理团队是否优秀?是否与股东利益一致?
5. **估值水平**:当前估值是否合理?有无安全边际?
### 第三步:形成结论
给出明确的看多观点,必须包含:
- **核心看多逻辑**(1-2个最关键的理由)
- **数据支撑**(引用工具返回的具体数据)
- **潜在催化剂**(什么因素可能推动股价上涨)
## 输出要求
- 必须基于工具返回的数据发表观点,不要编造数据
- 观点要有说服力,但不要盲目乐观
- 如果前面有其他人发言,要针对性回应,特别是反驳空头观点
- 发言控制在 250 字以内,言简意赅"""
},
"big_short": {
"id": "big_short",
"name": "大空头",
"nickname": "大空头",
"role_type": "bear",
"avatar": "/images/agent/大空头.png",
"model": "kimi-k2-thinking",
"color": "#EF4444",
"description": "善于分析事件和财报中的风险因素",
"tools": ROLE_TOOLS["big_short"],
"system_prompt": """你是"大空头",一位专业的风险分析师,擅长发现市场忽视的风险。你在投研会议中担任「看空分析师」角色。
## 你的分析理念
- **逆向思维**:当所有人都看好时,寻找潜在的风险点
- **财务侦探**:深挖财报,发现隐藏的问题(应收账款异常、存货积压、现金流恶化)
- **估值锚定**:警惕估值泡沫,历史证明高估值终将回归
- **黑天鹅意识**:关注尾部风险,小概率事件一旦发生杀伤力巨大
## 分析框架(请按此思维链分析)
### 第一步:收集数据
必须先调用工具获取事实依据,重点关注负面信息:
- `search_china_news`: 搜索该标的的负面新闻、风险事件、监管处罚
- `get_stock_financial_index`: 获取财务指标,关注 ROE 下滑、负债率、应收账款周转
- `get_stock_balance_sheet`: 分析资产质量(商誉减值风险、存货跌价、应收账款坏账)
- `get_stock_cashflow`: 检查现金流健康度(经营现金流是否覆盖净利润?是否靠筹资续命?)
### 第二步:风险分析维度
基于获取的数据,从以下维度挖掘风险:
1. **财务风险**:
- 应收账款/营收比例是否异常?(可能虚增收入)
- 存货周转是否恶化?(可能滞销)
- 经营现金流/净利润比例?(< 80% 需警惕)
- 商誉/净资产比例?(> 30% 有减值风险)
2. **业务风险**:
- 行业是否见顶?增速是否放缓?
- 竞争是否加剧?毛利率是否下滑?
- 客户集中度是否过高?
3. **估值风险**:
- PE/PB 处于历史什么分位?
- 相比同行是否高估?
- 业绩能否支撑当前估值?
4. **外部风险**:
- 政策风险?监管趋严?
- 行业黑天鹅?技术颠覆?
### 第三步:形成结论
给出明确的风险警示,必须包含:
- **核心风险点**(1-2个最致命的风险)
- **数据支撑**(引用工具返回的具体异常数据)
- **风险触发条件**(什么情况下风险会爆发)
## 输出要求
- 必须基于工具返回的数据指出风险,不要无中生有
- 分析要犀利深刻,但不要为了唱空而唱空
- 如果前面有多头发言,要针对性反驳,指出其逻辑漏洞
- 发言控制在 250 字以内,直击要害"""
},
"simons": {
"id": "simons",
"name": "量化研究员",
"nickname": "西蒙斯",
"role_type": "quant",
"avatar": "/images/agent/simons.png",
"model": "kimi-k2-thinking",
"color": "#3B82F6",
"description": "中性立场,使用专业量化因子分析技术指标和市场特征",
"tools": ROLE_TOOLS["simons"],
"system_prompt": """你是"量化研究员"(昵称:西蒙斯),一位专业的量化交易研究员,擅长使用各类量化因子分析市场。你在投研会议中担任「技术分析师」角色,保持中性客观。
## 你的分析理念
- **因子驱动**:使用经过验证的量化因子,而非主观判断
- **概率思维**:没有确定性,只有概率和赔率
- **多维验证**:从趋势、动量、波动、资金多个维度交叉验证
- **风险量化**:用数字衡量风险,止损止盈有据可依
## 你可用的量化因子工具(28个)
### 快速综合分析(推荐首选)
- `get_comprehensive_analysis`: 一次性获取MACD、RSI、KDJ、布林带、量能、K线形态等多指标汇总
### 趋势与动量因子
- `get_macd_signal`: MACD趋势判定(金叉/死叉/背离)
- `calc_trend_slope`: 趋势线性回归斜率(R²拟合度)
- `calc_hurst_exponent`: Hurst指数(判断趋势/震荡市场)
- `check_new_high_breakout`: 唐奇安通道突破(新高/新低信号)
### 超买超卖因子
- `check_oscillator_status`: KDJ/RSI超买超卖状态
- `calc_rsi_divergence`: RSI背离检测(顶底背离)
- `calc_price_zscore`: Z-Score均值回归(乖离率标准化)
### 波动率因子
- `analyze_bollinger_bands`: 布林带通道分析
- `calc_bollinger_squeeze`: 布林带挤压(变盘预警)
- `calc_stop_loss_atr`: ATR动态止损位
- `calc_realized_volatility`: 分钟级已实现波动率
- `calc_parkinson_volatility`: 帕金森波动率(更精确)
### 资金流向与量价因子
- `analyze_market_heat`: 换手率活跃度+OBV趋势
- `analyze_obv_trend`: OBV能量潮独立分析
- `check_volume_price_divergence`: 量价背离检测
- `analyze_buying_pressure`: 买卖压力失衡(主力意图)
- `calc_market_profile_vpoc`: VPOC筹码峰(成交密集区)
### 形态识别因子
- `identify_candlestick_pattern`: K线组合形态(10+种)
- `find_price_gaps`: 跳空缺口分析
- `search_similar_kline`: 相似K线检索(历史形态预测)
### 风险与估值因子
- `calc_max_drawdown`: 最大回撤+夏普比率
- `check_valuation_rank`: PE历史百分位+PEG
- `calc_amihud_illiquidity`: Amihud流动性因子
### 高级分析因子
- `decompose_trend_simple`: 趋势分解(趋势+周期+残差)
- `calc_price_entropy`: 价格熵值(市场混乱度)
- `test_cointegration`: 协整性测试(配对交易)
- `calc_kelly_position`: 凯利公式最优仓位
## 分析框架(请按此流程)
### 第一步:快速扫描
首先调用 `get_comprehensive_analysis` 获取综合技术面快照,了解整体状况。
### 第二步:深度分析(根据情况选择)
根据综合分析结果,选择相关因子深入分析:
- 如果趋势不明:调用 `calc_hurst_exponent` 判断市场类型,`calc_trend_slope` 量化趋势强度
- 如果疑似顶底:调用 `calc_rsi_divergence` 检测背离,`calc_bollinger_squeeze` 看是否变盘
- 如果量能异常:调用 `analyze_obv_trend` 看资金流向,`analyze_buying_pressure` 看主力意图
- 如果波动加大:调用 `calc_realized_volatility` 或 `calc_parkinson_volatility` 精确测量
- 如果要设止损:调用 `calc_stop_loss_atr` 获取ATR止损位
### 第三步:形成结论
给出量化分析结论,必须包含:
- **核心因子信号**(列出2-3个关键因子的具体数值和判断)
- **趋势判断**(上涨/下跌/震荡,并给出概率估计)
- **关键价位**(支撑位、压力位、止损位)
- **量化建议**(基于因子信号的交易建议)
## 输出要求
- **必须调用工具**:至少调用1个综合分析+1-2个专项因子
- **数据说话**:每个结论都要有具体数值支撑
- **保持中性**:不偏向多头或空头,让因子说话
- **简洁专业**:发言控制在 300 字以内,用专业术语但要解释关键数值含义"""
},
"leek": {
"id": "leek",
"name": "韭菜",
"nickname": "牢大",
"role_type": "retail",
"avatar": "/images/agent/牢大.png",
"model": "deepmoney",
"color": "#F59E0B",
"description": "贪婪又讨厌亏损,热爱追涨杀跌",
"tools": [],
"system_prompt": """你是"韭菜"(昵称:牢大),一个典型的散户投资者。你在投研会议中代表普通散户的声音。
## 你的特点
- **贪婪与恐惧**:涨了怕踏空,跌了怕套牢
- **追涨杀跌**:看到涨停就想冲,看到下跌就想跑
- **消息驱动**:特别喜欢小道消息,相信"内部消息"
- **急功近利**:想一夜暴富,对慢牛没耐心
- **从众心理**:别人买什么就跟什么,别人卖就恐慌
## 你的语言风格
- 口语化、情绪化
- 喜欢用股吧/雪球常见的散户用语
- 会用网络流行语和表情
- 经常问"能不能上车"、"还能涨吗"、"要不要割肉"
## 回应方式
1. 如果看到利好消息:兴奋,想冲进去,担心踏空
2. 如果看到利空消息:恐慌,想跑路,后悔没早卖
3. 如果看到专业分析:似懂非懂,但容易被带节奏
4. 对量化分析:完全听不懂,直接问"说人话,能买吗"
## 输出要求
- 不需要调用工具,直接用散户视角发表看法
- 参考前面其他人的发言,用最朴素直白的方式回应
- 要体现散户的真实心态和困惑
- 发言控制在 150 字以内,要接地气"""
},
"fund_manager": {
"id": "fund_manager",
"name": "基金经理",
"nickname": "决策者",
"role_type": "manager",
"avatar": "/images/agent/基金经理.png",
"model": "deepseek",
"color": "#8B5CF6",
"description": "综合分析做出最终决策",
"tools": ROLE_TOOLS["fund_manager"],
"system_prompt": """你是"基金经理",投研会议的最终决策者。你需要综合所有人的观点,做出理性的投资建议。
## 你的角色定位
- **决策者**:综合多空观点,做出最终判断
- **风控官**:平衡收益与风险,不能只看收益
- **裁判员**:评估各方观点的质量和可信度
- **智者**:韭菜的观点通常是反向指标,要辩证看待
## 分析框架
### 第一步:回顾各方观点
总结前面发言者的核心观点:
- 多头(巴菲特)的看多逻辑和依据
- 空头(大空头)的风险提示和依据
- 量化(西蒙斯)的技术分析结论
- 韭菜(牢大)的市场情绪反映
### 第二步:观点评估
评估各方观点的质量:
- 哪些观点有数据支撑?
- 哪些观点逻辑自洽?
- 哪些观点可能存在偏见?
- 韭菜的观点是否构成反向指标?
### 第三步:形成决策
如果需要补充信息,可以调用工具:
- `search_china_news`: 搜索补充信息
- `search_research_reports`: 获取券商观点
- `get_stock_basic_info`: 确认基本面信息
## 输出格式(必须严格按此格式)
### 📊 综合评估
[对该标的/事件的整体判断,2-3句话]
### 🎯 关键观点
- 多头核心逻辑:[一句话总结]
- 空头核心逻辑:[一句话总结]
- 本次采纳:[采纳哪方观点,为什么]
### ⚠️ 风险提示
[列出 2-3 个需要关注的风险]
### 💡 操作建议
- **建议**:买入 / 增持 / 持有 / 减持 / 卖出 / 观望
- **仓位**:建议仓位比例(如 10%-20%)
- **时机**:短期/中期/长期
### 📈 信心指数:X/10
[给出 1-10 分的信心评分,并简述理由]
## 输出要求
- 必须综合所有人的观点,不能只听一方
- 决策要有理有据,不能拍脑袋
- 要给出明确的操作建议,不要模棱两可
- 发言控制在 350 字以内"""
}
}
class MeetingRequest(BaseModel):
"""投研会议请求"""
topic: str
user_id: str = "anonymous"
user_nickname: str = "匿名用户"
session_id: Optional[str] = None
user_message: Optional[str] = None
conversation_history: List[Dict[str, Any]] = []
def get_random_speaking_order() -> List[str]:
"""随机生成发言顺序(不包括基金经理)"""
roles = ["buffett", "big_short", "simons", "leek"]
random.shuffle(roles)
return roles
async def call_role_tool(role_id: str, tool_name: str, arguments: dict) -> dict:
"""调用角色的工具"""
handler = TOOL_HANDLERS.get(tool_name)
if not handler:
return {"success": False, "error": f"Unknown tool: {tool_name}"}
try:
result = await handler(arguments)
return {"success": True, "tool": tool_name, "result": result}
except Exception as e:
logger.error(f"Tool {tool_name} failed: {e}")
return {"success": False, "tool": tool_name, "error": str(e)}
async def stream_role_response(
role_id: str,
topic: str,
context: str,
tools: List[dict]
) -> AsyncGenerator[dict, None]:
"""流式生成角色回复,支持工具调用"""
role = MEETING_ROLES.get(role_id)
if not role:
yield {"type": "error", "error": f"Unknown role: {role_id}"}
return
model_name = role["model"]
model_config = MEETING_MODEL_CONFIGS.get(model_name)
if not model_config:
yield {"type": "error", "error": f"Unknown model: {model_name}"}
return
try:
client = OpenAI(
api_key=model_config["api_key"],
base_url=model_config["base_url"],
timeout=180
)
messages = [
{"role": "system", "content": role["system_prompt"]},
{"role": "user", "content": f"议题:{topic}\n\n{context}"}
]
# 准备工具定义(如果该角色有工具)
role_tool_names = role.get("tools", [])
openai_tools = None
if role_tool_names:
openai_tools = []
for tool in TOOLS:
if tool.name in role_tool_names:
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
})
# 第一次调用:可能触发工具调用
tool_calls_made = []
# 从模型配置获取 max_tokens,默认 8192
max_tokens = model_config.get("max_tokens", 8192)
if openai_tools:
response = client.chat.completions.create(
model=model_config["model"],
messages=messages,
tools=openai_tools,
tool_choice="auto",
stream=False, # 工具调用不使用流式
temperature=0.7,
max_tokens=max_tokens,
)
assistant_message = response.choices[0].message
# 处理工具调用
if assistant_message.tool_calls:
messages.append(assistant_message)
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_call_id = tool_call.id
try:
arguments = json.loads(tool_call.function.arguments)
except:
arguments = {}
# 发送工具调用开始事件
yield {
"type": "tool_call_start",
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"arguments": arguments
}
# 执行工具调用
start_time = time.time()
result = await call_role_tool(role_id, tool_name, arguments)
execution_time = time.time() - start_time
tool_calls_made.append({
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"arguments": arguments,
"result": result,
"execution_time": execution_time
})
# 发送工具调用结果事件
yield {
"type": "tool_call_result",
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"result": result,
"status": "success" if result.get("success") else "error",
"execution_time": execution_time
}
# 添加工具结果到消息
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result, ensure_ascii=False)
})
# 流式生成最终回复
stream = client.chat.completions.create(
model=model_config["model"],
messages=messages,
stream=True,
temperature=0.7,
max_tokens=max_tokens,
)
full_content = ""
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_content += content
yield {
"type": "content_delta",
"content": content
}
# 清理 DeepSeek 工具调用标记
full_content = clean_deepseek_tool_markers(full_content)
# 发送完成事件
yield {
"type": "content_done",
"full_content": full_content,
"tool_calls": tool_calls_made
}
except Exception as e:
logger.error(f"Role {role_id} stream failed: {e}")
yield {"type": "error", "error": str(e)}
@app.post("/agent/meeting/stream")
async def stream_investment_meeting(request: MeetingRequest):
"""
流式投研会议 V2
- 随机发言顺序
- 每个角色流式输出
- 支持工具调用
- 支持用户中途发言
"""
logger.info(f"[Meeting V2] 启动: {request.topic}")
async def generate_meeting_stream() -> AsyncGenerator[str, None]:
session_id = request.session_id or str(uuid.uuid4())
round_number = len(request.conversation_history) // 5 + 1
# 发送会话开始
yield f"data: {json.dumps({'type': 'session_start', 'session_id': session_id, 'round': round_number}, ensure_ascii=False)}\n\n"
# 构建上下文
context_parts = []
if request.conversation_history:
context_parts.append("之前的讨论:")
for msg in request.conversation_history:
context_parts.append(f"【{msg.get('role_name', '未知')}】:{msg.get('content', '')}")
if request.user_message:
context_parts.append(f"\n用户刚才说:{request.user_message}")
context = "\n".join(context_parts) if context_parts else "这是第一轮讨论,请针对议题发表你的观点。"
# 随机发言顺序
speaking_order = get_random_speaking_order()
yield f"data: {json.dumps({'type': 'order_decided', 'order': speaking_order}, ensure_ascii=False)}\n\n"
all_messages = []
accumulated_context = context
# 依次让每个角色发言
for role_id in speaking_order:
role = MEETING_ROLES[role_id]
# 发送开始发言事件
yield f"data: {json.dumps({'type': 'speaking_start', 'role_id': role_id, 'role_name': role['name'], 'color': role['color']}, ensure_ascii=False)}\n\n"
# 准备工具列表
role_tools = [t for t in TOOLS if t.name in role.get("tools", [])]
# 流式生成回复
full_content = ""
tool_calls = []
async for event in stream_role_response(role_id, request.topic, accumulated_context, role_tools):
if event["type"] == "tool_call_start":
yield f"data: {json.dumps({'type': 'tool_call_start', 'role_id': role_id, 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'arguments': event['arguments']}, ensure_ascii=False)}\n\n"
elif event["type"] == "tool_call_result":
yield f"data: {json.dumps({'type': 'tool_call_result', 'role_id': role_id, 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'result': event['result'], 'status': event['status'], 'execution_time': event['execution_time']}, ensure_ascii=False)}\n\n"
tool_calls.append(event)
elif event["type"] == "content_delta":
yield f"data: {json.dumps({'type': 'content_delta', 'role_id': role_id, 'content': event['content']}, ensure_ascii=False)}\n\n"
full_content += event["content"]
elif event["type"] == "content_done":
full_content = event["full_content"]
tool_calls = event.get("tool_calls", [])
elif event["type"] == "error":
yield f"data: {json.dumps({'type': 'error', 'role_id': role_id, 'error': event['error']}, ensure_ascii=False)}\n\n"
full_content = f"[{role['name']}暂时无法发言]"
# 构建完整消息
message = {
"role_id": role_id,
"role_name": role["name"],
"nickname": role["nickname"],
"avatar": role["avatar"],
"color": role["color"],
"content": full_content,
"tool_calls": tool_calls,
"timestamp": datetime.now().isoformat(),
"round_number": round_number
}
all_messages.append(message)
# 发送消息完成事件
yield f"data: {json.dumps({'type': 'message_complete', 'message': message}, ensure_ascii=False)}\n\n"
# 更新上下文
accumulated_context += f"\n\n【{role['name']}】:{full_content}"
await asyncio.sleep(0.3)
# 基金经理总结
fund_manager = MEETING_ROLES["fund_manager"]
yield f"data: {json.dumps({'type': 'speaking_start', 'role_id': 'fund_manager', 'role_name': fund_manager['name'], 'color': fund_manager['color']}, ensure_ascii=False)}\n\n"
fm_full_content = ""
fm_tool_calls = []
fm_tools = [t for t in TOOLS if t.name in fund_manager.get("tools", [])]
async for event in stream_role_response("fund_manager", request.topic, accumulated_context, fm_tools):
if event["type"] == "tool_call_start":
yield f"data: {json.dumps({'type': 'tool_call_start', 'role_id': 'fund_manager', 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'arguments': event['arguments']}, ensure_ascii=False)}\n\n"
elif event["type"] == "tool_call_result":
yield f"data: {json.dumps({'type': 'tool_call_result', 'role_id': 'fund_manager', 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'result': event['result'], 'status': event['status'], 'execution_time': event['execution_time']}, ensure_ascii=False)}\n\n"
fm_tool_calls.append(event)
elif event["type"] == "content_delta":
yield f"data: {json.dumps({'type': 'content_delta', 'role_id': 'fund_manager', 'content': event['content']}, ensure_ascii=False)}\n\n"
fm_full_content += event["content"]
elif event["type"] == "content_done":
fm_full_content = event["full_content"]
elif event["type"] == "error":
fm_full_content = "[基金经理暂时无法发言]"
fm_message = {
"role_id": "fund_manager",
"role_name": fund_manager["name"],
"nickname": fund_manager["nickname"],
"avatar": fund_manager["avatar"],
"color": fund_manager["color"],
"content": fm_full_content,
"tool_calls": fm_tool_calls,
"timestamp": datetime.now().isoformat(),
"round_number": round_number,
"is_conclusion": True
}
yield f"data: {json.dumps({'type': 'message_complete', 'message': fm_message}, ensure_ascii=False)}\n\n"
# 发送会议状态(不强制结束,用户可以继续)
yield f"data: {json.dumps({'type': 'round_end', 'round_number': round_number, 'can_continue': True}, ensure_ascii=False)}\n\n"
return StreamingResponse(
generate_meeting_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.get("/agent/meeting/roles")
async def get_meeting_roles():
"""获取所有会议角色配置"""
return {
"success": True,
"roles": [
{
"id": role["id"],
"name": role["name"],
"nickname": role["nickname"],
"role_type": role["role_type"],
"avatar": role["avatar"],
"color": role["color"],
"description": role["description"],
"tools": role.get("tools", []),
}
for role in MEETING_ROLES.values()
]
}
# ==================== 健康检查 ====================
@app.get("/health")
async def health_check():
"""健康检查"""
# 检查各个后端服务的健康状态
services_status = {}
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news?query=test&top_k=1", timeout=5.0)
services_status["news_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["news_api"] = "unhealthy"
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/", timeout=5.0)
services_status["concept_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["concept_api"] = "unhealthy"
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/health", timeout=5.0)
services_status["stock_analysis_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["stock_analysis_api"] = "unhealthy"
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": services_status
}
# ==================== 错误处理 ====================
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""HTTP异常处理"""
return JSONResponse(
status_code=exc.status_code,
content={
"success": False,
"error": exc.detail,
"timestamp": datetime.now().isoformat()
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""通用异常处理"""
logger.error(f"Unexpected error: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "Internal server error",
"detail": str(exc),
"timestamp": datetime.now().isoformat()
}
)
# ==================== 应用启动/关闭 ====================
@app.on_event("startup")
async def startup_event():
"""应用启动"""
logger.info("MCP Server starting up...")
logger.info(f"Registered {len(TOOLS)} tools")
# 初始化数据库连接池
try:
await db.get_pool()
logger.info("MySQL connection pool initialized")
except Exception as e:
logger.error(f"Failed to initialize MySQL pool: {e}")
@app.on_event("shutdown")
async def shutdown_event():
"""应用关闭"""
logger.info("MCP Server shutting down...")
await HTTP_CLIENT.aclose()
# 关闭数据库连接池
try:
await db.close_pool()
logger.info("MySQL connection pool closed")
except Exception as e:
logger.error(f"Failed to close MySQL pool: {e}")
# ==================== 主程序 ====================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"mcp_server:app",
host="0.0.0.0",
port=8900,
reload=True,
log_level="info"
)