Files
vf_react/mcp_server.py
2025-11-28 17:00:02 +08:00

2975 lines
108 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MCP Server for Financial Data Search
基于FastAPI的MCP服务端整合多个金融数据搜索API
支持LLM调用和Web聊天功能
"""
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Literal, AsyncGenerator
from datetime import datetime, date
import logging
import httpx
import time
from enum import Enum
import mcp_database as db
from openai import OpenAI
import json
import asyncio
import uuid
from mcp_elasticsearch import es_client
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建FastAPI应用
app = FastAPI(
title="Financial Data MCP Server",
description="Model Context Protocol server for financial data search and analysis",
version="1.0.0"
)
# 添加CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ==================== 配置 ====================
class ServiceEndpoints:
"""API服务端点配置"""
NEWS_API = "http://222.128.1.157:21891" # 新闻API
ROADSHOW_API = "http://222.128.1.157:19800" # 路演API
CONCEPT_API = "http://222.128.1.157:16801" # 概念API本地
STOCK_ANALYSIS_API = "http://222.128.1.157:8811" # 涨停分析+研报API
MAIN_APP_API = "http://127.0.0.1:5001" # 主应用API自选股、自选事件等
# HTTP客户端配置
HTTP_CLIENT = httpx.AsyncClient(timeout=60.0)
# ==================== Agent系统配置 ====================
# ==================== 多模型配置 ====================
# 模型配置字典(支持动态切换)
MODEL_CONFIGS = {
"kimi-k2": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "moonshot-v1-8k", # 快速模型
},
"kimi-k2-thinking": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "kimi-k2-thinking", # 深度思考模型
},
"glm-4.6": {
"api_key": "", # 需要配置智谱AI密钥
"base_url": "https://open.bigmodel.cn/api/paas/v4",
"model": "glm-4",
},
"deepmoney": {
"api_key": "", # 空值
"base_url": "http://111.62.35.50:8000/v1",
"model": "deepmoney",
},
"gemini-3": {
"api_key": "", # 需要配置Google API密钥
"base_url": "https://generativelanguage.googleapis.com/v1",
"model": "gemini-pro",
},
}
# 保持向后兼容的配置(默认使用 kimi-k2-thinking
KIMI_CONFIG = MODEL_CONFIGS["kimi-k2-thinking"]
DEEPMONEY_CONFIG = MODEL_CONFIGS["deepmoney"]
# ==================== MCP协议数据模型 ====================
class ToolParameter(BaseModel):
"""工具参数定义"""
type: str
description: str
enum: Optional[List[str]] = None
default: Optional[Any] = None
class ToolDefinition(BaseModel):
"""工具定义"""
name: str
description: str
parameters: Dict[str, Any] # 支持完整的 JSON Schema 格式
class ToolCallRequest(BaseModel):
"""工具调用请求"""
tool: str
arguments: Dict[str, Any] = {}
class ToolCallResponse(BaseModel):
"""工具调用响应"""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
# ==================== Agent系统数据模型 ====================
class ToolCall(BaseModel):
"""工具调用"""
tool: str
arguments: Dict[str, Any]
reason: str
class ExecutionPlan(BaseModel):
"""执行计划"""
goal: str
steps: List[ToolCall]
reasoning: str
class StepResult(BaseModel):
"""单步执行结果"""
step_index: int
tool: str
arguments: Dict[str, Any]
status: Literal["success", "failed", "skipped"]
result: Optional[Any] = None
error: Optional[str] = None
execution_time: float = 0
class AgentResponse(BaseModel):
"""Agent响应"""
success: bool
message: str
plan: Optional[ExecutionPlan] = None
step_results: List[StepResult] = []
final_summary: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
class ConversationMessage(BaseModel):
"""对话历史消息"""
isUser: bool
content: str
class AgentChatRequest(BaseModel):
"""聊天请求"""
message: str
conversation_history: List[ConversationMessage] = []
user_id: Optional[str] = None # 用户ID
user_nickname: Optional[str] = None # 用户昵称
user_avatar: Optional[str] = None # 用户头像URL
subscription_type: Optional[str] = None # 用户订阅类型free/pro/max
session_id: Optional[str] = None # 会话ID如果为空则创建新会话
model: Optional[str] = "kimi-k2-thinking" # 选择的模型kimi-k2, kimi-k2-thinking, glm-4.6, deepmoney, gemini-3
tools: Optional[List[str]] = None # 选择的工具列表工具名称数组如果为None则使用全部工具
# ==================== MCP工具定义 ====================
TOOLS: List[ToolDefinition] = [
ToolDefinition(
name="search_news",
description="搜索全球新闻,支持关键词搜索和日期过滤。适用于查找国际新闻、行业动态等。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,例如:'人工智能''新能源汽车'"
},
"source": {
"type": "string",
"description": "新闻来源筛选,可选"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量默认20",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_china_news",
description="搜索中国新闻使用KNN语义搜索。支持精确匹配模式适合查找股票、公司相关新闻。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"exact_match": {
"type": "boolean",
"description": "是否精确匹配用于股票代码、公司名称等默认false",
"default": False
},
"source": {
"type": "string",
"description": "新闻来源筛选"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量默认20",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_medical_news",
description="搜索医疗健康类新闻,包括医药、医疗设备、生物技术等领域。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"source": {
"type": "string",
"description": "新闻来源"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"top_k": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_roadshows",
description="搜索上市公司路演、投资者交流活动记录。可按公司代码、日期范围搜索。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,可以是公司名称、主题等"
},
"company_code": {
"type": "string",
"description": "公司股票代码,例如:'600519.SH'"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
},
"size": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="search_concepts",
description="搜索股票概念板块,支持按涨跌幅、股票数量排序。返回概念详情及相关股票列表。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词,例如:'新能源''人工智能'"
},
"size": {
"type": "integer",
"description": "每页结果数量",
"default": 10
},
"page": {
"type": "integer",
"description": "页码",
"default": 1
},
"sort_by": {
"type": "string",
"description": "排序方式change_pct(涨跌幅), _score(相关度), stock_count(股票数), concept_name(名称)",
"enum": ["change_pct", "_score", "stock_count", "concept_name"],
"default": "change_pct"
},
"trade_date": {
"type": "string",
"description": "交易日期格式YYYY-MM-DD默认最新"
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_concept_details",
description="根据概念ID获取详细信息包括描述、相关股票、涨跌幅数据等。",
parameters={
"type": "object",
"properties": {
"concept_id": {
"type": "string",
"description": "概念ID"
},
"trade_date": {
"type": "string",
"description": "交易日期格式YYYY-MM-DD"
}
},
"required": ["concept_id"]
}
),
ToolDefinition(
name="get_stock_concepts",
description="查询指定股票的所有相关概念板块,包括涨跌幅信息。",
parameters={
"type": "object",
"properties": {
"stock_code": {
"type": "string",
"description": "股票代码或名称"
},
"size": {
"type": "integer",
"description": "返回概念数量",
"default": 50
},
"sort_by": {
"type": "string",
"description": "排序方式",
"enum": ["stock_count", "concept_name", "recent"],
"default": "stock_count"
},
"trade_date": {
"type": "string",
"description": "交易日期格式YYYY-MM-DD"
}
},
"required": ["stock_code"]
}
),
ToolDefinition(
name="get_concept_statistics",
description="获取概念板块统计数据,包括涨幅榜、跌幅榜、活跃榜、波动榜、连涨榜。",
parameters={
"type": "object",
"properties": {
"days": {
"type": "integer",
"description": "统计天数与start_date/end_date互斥"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"min_stock_count": {
"type": "integer",
"description": "最少股票数量过滤",
"default": 3
}
},
"required": []
}
),
ToolDefinition(
name="search_limit_up_stocks",
description="搜索涨停股票,支持按日期、关键词、板块等条件搜索。包括混合语义搜索。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词(涨停原因、公司名称等)"
},
"date": {
"type": "string",
"description": "日期格式YYYYMMDD"
},
"mode": {
"type": "string",
"description": "搜索模式",
"enum": ["hybrid", "text", "vector"],
"default": "hybrid"
},
"sectors": {
"type": "array",
"items": {"type": "string"},
"description": "板块筛选"
},
"page_size": {
"type": "integer",
"description": "每页结果数",
"default": 20
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_daily_stock_analysis",
description="获取指定日期的涨停股票分析,包括板块分析、词云、趋势图表等。",
parameters={
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "日期格式YYYYMMDD"
}
},
"required": ["date"]
}
),
ToolDefinition(
name="search_research_reports",
description="搜索研究报告,支持文本和语义混合搜索。可按作者、证券、日期等筛选。",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"mode": {
"type": "string",
"description": "搜索模式",
"enum": ["hybrid", "text", "vector"],
"default": "hybrid"
},
"exact_match": {
"type": "string",
"description": "是否精确匹配0=模糊1=精确",
"enum": ["0", "1"],
"default": "0"
},
"security_code": {
"type": "string",
"description": "证券代码筛选"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"size": {
"type": "integer",
"description": "返回结果数量",
"default": 10
}
},
"required": ["query"]
}
),
ToolDefinition(
name="get_stock_basic_info",
description="获取股票基本信息,包括公司名称、行业、地址、主营业务、高管等基础数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码例如600519"
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_financial_index",
description="获取股票财务指标,包括每股收益、净资产收益率、营收增长率等关键财务数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数默认10",
"default": 10
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_trade_data",
description="获取股票交易数据,包括价格、成交量、涨跌幅、换手率等日线行情数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数默认30",
"default": 30
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_balance_sheet",
description="获取股票资产负债表,包括资产、负债、所有者权益等财务状况数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数默认8",
"default": 8
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="get_stock_cashflow",
description="获取股票现金流量表,包括经营、投资、筹资活动现金流数据。",
parameters={
"type": "object",
"properties": {
"seccode": {
"type": "string",
"description": "股票代码"
},
"start_date": {
"type": "string",
"description": "开始日期格式YYYY-MM-DD"
},
"end_date": {
"type": "string",
"description": "结束日期格式YYYY-MM-DD"
},
"limit": {
"type": "integer",
"description": "返回条数默认8",
"default": 8
}
},
"required": ["seccode"]
}
),
ToolDefinition(
name="search_stocks_by_criteria",
description="按条件搜索股票,支持按行业、地区、市值等条件筛选股票列表。",
parameters={
"type": "object",
"properties": {
"industry": {
"type": "string",
"description": "行业名称,支持模糊匹配"
},
"province": {
"type": "string",
"description": "省份名称"
},
"min_market_cap": {
"type": "number",
"description": "最小市值(亿元)"
},
"max_market_cap": {
"type": "number",
"description": "最大市值(亿元)"
},
"limit": {
"type": "integer",
"description": "返回条数默认50",
"default": 50
}
},
"required": []
}
),
ToolDefinition(
name="get_stock_comparison",
description="股票对比分析,支持多只股票的财务指标或交易数据对比。",
parameters={
"type": "object",
"properties": {
"seccodes": {
"type": "array",
"items": {"type": "string"},
"description": "股票代码列表至少2个"
},
"metric": {
"type": "string",
"description": "对比指标类型",
"enum": ["financial", "trade"],
"default": "financial"
}
},
"required": ["seccodes"]
}
),
ToolDefinition(
name="get_user_watchlist",
description="获取用户的自选股列表及实时行情数据。返回用户关注的股票及其当前价格、涨跌幅等信息。",
parameters={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "用户ID可选如果不提供则使用当前会话用户"
}
},
"required": []
}
),
ToolDefinition(
name="get_user_following_events",
description="获取用户关注的事件列表。返回用户关注的热点事件及其基本信息(标题、类型、热度、关注人数等)。",
parameters={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "用户ID可选如果不提供则使用当前会话用户"
}
},
"required": []
}
),
]
# ==================== MCP协议端点 ====================
@app.get("/")
async def root():
"""服务根端点"""
return {
"name": "Financial Data MCP Server",
"version": "1.0.0",
"protocol": "MCP",
"description": "Model Context Protocol server for financial data search and analysis"
}
@app.get("/tools")
async def list_tools():
"""列出所有可用工具"""
return {
"tools": [tool.dict() for tool in TOOLS]
}
@app.get("/tools/{tool_name}")
async def get_tool(tool_name: str):
"""获取特定工具的定义"""
tool = next((t for t in TOOLS if t.name == tool_name), None)
if not tool:
raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
return tool.dict()
@app.post("/tools/call")
async def call_tool(request: ToolCallRequest):
"""调用工具"""
logger.info(f"Tool call: {request.tool} with args: {request.arguments}")
try:
# 路由到对应的工具处理函数
handler = TOOL_HANDLERS.get(request.tool)
if not handler:
raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")
result = await handler(request.arguments)
return ToolCallResponse(
success=True,
data=result,
metadata={
"tool": request.tool,
"timestamp": datetime.now().isoformat()
}
)
except Exception as e:
logger.error(f"Tool call error: {str(e)}", exc_info=True)
return ToolCallResponse(
success=False,
error=str(e),
metadata={
"tool": request.tool,
"timestamp": datetime.now().isoformat()
}
)
# ==================== 工具处理函数 ====================
async def handle_search_news(args: Dict[str, Any]) -> Any:
"""处理新闻搜索"""
params = {
"query": args.get("query"),
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 20)
}
# 移除None值
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_china_news(args: Dict[str, Any]) -> Any:
"""处理中国新闻搜索"""
params = {
"query": args.get("query"),
"exact_match": args.get("exact_match", False),
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 20)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_china_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_medical_news(args: Dict[str, Any]) -> Any:
"""处理医疗新闻搜索"""
params = {
"query": args["query"],
"source": args.get("source"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"top_k": args.get("top_k", 10)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_medical_news", params=params)
response.raise_for_status()
return response.json()
async def handle_search_roadshows(args: Dict[str, Any]) -> Any:
"""处理路演搜索"""
params = {
"query": args["query"],
"company_code": args.get("company_code"),
"start_date": args.get("start_date"),
"end_date": args.get("end_date"),
"size": args.get("size", 10)
}
params = {k: v for k, v in params.items() if v is not None}
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.ROADSHOW_API}/search", params=params)
response.raise_for_status()
return response.json()
async def handle_search_concepts(args: Dict[str, Any]) -> Any:
"""处理概念搜索"""
payload = {
"query": args["query"],
"size": args.get("size", 10),
"page": args.get("page", 1),
"search_size": 100,
"sort_by": args.get("sort_by", "change_pct"),
"use_knn": True
}
if args.get("trade_date"):
payload["trade_date"] = args["trade_date"]
response = await HTTP_CLIENT.post(f"{ServiceEndpoints.CONCEPT_API}/search", json=payload)
response.raise_for_status()
return response.json()
async def handle_get_concept_details(args: Dict[str, Any]) -> Any:
"""处理概念详情获取"""
concept_id = args["concept_id"]
params = {}
if args.get("trade_date"):
params["trade_date"] = args["trade_date"]
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.CONCEPT_API}/concept/{concept_id}",
params=params
)
response.raise_for_status()
return response.json()
async def handle_get_stock_concepts(args: Dict[str, Any]) -> Any:
"""处理股票概念获取"""
stock_code = args["stock_code"]
params = {
"size": args.get("size", 50),
"sort_by": args.get("sort_by", "stock_count"),
"include_description": True
}
if args.get("trade_date"):
params["trade_date"] = args["trade_date"]
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.CONCEPT_API}/stock/{stock_code}/concepts",
params=params
)
response.raise_for_status()
return response.json()
async def handle_get_concept_statistics(args: Dict[str, Any]) -> Any:
"""处理概念统计获取"""
params = {}
if args.get("days"):
params["days"] = args["days"]
if args.get("start_date"):
params["start_date"] = args["start_date"]
if args.get("end_date"):
params["end_date"] = args["end_date"]
if args.get("min_stock_count"):
params["min_stock_count"] = args["min_stock_count"]
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/statistics", params=params)
response.raise_for_status()
return response.json()
async def handle_search_limit_up_stocks(args: Dict[str, Any]) -> Any:
"""处理涨停股票搜索"""
payload = {
"query": args["query"],
"mode": args.get("mode", "hybrid"),
"page_size": args.get("page_size", 20)
}
if args.get("date"):
payload["date"] = args["date"]
if args.get("sectors"):
payload["sectors"] = args["sectors"]
response = await HTTP_CLIENT.post(
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/stocks/search/hybrid",
json=payload
)
response.raise_for_status()
return response.json()
async def handle_get_daily_stock_analysis(args: Dict[str, Any]) -> Any:
"""处理每日股票分析获取"""
date = args["date"]
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/analysis/daily/{date}"
)
response.raise_for_status()
return response.json()
async def handle_search_research_reports(args: Dict[str, Any]) -> Any:
"""处理研报搜索"""
params = {
"query": args["query"],
"mode": args.get("mode", "hybrid"),
"exact_match": args.get("exact_match", "0"),
"size": args.get("size", 10)
}
if args.get("security_code"):
params["security_code"] = args["security_code"]
if args.get("start_date"):
params["start_date"] = args["start_date"]
if args.get("end_date"):
params["end_date"] = args["end_date"]
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/search", params=params)
response.raise_for_status()
return response.json()
async def handle_get_stock_basic_info(args: Dict[str, Any]) -> Any:
"""处理股票基本信息查询"""
seccode = args["seccode"]
result = await db.get_stock_basic_info(seccode)
if result:
return {"success": True, "data": result}
else:
return {"success": False, "error": f"未找到股票代码 {seccode} 的信息"}
async def handle_get_stock_financial_index(args: Dict[str, Any]) -> Any:
"""处理股票财务指标查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 10)
result = await db.get_stock_financial_index(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_trade_data(args: Dict[str, Any]) -> Any:
"""处理股票交易数据查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 30)
result = await db.get_stock_trade_data(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_balance_sheet(args: Dict[str, Any]) -> Any:
"""处理资产负债表查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 8)
result = await db.get_stock_balance_sheet(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_cashflow(args: Dict[str, Any]) -> Any:
"""处理现金流量表查询"""
seccode = args["seccode"]
start_date = args.get("start_date")
end_date = args.get("end_date")
limit = args.get("limit", 8)
result = await db.get_stock_cashflow(seccode, start_date, end_date, limit)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_search_stocks_by_criteria(args: Dict[str, Any]) -> Any:
"""处理按条件搜索股票"""
industry = args.get("industry")
province = args.get("province")
min_market_cap = args.get("min_market_cap")
max_market_cap = args.get("max_market_cap")
limit = args.get("limit", 50)
result = await db.search_stocks_by_criteria(
industry, province, min_market_cap, max_market_cap, limit
)
return {
"success": True,
"data": result,
"count": len(result)
}
async def handle_get_stock_comparison(args: Dict[str, Any]) -> Any:
"""处理股票对比分析"""
seccodes = args["seccodes"]
metric = args.get("metric", "financial")
result = await db.get_stock_comparison(seccodes, metric)
return {
"success": True,
"data": result
}
async def handle_get_user_watchlist(args: Dict[str, Any]) -> Any:
"""获取用户自选股列表及实时行情"""
try:
# 从 agent 实例获取 cookies如果可用
cookies = getattr(agent, 'cookies', {})
# 调用主应用的自选股API
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.MAIN_APP_API}/api/account/watchlist/realtime",
headers={
"Content-Type": "application/json"
},
cookies=cookies # 传递用户的 session cookie
)
if response.status_code == 200:
data = response.json()
logger.info(f"[Watchlist] 成功获取 {len(data.get('data', []))} 只自选股")
return data
elif response.status_code == 401:
logger.warning("[Watchlist] 未登录或会话已过期")
return {
"success": False,
"error": "未登录或会话已过期",
"data": []
}
else:
logger.error(f"[Watchlist] 获取失败: {response.status_code}")
return {
"success": False,
"error": f"获取自选股失败: {response.status_code}",
"data": []
}
except Exception as e:
logger.error(f"[Watchlist] 获取用户自选股失败: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"data": []
}
async def handle_get_user_following_events(args: Dict[str, Any]) -> Any:
"""获取用户关注的事件列表"""
try:
# 从 agent 实例获取 cookies如果可用
cookies = getattr(agent, 'cookies', {})
# 调用主应用的关注事件API
response = await HTTP_CLIENT.get(
f"{ServiceEndpoints.MAIN_APP_API}/api/account/events/following",
headers={
"Content-Type": "application/json"
},
cookies=cookies # 传递用户的 session cookie
)
if response.status_code == 200:
data = response.json()
logger.info(f"[FollowingEvents] 成功获取 {len(data.get('data', []))} 个关注事件")
return data
elif response.status_code == 401:
logger.warning("[FollowingEvents] 未登录或会话已过期")
return {
"success": False,
"error": "未登录或会话已过期",
"data": []
}
else:
logger.error(f"[FollowingEvents] 获取失败: {response.status_code}")
return {
"success": False,
"error": f"获取关注事件失败: {response.status_code}",
"data": []
}
except Exception as e:
logger.error(f"[FollowingEvents] 获取用户关注事件失败: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"data": []
}
# 工具处理函数映射
TOOL_HANDLERS = {
"search_news": handle_search_news,
"search_china_news": handle_search_china_news,
"search_medical_news": handle_search_medical_news,
"search_roadshows": handle_search_roadshows,
"search_concepts": handle_search_concepts,
"get_concept_details": handle_get_concept_details,
"get_stock_concepts": handle_get_stock_concepts,
"get_concept_statistics": handle_get_concept_statistics,
"search_limit_up_stocks": handle_search_limit_up_stocks,
"get_daily_stock_analysis": handle_get_daily_stock_analysis,
"search_research_reports": handle_search_research_reports,
"get_stock_basic_info": handle_get_stock_basic_info,
"get_stock_financial_index": handle_get_stock_financial_index,
"get_stock_trade_data": handle_get_stock_trade_data,
"get_stock_balance_sheet": handle_get_stock_balance_sheet,
"get_stock_cashflow": handle_get_stock_cashflow,
"search_stocks_by_criteria": handle_search_stocks_by_criteria,
"get_stock_comparison": handle_get_stock_comparison,
"get_user_watchlist": handle_get_user_watchlist,
"get_user_following_events": handle_get_user_following_events,
}
# ==================== Agent系统实现 ====================
class MCPAgentIntegrated:
"""集成版 MCP Agent - 使用 Kimi 和 DeepMoney"""
def __init__(self):
# 初始化 Kimi 客户端(计划制定)
self.kimi_client = OpenAI(
api_key=KIMI_CONFIG["api_key"],
base_url=KIMI_CONFIG["base_url"],
)
self.kimi_model = KIMI_CONFIG["model"]
# 初始化 DeepMoney 客户端(新闻总结)
self.deepmoney_client = OpenAI(
api_key=DEEPMONEY_CONFIG["api_key"],
base_url=DEEPMONEY_CONFIG["base_url"],
)
self.deepmoney_model = DEEPMONEY_CONFIG["model"]
def get_planning_prompt(self, tools: List[dict]) -> str:
"""获取计划制定的系统提示词"""
tools_desc = "\n\n".join([
f"**{tool['name']}**\n"
f"描述:{tool['description']}\n"
f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
for tool in tools
])
return f"""你是"价小前"北京价值前沿科技公司的AI投研聊天助手。
## 你的人格特征
- **名字**: 价小前
- **身份**: 北京价值前沿科技公司的专业AI投研助手
- **专业领域**: 股票投资研究、市场分析、新闻解读、财务分析
- **性格**: 专业、严谨、友好,擅长用简洁的语言解释复杂的金融概念
- **服务宗旨**: 帮助投资者做出更明智的投资决策,提供数据驱动的研究支持
## 可用工具
{tools_desc}
## 特殊工具
- **summarize_news**: 使用 DeepMoney 模型总结新闻数据
- 参数: {{"data": "新闻列表JSON", "focus": "关注点"}}
- 适用场景: 当需要总结新闻、研报等文本数据时
## 重要知识
- 贵州茅台: 600519
- 涨停: 涨幅约10%
- 概念板块: 相同题材股票分类
## 任务
分析用户问题,制定详细的执行计划。返回 JSON
```json
{{
"goal": "用户目标",
"reasoning": "分析思路",
"steps": [
{{
"tool": "工具名",
"arguments": {{"参数": ""}},
"reason": "原因"
}}
]
}}
```
## 规划原则
1. **先收集数据,再分析总结**
2. **使用 summarize_news 总结新闻类数据**
3. **根据问题复杂度灵活规划步骤数**
- 简单问题如查询单只股票2-3 步
- 中等复杂度如对比分析3-5 步
- 复杂问题如多维度深度分析5-8 步
- 避免过度拆分简单任务
4. **每个步骤应有明确目的,避免冗余**
5. **最后通常需要总结步骤**(除非用户只要原始数据)
## 示例
**示例 1: 简单查询2 步)**
用户:"贵州茅台最近有什么新闻"
```json
{{
"goal": "查询并总结贵州茅台最新新闻",
"reasoning": "简单的新闻查询,只需搜索和总结两步",
"steps": [
{{"tool": "search_china_news", "arguments": {{"query": "贵州茅台", "top_k": 10}}, "reason": "搜索新闻"}},
{{"tool": "summarize_news", "arguments": {{"data": "新闻数据", "focus": "重要动态"}}, "reason": "总结要点"}}
]
}}
```
**示例 2: 中等复杂度4 步)**
用户:"对比分析贵州茅台和五粮液的投资价值"
```json
{{
"goal": "对比分析两只股票的投资价值",
"reasoning": "需要分别获取两只股票的数据,然后对比分析",
"steps": [
{{"tool": "get_stock_info", "arguments": {{"stock_code": "600519"}}, "reason": "获取茅台数据"}},
{{"tool": "get_stock_info", "arguments": {{"stock_code": "000858"}}, "reason": "获取五粮液数据"}},
{{"tool": "search_china_news", "arguments": {{"query": "茅台 五粮液 对比", "top_k": 5}}, "reason": "搜索对比分析文章"}},
{{"tool": "summarize_news", "arguments": {{"data": "新闻", "focus": "投资价值对比"}}, "reason": "总结对比结论"}}
]
}}
```
**示例 3: 复杂分析6 步)**
用户:"全面分析人工智能概念板块的投资机会"
```json
{{
"goal": "深度分析人工智能板块的投资机会",
"reasoning": "需要获取板块数据、龙头股、资金流向、新闻动态等多维度信息",
"steps": [
{{"tool": "get_concept_stocks", "arguments": {{"concept": "人工智能"}}, "reason": "获取概念成分股"}},
{{"tool": "get_concept_money_flow", "arguments": {{"concept": "人工智能"}}, "reason": "获取资金流向"}},
{{"tool": "get_limit_up_stocks", "arguments": {{"concept": "人工智能"}}, "reason": "查看涨停股情况"}},
{{"tool": "search_china_news", "arguments": {{"query": "人工智能概念股", "top_k": 15}}, "reason": "搜索最新新闻"}},
{{"tool": "get_stock_info", "arguments": {{"stock_code": "300496"}}, "reason": "分析龙头股中科创达"}},
{{"tool": "summarize_news", "arguments": {{"data": "所有数据", "focus": "投资机会和风险"}}, "reason": "综合分析总结"}}
]
}}
```
**重要提示**
- 简单问题不要硬凑步骤2-3 步足够
- 复杂问题可以拆分到 6-8 步,但每步必须有实际价值
- 避免重复调用相同工具(除非参数不同)
只返回JSON不要其他内容。"""
async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan:
"""阶段1: 使用 Kimi 创建执行计划(带思考过程)"""
logger.info(f"[Planning] Kimi开始制定计划: {user_query}")
messages = [
{"role": "system", "content": self.get_planning_prompt(tools)},
{"role": "user", "content": user_query},
]
# 使用 Kimi 思考模型
response = self.kimi_client.chat.completions.create(
model=self.kimi_model,
messages=messages,
temperature=1.0, # Kimi 推荐
max_tokens=16000, # 足够容纳 reasoning_content
)
choice = response.choices[0]
message = choice.message
# 提取思考过程
reasoning_content = ""
if hasattr(message, "reasoning_content"):
reasoning_content = getattr(message, "reasoning_content")
logger.info(f"[Planning] Kimi思考过程: {reasoning_content[:200]}...")
# 提取计划内容
plan_json = message.content.strip()
# 清理可能的代码块标记
if "```json" in plan_json:
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
elif "```" in plan_json:
plan_json = plan_json.split("```")[1].split("```")[0].strip()
plan_data = json.loads(plan_json)
plan = ExecutionPlan(
goal=plan_data["goal"],
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
steps=[ToolCall(**step) for step in plan_data["steps"]],
)
logger.info(f"[Planning] 计划制定完成: {len(plan.steps)}")
return plan
async def execute_tool(
self,
tool_name: str,
arguments: Dict[str, Any],
tool_handlers: Dict[str, Any],
) -> Dict[str, Any]:
"""执行单个工具"""
# 特殊工具summarize_news使用 DeepMoney
if tool_name == "summarize_news":
return await self.summarize_news_with_deepmoney(
data=arguments.get("data", ""),
focus=arguments.get("focus", "关键信息"),
)
# 调用 MCP 工具
handler = tool_handlers.get(tool_name)
if not handler:
raise ValueError(f"Tool '{tool_name}' not found")
result = await handler(arguments)
return result
async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str:
"""使用 DeepMoney 模型总结新闻"""
logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}")
messages = [
{
"role": "system",
"content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。"
},
{
"role": "user",
"content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}"
},
]
try:
response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=1000,
)
summary = response.choices[0].message.content
logger.info(f"[DeepMoney] 总结完成")
return summary
except Exception as e:
logger.error(f"[DeepMoney] 总结失败: {str(e)}")
# 降级:返回简化摘要
return f"新闻总结失败,原始数据:{data[:500]}..."
async def execute_plan(
self,
plan: ExecutionPlan,
tool_handlers: Dict[str, Any],
) -> List[StepResult]:
"""阶段2: 执行计划"""
logger.info(f"[Execution] 开始执行: {len(plan.steps)}")
results = []
collected_data = {}
for i, step in enumerate(plan.steps):
logger.info(f"[Execution] 步骤 {i+1}/{len(plan.steps)}: {step.tool}")
start_time = datetime.now()
try:
# 替换占位符
arguments = step.arguments.copy()
# 如果参数值是 "前面的新闻数据" 或 "前面收集的所有数据"
if step.tool == "summarize_news":
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
# 将收集的数据传递
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(step.tool, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
results.append(step_result)
# 收集数据
collected_data[f"step_{i+1}_{step.tool}"] = result
logger.info(f"[Execution] 步骤 {i+1} 完成: {execution_time:.2f}s")
except Exception as e:
logger.error(f"[Execution] 步骤 {i+1} 失败: {str(e)}")
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=step.arguments,
status="failed",
error=str(e),
execution_time=execution_time,
)
results.append(step_result)
# 继续执行其他步骤
continue
logger.info(f"[Execution] 执行完成")
return results
async def generate_final_summary(
self,
user_query: str,
plan: ExecutionPlan,
step_results: List[StepResult],
) -> str:
"""阶段3: 使用 Kimi 生成最终总结"""
logger.info("[Summary] Kimi生成最终总结")
# 收集成功的结果
successful_results = [r for r in step_results if r.status == "success"]
if not successful_results:
return "很抱歉,所有步骤都执行失败,无法生成分析报告。"
# 构建结果文本(精简版)
results_text = "\n\n".join([
f"**步骤 {r.step_index + 1}: {r.tool}**\n"
f"结果: {str(r.result)[:800]}..."
for r in successful_results[:3] # 只取前3个避免超长
])
messages = [
{
"role": "system",
"content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。
## 数据可视化能力
如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。
支持的图表类型:
- 折线图line适合时间序列数据如股价走势、财务指标趋势
- 柱状图bar适合对比数据如不同年份的收入、利润对比
- 饼图pie适合占比数据如业务结构、资产分布
### 图表格式(使用 Markdown 代码块)
在报告中插入图表时,使用以下格式:
```echarts
{
"title": {"text": "图表标题"},
"tooltip": {},
"xAxis": {"type": "category", "data": ["类别1", "类别2"]},
"yAxis": {"type": "value"},
"series": [{"name": "数据系列", "type": "line", "data": [100, 200]}]
}
```
### 示例
如果有股价数据,可以这样呈现:
**股价走势分析**
近30日股价呈现上涨趋势最高达到1850元。
```echarts
{
"title": {"text": "近30日股价走势", "left": "center"},
"tooltip": {"trigger": "axis"},
"xAxis": {"type": "category", "data": ["2024-01-01", "2024-01-02", "2024-01-03"]},
"yAxis": {"type": "value", "name": "股价(元)"},
"series": [{"name": "收盘价", "type": "line", "data": [1800, 1820, 1850], "smooth": true}]
}
```
**重要提示**
- ECharts 配置必须是合法的 JSON 格式
- 只在有明确数值数据时才生成图表
- 不要凭空捏造数据"""
},
{
"role": "user",
"content": f"""用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请生成专业的分析报告500字以内。如果结果中包含数值型数据请使用 ECharts 图表进行可视化展示。"""
},
]
try:
response = self.kimi_client.chat.completions.create(
model="kimi-k2-turbo-preview", # 使用非思考模型,更快
messages=messages,
temperature=0.7,
max_tokens=2000, # 增加 token 限制以支持图表配置
)
summary = response.choices[0].message.content
logger.info("[Summary] 总结完成")
return summary
except Exception as e:
logger.error(f"[Summary] 总结失败: {str(e)}")
# 降级:返回最后一步的结果
if successful_results:
last_result = successful_results[-1]
if isinstance(last_result.result, str):
return last_result.result
else:
return json.dumps(last_result.result, ensure_ascii=False, indent=2)
return "总结生成失败"
async def process_query(
self,
user_query: str,
tools: List[dict],
tool_handlers: Dict[str, Any],
) -> AgentResponse:
"""主流程(非流式)"""
logger.info(f"[Agent] 处理查询: {user_query}")
try:
# 阶段1: Kimi 制定计划
plan = await self.create_plan(user_query, tools)
# 阶段2: 执行工具
step_results = await self.execute_plan(plan, tool_handlers)
# 阶段3: Kimi 生成总结
final_summary = await self.generate_final_summary(
user_query, plan, step_results
)
return AgentResponse(
success=True,
message=final_summary,
plan=plan,
step_results=step_results,
final_summary=final_summary,
metadata={
"total_steps": len(plan.steps),
"successful_steps": len([r for r in step_results if r.status == "success"]),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
"model_used": {
"planning": self.kimi_model,
"summarization": "kimi-k2-turbo-preview",
"news_summary": self.deepmoney_model,
},
},
)
except Exception as e:
logger.error(f"[Agent] 错误: {str(e)}", exc_info=True)
return AgentResponse(
success=False,
message=f"处理失败: {str(e)}",
)
async def process_query_stream(
self,
user_query: str,
tools: List[dict],
tool_handlers: Dict[str, Any],
session_id: str = None,
user_id: str = None,
user_nickname: str = None,
user_avatar: str = None,
cookies: dict = None,
model_config: dict = None, # 新增:动态模型配置
) -> AsyncGenerator[str, None]:
"""主流程(流式输出)- 逐步返回执行结果"""
logger.info(f"[Agent Stream] 处理查询: {user_query}")
# 将 cookies 存储为实例属性,供工具调用时使用
self.cookies = cookies or {}
# 如果传入了自定义模型配置,使用自定义配置,否则使用默认的 Kimi
if model_config:
planning_client = OpenAI(
api_key=model_config["api_key"],
base_url=model_config["base_url"],
)
planning_model = model_config["model"]
logger.info(f"[Agent Stream] 使用自定义模型: {planning_model}")
else:
planning_client = self.kimi_client
planning_model = self.kimi_model
logger.info(f"[Agent Stream] 使用默认模型: {planning_model}")
try:
# 发送开始事件
yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"})
# 阶段1: 使用选中的模型制定计划(流式,带 DeepMoney 备选)
yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."})
messages = [
{"role": "system", "content": self.get_planning_prompt(tools)},
{"role": "user", "content": user_query},
]
reasoning_content = ""
plan_content = ""
use_fallback = False
try:
# 尝试使用选中的模型流式 API
stream = planning_client.chat.completions.create(
model=planning_model,
messages=messages,
temperature=1.0,
max_tokens=16000,
stream=True, # 启用流式输出
)
# 逐块接收 Kimi 的响应
for chunk in stream:
if chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
plan_content += content_chunk
# 发送思考过程片段
yield self._format_sse("thinking", {
"content": content_chunk,
"stage": "planning"
})
# 提取 reasoning_content如果有
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'reasoning_content'):
reasoning_chunk = chunk.choices[0].delta.reasoning_content
if reasoning_chunk:
reasoning_content += reasoning_chunk
# 发送推理过程片段
yield self._format_sse("reasoning", {
"content": reasoning_chunk
})
except Exception as kimi_error:
# 检查是否是内容风控错误400
error_str = str(kimi_error)
if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str):
logger.warning(f"[Planning] Kimi 内容风控拒绝,切换到 DeepMoney: {error_str}")
use_fallback = True
yield self._format_sse("status", {
"stage": "planning",
"message": "切换到备用模型制定计划..."
})
try:
# 使用 DeepMoney 备选方案(非流式,因为 DeepMoney 可能不支持流式)
fallback_response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=16000,
)
plan_content = fallback_response.choices[0].message.content
# 发送完整的计划内容(一次性)
yield self._format_sse("thinking", {
"content": plan_content,
"stage": "planning"
})
logger.info(f"[Planning] DeepMoney 备选方案成功")
except Exception as fallback_error:
logger.error(f"[Planning] DeepMoney 备选方案也失败: {fallback_error}")
raise Exception(f"Kimi 和 DeepMoney 都无法生成计划: {kimi_error}, {fallback_error}")
else:
# 不是内容风控错误,直接抛出
logger.error(f"[Planning] Kimi 调用失败(非风控原因): {kimi_error}")
raise
# 解析完整的计划
plan_json = plan_content.strip()
# 清理可能的代码块标记
if "```json" in plan_json:
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
elif "```" in plan_json:
plan_json = plan_json.split("```")[1].split("```")[0].strip()
plan_data = json.loads(plan_json)
plan = ExecutionPlan(
goal=plan_data["goal"],
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
steps=[ToolCall(**step) for step in plan_data["steps"]],
)
logger.info(f"[Planning] 计划制定完成: {len(plan.steps)}")
# 发送完整计划
yield self._format_sse("plan", {
"goal": plan.goal,
"reasoning": plan.reasoning,
"steps": [
{"tool": step.tool, "arguments": step.arguments, "reason": step.reason}
for step in plan.steps
],
})
# 阶段2: 执行工具(逐步返回)
yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(plan.steps)} 个步骤"})
step_results = []
collected_data = {}
for i, step in enumerate(plan.steps):
# 发送步骤开始事件
yield self._format_sse("step_start", {
"step_index": i,
"tool": step.tool,
"arguments": step.arguments,
"reason": step.reason,
})
start_time = datetime.now()
try:
# 替换占位符
arguments = step.arguments.copy()
if step.tool == "summarize_news":
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(step.tool, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
step_results.append(step_result)
collected_data[f"step_{i+1}_{step.tool}"] = result
# 发送步骤完成事件(包含结果)
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"status": "success",
"result": result,
"execution_time": execution_time,
})
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=step.arguments,
status="failed",
error=str(e),
execution_time=execution_time,
)
step_results.append(step_result)
# 发送步骤失败事件
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"status": "failed",
"error": str(e),
"execution_time": execution_time,
})
# 阶段3: Kimi 生成总结(流式)
yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."})
# 收集成功的结果
successful_results = [r for r in step_results if r.status == "success"]
if not successful_results:
yield self._format_sse("summary", {
"content": "很抱歉,所有步骤都执行失败,无法生成分析报告。",
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": 0,
"failed_steps": len(step_results),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
else:
# 构建结果文本(精简版)
results_text = "\n\n".join([
f"**步骤 {r.step_index + 1}: {r.tool}**\n"
f"结果: {str(r.result)[:800]}..."
for r in successful_results[:3] # 只取前3个避免超长
])
messages = [
{
"role": "system",
"content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。
## 数据可视化能力
如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。
支持的图表类型:
- 折线图line适合时间序列数据如股价走势、财务指标趋势
- 柱状图bar适合对比数据如不同年份的收入、利润对比
- 饼图pie适合占比数据如业务结构、资产分布
### 图表格式(使用 Markdown 代码块)
在报告中插入图表时,使用以下格式:
```echarts
{
"title": {"text": "图表标题"},
"tooltip": {},
"xAxis": {"type": "category", "data": ["类别1", "类别2"]},
"yAxis": {"type": "value"},
"series": [{"name": "数据系列", "type": "line", "data": [100, 200]}]
}
```
**重要提示**
- ECharts 配置必须是合法的 JSON 格式
- 只在有明确数值数据时才生成图表
- 不要凭空捏造数据"""
},
{
"role": "user",
"content": f"""用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请生成专业的分析报告500字以内。如果结果中包含数值型数据请使用 ECharts 图表进行可视化展示。"""
},
]
# 使用流式 API 生成总结(带 DeepMoney 备选)
final_summary = ""
try:
summary_stream = self.kimi_client.chat.completions.create(
model="kimi-k2-turbo-preview",
messages=messages,
temperature=0.7,
max_tokens=2000,
stream=True, # 启用流式输出
)
# 逐块发送总结内容
for chunk in summary_stream:
if chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
final_summary += content_chunk
# 发送总结片段
yield self._format_sse("summary_chunk", {
"content": content_chunk
})
logger.info("[Summary] 流式总结完成")
except Exception as kimi_error:
# 检查是否是内容风控错误400
error_str = str(kimi_error)
if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str):
logger.warning(f"[Summary] Kimi 内容风控拒绝,切换到 DeepMoney: {error_str}")
yield self._format_sse("status", {
"stage": "summarizing",
"message": "切换到备用模型生成总结..."
})
try:
# 使用 DeepMoney 备选方案(非流式)
fallback_response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=2000,
)
final_summary = fallback_response.choices[0].message.content
# 发送完整的总结内容(一次性)
yield self._format_sse("summary_chunk", {
"content": final_summary
})
logger.info(f"[Summary] DeepMoney 备选方案成功")
except Exception as fallback_error:
logger.error(f"[Summary] DeepMoney 备选方案也失败: {fallback_error}")
# 使用降级方案:简单拼接执行结果
final_summary = f"执行了 {len(plan.steps)} 个步骤,其中 {len(successful_results)} 个成功。\n\n执行结果:\n{results_text[:500]}..."
yield self._format_sse("summary_chunk", {
"content": final_summary
})
logger.warning("[Summary] 使用降级方案(简单拼接)")
else:
# 不是内容风控错误,直接抛出
logger.error(f"[Summary] Kimi 调用失败(非风控原因): {kimi_error}")
raise
# 发送完整的总结和元数据
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": len(successful_results),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
# 保存 Agent 回复到 ES如果提供了 session_id
if session_id and user_id:
try:
# 将执行步骤转换为 JSON 字符串
steps_json = json.dumps(
[{"tool": step.tool, "status": step.status, "result": step.result} for step in step_results],
ensure_ascii=False
)
# 将 plan 转换为 JSON 字符串ES 中 plan 字段是 text 类型)
plan_json = json.dumps({
"goal": plan.goal,
"reasoning": plan.reasoning,
"steps": [{"tool": step.tool, "arguments": step.arguments, "reason": step.reason} for step in plan.steps]
}, ensure_ascii=False)
es_client.save_chat_message(
session_id=session_id,
user_id=user_id,
user_nickname=user_nickname or "匿名用户",
user_avatar=user_avatar or "",
message_type="assistant",
message=final_summary,
plan=plan_json,
steps=steps_json,
)
logger.info(f"[ES] Agent 回复已保存到会话 {session_id}")
except Exception as e:
logger.error(f"[ES] 保存 Agent 回复失败: {e}", exc_info=True)
# 发送完成事件
yield self._format_sse("done", {"message": "处理完成"})
except Exception as e:
logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True)
yield self._format_sse("error", {"message": f"处理失败: {str(e)}"})
def _format_sse(self, event: str, data: dict) -> str:
"""格式化 SSE 消息"""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
# 创建 Agent 实例(全局)
agent = MCPAgentIntegrated()
# ==================== Web聊天接口 ====================
class ChatMessage(BaseModel):
"""聊天消息"""
role: Literal["user", "assistant", "system"]
content: str
class ChatRequest(BaseModel):
"""聊天请求"""
messages: List[ChatMessage]
stream: bool = False
@app.post("/chat")
async def chat(request: ChatRequest):
"""
Web聊天接口
这是一个简化的接口实际应该集成LLM API如OpenAI、Claude等
这里只是演示如何使用工具
"""
# TODO: 集成实际的LLM API
# 1. 将消息发送给LLM
# 2. LLM返回需要调用的工具
# 3. 调用工具并获取结果
# 4. 将工具结果返回给LLM
# 5. LLM生成最终回复
return {
"message": "Chat endpoint placeholder - integrate with your LLM provider",
"available_tools": len(TOOLS),
"hint": "Use POST /tools/call to invoke tools"
}
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(request: AgentChatRequest):
"""智能代理对话端点(非流式)"""
logger.info(f"Agent chat: {request.message} (user: {request.user_id})")
# ==================== 权限检查 ====================
# 订阅等级判断函数(与 app.py 保持一致)
def subscription_level(sub_type: str) -> int:
"""将订阅类型映射到等级数值free=0, pro=1, max=2"""
mapping = {'free': 0, 'pro': 1, 'max': 2}
return mapping.get((sub_type or 'free').lower(), 0)
# 获取用户订阅类型(默认为 free
user_subscription = (request.subscription_type or 'free').lower()
required_level = 'max'
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
if not has_access:
logger.warning(
f"权限检查失败 - user_id: {request.user_id}, "
f"nickname: {request.user_nickname}, "
f"subscription_type: {user_subscription}, "
f"required: {required_level}"
)
raise HTTPException(
status_code=403,
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
)
logger.info(
f"权限检查通过 - user_id: {request.user_id}, "
f"nickname: {request.user_nickname}, "
f"subscription_type: {user_subscription}"
)
# ==================== 会话管理 ====================
# 如果没有提供 session_id创建新会话
session_id = request.session_id or str(uuid.uuid4())
# 保存用户消息到 ES
try:
es_client.save_chat_message(
session_id=session_id,
user_id=request.user_id or "anonymous",
user_nickname=request.user_nickname or "匿名用户",
user_avatar=request.user_avatar or "",
message_type="user",
message=request.message,
)
except Exception as e:
logger.error(f"保存用户消息失败: {e}")
# 获取工具列表
tools = [tool.dict() for tool in TOOLS]
# 添加特殊工具summarize_news
tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据JSON格式"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响''投资机会'"
}
},
"required": ["data"]
}
})
# 处理查询
response = await agent.process_query(
user_query=request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
)
# 保存 Agent 回复到 ES
try:
# 将执行步骤转换为JSON字符串
steps_json = json.dumps(
[{"tool": step.tool, "status": step.status, "result": step.result} for step in response.step_results],
ensure_ascii=False
)
# 将 plan 转换为 JSON 字符串ES 中 plan 字段是 text 类型)
plan_json = json.dumps(response.plan.dict(), ensure_ascii=False) if response.plan else None
es_client.save_chat_message(
session_id=session_id,
user_id=request.user_id or "anonymous",
user_nickname=request.user_nickname or "匿名用户",
user_avatar=request.user_avatar or "",
message_type="assistant",
message=response.final_summary, # 使用 final_summary 而不是 final_answer
plan=plan_json, # 传递 JSON 字符串而不是字典
steps=steps_json,
)
except Exception as e:
logger.error(f"保存 Agent 回复失败: {e}", exc_info=True)
# 在响应中返回 session_id
response_dict = response.dict()
response_dict["session_id"] = session_id
return response_dict
@app.post("/agent/chat/stream")
async def agent_chat_stream(chat_request: AgentChatRequest, request: Request):
"""智能代理对话端点(流式 SSE"""
logger.info(f"Agent chat stream: {chat_request.message}")
# 获取请求的 cookies用于转发到需要认证的 API
cookies = request.cookies
# ==================== 权限检查 ====================
# 订阅等级判断函数(与 app.py 保持一致)
def subscription_level(sub_type: str) -> int:
"""将订阅类型映射到等级数值free=0, pro=1, max=2"""
mapping = {'free': 0, 'pro': 1, 'max': 2}
return mapping.get((sub_type or 'free').lower(), 0)
# 获取用户订阅类型(默认为 free
user_subscription = (chat_request.subscription_type or 'free').lower()
required_level = 'max'
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
if not has_access:
logger.warning(
f"[Stream] 权限检查失败 - user_id: {chat_request.user_id}, "
f"nickname: {chat_request.user_nickname}, "
f"subscription_type: {user_subscription}, "
f"required: {required_level}"
)
raise HTTPException(
status_code=403,
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
)
logger.info(
f"[Stream] 权限检查通过 - user_id: {chat_request.user_id}, "
f"nickname: {chat_request.user_nickname}, "
f"subscription_type: {user_subscription}"
)
# 如果没有提供 session_id创建新会话
session_id = chat_request.session_id or str(uuid.uuid4())
# 保存用户消息到 ES
try:
es_client.save_chat_message(
session_id=session_id,
user_id=chat_request.user_id or "anonymous",
user_nickname=chat_request.user_nickname or "匿名用户",
user_avatar=chat_request.user_avatar or "",
message_type="user",
message=chat_request.message,
)
logger.info(f"[ES] 用户消息已保存到会话 {session_id}")
except Exception as e:
logger.error(f"[ES] 保存用户消息失败: {e}")
# ==================== 动态工具过滤 ====================
# 获取所有可用工具
all_tools = [tool.dict() for tool in TOOLS]
# 添加特殊工具summarize_news
all_tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据JSON格式"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响''投资机会'"
}
},
"required": ["data"]
}
})
# 如果用户指定了工具列表,则进行过滤
if chat_request.tools is not None and len(chat_request.tools) > 0:
selected_tool_names = set(chat_request.tools)
tools = [tool for tool in all_tools if tool["name"] in selected_tool_names]
logger.info(f"[工具过滤] 用户选择了 {len(tools)}/{len(all_tools)} 个工具: {selected_tool_names}")
else:
# 默认使用全部工具
tools = all_tools
logger.info(f"[工具过滤] 使用全部 {len(tools)} 个工具")
# ==================== 动态模型选择 ====================
selected_model = chat_request.model or "kimi-k2-thinking"
model_config = MODEL_CONFIGS.get(selected_model, MODEL_CONFIGS["kimi-k2-thinking"])
logger.info(f"[模型选择] 使用模型: {selected_model} ({model_config['model']})")
# 返回流式响应
return StreamingResponse(
agent.process_query_stream(
user_query=chat_request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
session_id=session_id,
user_id=chat_request.user_id,
user_nickname=chat_request.user_nickname,
user_avatar=chat_request.user_avatar,
cookies=cookies, # 传递 cookies 用于认证 API 调用
model_config=model_config, # 传递选中的模型配置
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
# ==================== 聊天记录管理 API ====================
@app.get("/agent/sessions")
async def get_chat_sessions(user_id: str, limit: int = 50):
"""
获取用户的聊天会话列表
Args:
user_id: 用户ID
limit: 返回数量默认50
Returns:
会话列表
"""
try:
sessions = es_client.get_chat_sessions(user_id, limit)
return {
"success": True,
"data": sessions,
"count": len(sessions)
}
except Exception as e:
logger.error(f"获取会话列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/agent/history/{session_id}")
async def get_chat_history(session_id: str, limit: int = 100):
"""
获取指定会话的聊天历史
Args:
session_id: 会话ID
limit: 返回数量默认100
Returns:
聊天记录列表
"""
try:
messages = es_client.get_chat_history(session_id, limit)
return {
"success": True,
"data": messages,
"count": len(messages)
}
except Exception as e:
logger.error(f"获取聊天历史失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/agent/search")
async def search_chat_history(user_id: str, query: str, top_k: int = 10):
"""
向量搜索聊天历史
Args:
user_id: 用户ID
query: 查询文本
top_k: 返回数量默认10
Returns:
相关聊天记录列表
"""
try:
results = es_client.search_chat_history(user_id, query, top_k)
return {
"success": True,
"data": results,
"count": len(results)
}
except Exception as e:
logger.error(f"向量搜索失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== 投研会议室系统 (V2 - 流式+工具调用) ====================
import random
# 投研会议室专用模型配置
MEETING_MODEL_CONFIGS = {
"kimi-k2-thinking": {
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
"base_url": "https://api.moonshot.cn/v1",
"model": "kimi-k2-thinking",
},
"deepseek": {
"api_key": "sk-7363bdb28d7d4bf0aa68eb9449f8f063",
"base_url": "https://api.deepseek.com",
"model": "deepseek-chat",
},
"deepmoney": {
"api_key": "",
"base_url": "http://111.62.35.50:8000/v1",
"model": "deepmoney",
},
}
def clean_deepseek_tool_markers(content: str) -> str:
"""
清理 DeepSeek 模型输出中的工具调用标记
DeepSeek 有时会以文本形式输出工具调用,格式如:
<tool▁calls▁begin><tool▁call▁begin>tool_name<tool▁sep>{"args": "value"}<tool▁call▁end><tool▁calls▁end>
"""
import re
if not content:
return content
# 清理 DeepSeek 工具调用标记
# 匹配 <tool▁calls▁begin> ... <tool▁calls▁end> 整个块
pattern = r'<tool▁calls▁begin>.*?<tool▁calls▁end>'
cleaned = re.sub(pattern, '', content, flags=re.DOTALL)
# 也清理可能残留的单个标记
markers = [
'<tool▁calls▁begin>',
'<tool▁calls▁end>',
'<tool▁call▁begin>',
'<tool▁call▁end>',
'<tool▁sep>',
]
for marker in markers:
cleaned = cleaned.replace(marker, '')
return cleaned.strip()
# 每个角色可用的工具列表
ROLE_TOOLS = {
"buffett": ["search_china_news", "search_research_reports", "get_stock_basic_info", "get_stock_financial_index"],
"big_short": ["search_china_news", "get_stock_financial_index", "get_stock_balance_sheet", "get_stock_cashflow"],
"simons": ["get_stock_trade_data", "search_limit_up_stocks", "get_concept_statistics"],
"leek": [], # 韭菜不用工具
"fund_manager": ["search_china_news", "search_research_reports", "get_stock_basic_info"],
}
# 投研会议室角色配置
MEETING_ROLES = {
"buffett": {
"id": "buffett",
"name": "巴菲特",
"nickname": "唱多者",
"role_type": "bull",
"avatar": "/images/agent/巴菲特.png",
"model": "kimi-k2-thinking",
"color": "#10B981",
"description": "主观多头,善于分析事件的潜在利好和长期价值",
"tools": ROLE_TOOLS["buffett"],
"system_prompt": """你是"巴菲特",一位资深的价值投资者和主观多头分析师。
你的特点:
1. 善于发现事件和公司的潜在利好因素
2. 关注长期价值,分析护城河、竞争优势
3. 对市场保持乐观但理性的态度
你可以使用以下工具获取数据:
- search_china_news: 搜索新闻
- search_research_reports: 搜索研报
- get_stock_basic_info: 获取股票基本信息
- get_stock_financial_index: 获取财务指标
分析时请先调用工具获取数据,再基于数据发表看多观点。
注意参考前面其他人的发言进行有针对性的回应。发言控制在200字以内。"""
},
"big_short": {
"id": "big_short",
"name": "大空头",
"nickname": "大空头",
"role_type": "bear",
"avatar": "/images/agent/大空头.png",
"model": "kimi-k2-thinking",
"color": "#EF4444",
"description": "善于分析事件和财报中的风险因素",
"tools": ROLE_TOOLS["big_short"],
"system_prompt": """你是"大空头",一位专业的风险分析师。
你的特点:
1. 善于发现被市场忽视的风险因素
2. 擅长财报分析,发现财务造假迹象
3. 关注行业天花板、竞争加剧、估值泡沫
你可以使用以下工具获取数据:
- search_china_news: 搜索负面新闻
- get_stock_financial_index: 获取财务指标找问题
- get_stock_balance_sheet: 分析资产负债表
- get_stock_cashflow: 分析现金流
分析时请先调用工具获取数据,再基于数据指出风险。
注意参考前面其他人的发言进行有针对性的反驳。发言控制在200字以内。"""
},
"simons": {
"id": "simons",
"name": "量化分析员",
"nickname": "西蒙斯",
"role_type": "quant",
"avatar": "/images/agent/simons.png",
"model": "kimi-k2-thinking",
"color": "#3B82F6",
"description": "中性立场,使用量化工具分析技术指标",
"tools": ROLE_TOOLS["simons"],
"system_prompt": """你是"量化分析员"(昵称:西蒙斯),一位专业的量化交易研究员。
你的特点:
1. 使用数据和技术指标说话,保持中性立场
2. 擅长均线、量价、动能指标分析
3. 用概率思维看待市场
你可以使用以下工具获取数据:
- get_stock_trade_data: 获取交易数据(价格、成交量)
- search_limit_up_stocks: 搜索涨停股票
- get_concept_statistics: 获取概念板块统计
分析时请先调用工具获取数据,再基于数据给出技术分析。
注意参考前面其他人的发言用数据说话。发言控制在200字以内。"""
},
"leek": {
"id": "leek",
"name": "韭菜",
"nickname": "牢大",
"role_type": "retail",
"avatar": "/images/agent/牢大.png",
"model": "deepmoney",
"color": "#F59E0B",
"description": "贪婪又讨厌亏损,热爱追涨杀跌",
"tools": [],
"system_prompt": """你是"韭菜"(昵称:牢大),一个典型的散户投资者。
你的特点:
1. 贪婪但又害怕亏损,追涨杀跌
2. 容易被市场情绪影响
3. 喜欢听小道消息,期望一夜暴富
你不需要调用工具,直接用散户视角发表看法。
注意参考前面其他人的发言用最朴素的方式回应。语言口语化、情绪化。发言控制在150字以内。"""
},
"fund_manager": {
"id": "fund_manager",
"name": "基金经理",
"nickname": "决策者",
"role_type": "manager",
"avatar": "/images/agent/基金经理.png",
"model": "deepseek",
"color": "#8B5CF6",
"description": "综合分析做出最终决策",
"tools": ROLE_TOOLS["fund_manager"],
"system_prompt": """你是"基金经理",投研会议的最终决策者。
你的角色:
1. 综合各方观点,做出理性判断
2. 平衡多空观点,识别有价值的分析
3. 注意:韭菜的观点通常是反向指标
如果需要补充信息,可以调用工具:
- search_china_news: 搜索新闻
- search_research_reports: 搜索研报
- get_stock_basic_info: 获取股票基本信息
决策输出格式:
1. 综合评估
2. 关键观点
3. 风险提示
4. 操作建议(买入/持有/观望/卖出)
5. 信心指数1-10分
参考前面所有人的发言给出综合判断。发言控制在300字以内。"""
}
}
class MeetingRequest(BaseModel):
"""投研会议请求"""
topic: str
user_id: str = "anonymous"
user_nickname: str = "匿名用户"
session_id: Optional[str] = None
user_message: Optional[str] = None
conversation_history: List[Dict[str, Any]] = []
def get_random_speaking_order() -> List[str]:
"""随机生成发言顺序(不包括基金经理)"""
roles = ["buffett", "big_short", "simons", "leek"]
random.shuffle(roles)
return roles
async def call_role_tool(role_id: str, tool_name: str, arguments: dict) -> dict:
"""调用角色的工具"""
handler = TOOL_HANDLERS.get(tool_name)
if not handler:
return {"success": False, "error": f"Unknown tool: {tool_name}"}
try:
result = await handler(arguments)
return {"success": True, "tool": tool_name, "result": result}
except Exception as e:
logger.error(f"Tool {tool_name} failed: {e}")
return {"success": False, "tool": tool_name, "error": str(e)}
async def stream_role_response(
role_id: str,
topic: str,
context: str,
tools: List[dict]
) -> AsyncGenerator[dict, None]:
"""流式生成角色回复,支持工具调用"""
role = MEETING_ROLES.get(role_id)
if not role:
yield {"type": "error", "error": f"Unknown role: {role_id}"}
return
model_name = role["model"]
model_config = MEETING_MODEL_CONFIGS.get(model_name)
if not model_config:
yield {"type": "error", "error": f"Unknown model: {model_name}"}
return
try:
client = OpenAI(
api_key=model_config["api_key"],
base_url=model_config["base_url"],
timeout=180
)
messages = [
{"role": "system", "content": role["system_prompt"]},
{"role": "user", "content": f"议题:{topic}\n\n{context}"}
]
# 准备工具定义(如果该角色有工具)
role_tool_names = role.get("tools", [])
openai_tools = None
if role_tool_names:
openai_tools = []
for tool in TOOLS:
if tool.name in role_tool_names:
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
})
# 第一次调用:可能触发工具调用
tool_calls_made = []
if openai_tools:
response = client.chat.completions.create(
model=model_config["model"],
messages=messages,
tools=openai_tools,
tool_choice="auto",
stream=False, # 工具调用不使用流式
temperature=0.7,
max_tokens=1000,
)
assistant_message = response.choices[0].message
# 处理工具调用
if assistant_message.tool_calls:
messages.append(assistant_message)
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_call_id = tool_call.id
try:
arguments = json.loads(tool_call.function.arguments)
except:
arguments = {}
# 发送工具调用开始事件
yield {
"type": "tool_call_start",
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"arguments": arguments
}
# 执行工具调用
start_time = time.time()
result = await call_role_tool(role_id, tool_name, arguments)
execution_time = time.time() - start_time
tool_calls_made.append({
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"arguments": arguments,
"result": result,
"execution_time": execution_time
})
# 发送工具调用结果事件
yield {
"type": "tool_call_result",
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"result": result,
"status": "success" if result.get("success") else "error",
"execution_time": execution_time
}
# 添加工具结果到消息
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result, ensure_ascii=False)
})
# 流式生成最终回复
stream = client.chat.completions.create(
model=model_config["model"],
messages=messages,
stream=True,
temperature=0.7,
max_tokens=2000, # 增加 token 限制以避免输出被截断
)
full_content = ""
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_content += content
yield {
"type": "content_delta",
"content": content
}
# 清理 DeepSeek 工具调用标记
full_content = clean_deepseek_tool_markers(full_content)
# 发送完成事件
yield {
"type": "content_done",
"full_content": full_content,
"tool_calls": tool_calls_made
}
except Exception as e:
logger.error(f"Role {role_id} stream failed: {e}")
yield {"type": "error", "error": str(e)}
@app.post("/agent/meeting/stream")
async def stream_investment_meeting(request: MeetingRequest):
"""
流式投研会议 V2
- 随机发言顺序
- 每个角色流式输出
- 支持工具调用
- 支持用户中途发言
"""
logger.info(f"[Meeting V2] 启动: {request.topic}")
async def generate_meeting_stream() -> AsyncGenerator[str, None]:
session_id = request.session_id or str(uuid.uuid4())
round_number = len(request.conversation_history) // 5 + 1
# 发送会话开始
yield f"data: {json.dumps({'type': 'session_start', 'session_id': session_id, 'round': round_number}, ensure_ascii=False)}\n\n"
# 构建上下文
context_parts = []
if request.conversation_history:
context_parts.append("之前的讨论:")
for msg in request.conversation_history:
context_parts.append(f"{msg.get('role_name', '未知')}】:{msg.get('content', '')}")
if request.user_message:
context_parts.append(f"\n用户刚才说:{request.user_message}")
context = "\n".join(context_parts) if context_parts else "这是第一轮讨论,请针对议题发表你的观点。"
# 随机发言顺序
speaking_order = get_random_speaking_order()
yield f"data: {json.dumps({'type': 'order_decided', 'order': speaking_order}, ensure_ascii=False)}\n\n"
all_messages = []
accumulated_context = context
# 依次让每个角色发言
for role_id in speaking_order:
role = MEETING_ROLES[role_id]
# 发送开始发言事件
yield f"data: {json.dumps({'type': 'speaking_start', 'role_id': role_id, 'role_name': role['name'], 'color': role['color']}, ensure_ascii=False)}\n\n"
# 准备工具列表
role_tools = [t for t in TOOLS if t.name in role.get("tools", [])]
# 流式生成回复
full_content = ""
tool_calls = []
async for event in stream_role_response(role_id, request.topic, accumulated_context, role_tools):
if event["type"] == "tool_call_start":
yield f"data: {json.dumps({'type': 'tool_call_start', 'role_id': role_id, 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'arguments': event['arguments']}, ensure_ascii=False)}\n\n"
elif event["type"] == "tool_call_result":
yield f"data: {json.dumps({'type': 'tool_call_result', 'role_id': role_id, 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'result': event['result'], 'status': event['status'], 'execution_time': event['execution_time']}, ensure_ascii=False)}\n\n"
tool_calls.append(event)
elif event["type"] == "content_delta":
yield f"data: {json.dumps({'type': 'content_delta', 'role_id': role_id, 'content': event['content']}, ensure_ascii=False)}\n\n"
full_content += event["content"]
elif event["type"] == "content_done":
full_content = event["full_content"]
tool_calls = event.get("tool_calls", [])
elif event["type"] == "error":
yield f"data: {json.dumps({'type': 'error', 'role_id': role_id, 'error': event['error']}, ensure_ascii=False)}\n\n"
full_content = f"[{role['name']}暂时无法发言]"
# 构建完整消息
message = {
"role_id": role_id,
"role_name": role["name"],
"nickname": role["nickname"],
"avatar": role["avatar"],
"color": role["color"],
"content": full_content,
"tool_calls": tool_calls,
"timestamp": datetime.now().isoformat(),
"round_number": round_number
}
all_messages.append(message)
# 发送消息完成事件
yield f"data: {json.dumps({'type': 'message_complete', 'message': message}, ensure_ascii=False)}\n\n"
# 更新上下文
accumulated_context += f"\n\n{role['name']}】:{full_content}"
await asyncio.sleep(0.3)
# 基金经理总结
fund_manager = MEETING_ROLES["fund_manager"]
yield f"data: {json.dumps({'type': 'speaking_start', 'role_id': 'fund_manager', 'role_name': fund_manager['name'], 'color': fund_manager['color']}, ensure_ascii=False)}\n\n"
fm_full_content = ""
fm_tool_calls = []
fm_tools = [t for t in TOOLS if t.name in fund_manager.get("tools", [])]
async for event in stream_role_response("fund_manager", request.topic, accumulated_context, fm_tools):
if event["type"] == "tool_call_start":
yield f"data: {json.dumps({'type': 'tool_call_start', 'role_id': 'fund_manager', 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'arguments': event['arguments']}, ensure_ascii=False)}\n\n"
elif event["type"] == "tool_call_result":
yield f"data: {json.dumps({'type': 'tool_call_result', 'role_id': 'fund_manager', 'tool_call_id': event['tool_call_id'], 'tool_name': event['tool_name'], 'result': event['result'], 'status': event['status'], 'execution_time': event['execution_time']}, ensure_ascii=False)}\n\n"
fm_tool_calls.append(event)
elif event["type"] == "content_delta":
yield f"data: {json.dumps({'type': 'content_delta', 'role_id': 'fund_manager', 'content': event['content']}, ensure_ascii=False)}\n\n"
fm_full_content += event["content"]
elif event["type"] == "content_done":
fm_full_content = event["full_content"]
elif event["type"] == "error":
fm_full_content = "[基金经理暂时无法发言]"
fm_message = {
"role_id": "fund_manager",
"role_name": fund_manager["name"],
"nickname": fund_manager["nickname"],
"avatar": fund_manager["avatar"],
"color": fund_manager["color"],
"content": fm_full_content,
"tool_calls": fm_tool_calls,
"timestamp": datetime.now().isoformat(),
"round_number": round_number,
"is_conclusion": True
}
yield f"data: {json.dumps({'type': 'message_complete', 'message': fm_message}, ensure_ascii=False)}\n\n"
# 发送会议状态(不强制结束,用户可以继续)
yield f"data: {json.dumps({'type': 'round_end', 'round_number': round_number, 'can_continue': True}, ensure_ascii=False)}\n\n"
return StreamingResponse(
generate_meeting_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.get("/agent/meeting/roles")
async def get_meeting_roles():
"""获取所有会议角色配置"""
return {
"success": True,
"roles": [
{
"id": role["id"],
"name": role["name"],
"nickname": role["nickname"],
"role_type": role["role_type"],
"avatar": role["avatar"],
"color": role["color"],
"description": role["description"],
"tools": role.get("tools", []),
}
for role in MEETING_ROLES.values()
]
}
# ==================== 健康检查 ====================
@app.get("/health")
async def health_check():
"""健康检查"""
# 检查各个后端服务的健康状态
services_status = {}
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news?query=test&top_k=1", timeout=5.0)
services_status["news_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["news_api"] = "unhealthy"
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/", timeout=5.0)
services_status["concept_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["concept_api"] = "unhealthy"
try:
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/health", timeout=5.0)
services_status["stock_analysis_api"] = "healthy" if response.status_code == 200 else "unhealthy"
except:
services_status["stock_analysis_api"] = "unhealthy"
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": services_status
}
# ==================== 错误处理 ====================
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""HTTP异常处理"""
return JSONResponse(
status_code=exc.status_code,
content={
"success": False,
"error": exc.detail,
"timestamp": datetime.now().isoformat()
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""通用异常处理"""
logger.error(f"Unexpected error: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "Internal server error",
"detail": str(exc),
"timestamp": datetime.now().isoformat()
}
)
# ==================== 应用启动/关闭 ====================
@app.on_event("startup")
async def startup_event():
"""应用启动"""
logger.info("MCP Server starting up...")
logger.info(f"Registered {len(TOOLS)} tools")
# 初始化数据库连接池
try:
await db.get_pool()
logger.info("MySQL connection pool initialized")
except Exception as e:
logger.error(f"Failed to initialize MySQL pool: {e}")
@app.on_event("shutdown")
async def shutdown_event():
"""应用关闭"""
logger.info("MCP Server shutting down...")
await HTTP_CLIENT.aclose()
# 关闭数据库连接池
try:
await db.close_pool()
logger.info("MySQL connection pool closed")
except Exception as e:
logger.error(f"Failed to close MySQL pool: {e}")
# ==================== 主程序 ====================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"mcp_server:app",
host="0.0.0.0",
port=8900,
reload=True,
log_level="info"
)