1951 lines
65 KiB
Python
1951 lines
65 KiB
Python
"""
|
||
MCP Server for Financial Data Search
|
||
基于FastAPI的MCP服务端,整合多个金融数据搜索API
|
||
支持LLM调用和Web聊天功能
|
||
"""
|
||
|
||
from fastapi import FastAPI, HTTPException, Request
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
from pydantic import BaseModel, Field
|
||
from typing import List, Dict, Any, Optional, Literal, AsyncGenerator
|
||
from datetime import datetime, date
|
||
import logging
|
||
import httpx
|
||
from enum import Enum
|
||
import mcp_database as db
|
||
from openai import OpenAI
|
||
import json
|
||
import asyncio
|
||
import uuid
|
||
from mcp_elasticsearch import es_client
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 创建FastAPI应用
|
||
app = FastAPI(
|
||
title="Financial Data MCP Server",
|
||
description="Model Context Protocol server for financial data search and analysis",
|
||
version="1.0.0"
|
||
)
|
||
|
||
# 添加CORS中间件
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# ==================== 配置 ====================
|
||
|
||
class ServiceEndpoints:
|
||
"""API服务端点配置"""
|
||
NEWS_API = "http://222.128.1.157:21891" # 新闻API
|
||
ROADSHOW_API = "http://222.128.1.157:19800" # 路演API
|
||
CONCEPT_API = "http://222.128.1.157:16801" # 概念API(本地)
|
||
STOCK_ANALYSIS_API = "http://222.128.1.157:8811" # 涨停分析+研报API
|
||
|
||
# HTTP客户端配置
|
||
HTTP_CLIENT = httpx.AsyncClient(timeout=60.0)
|
||
|
||
# ==================== Agent系统配置 ====================
|
||
|
||
# Kimi 配置 - 用于计划制定和深度推理
|
||
KIMI_CONFIG = {
|
||
"api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5",
|
||
"base_url": "https://api.moonshot.cn/v1",
|
||
"model": "kimi-k2-thinking", # 思考模型
|
||
}
|
||
|
||
# DeepMoney 配置 - 用于新闻总结
|
||
DEEPMONEY_CONFIG = {
|
||
"api_key": "", # 空值
|
||
"base_url": "http://111.62.35.50:8000/v1",
|
||
"model": "deepmoney",
|
||
}
|
||
|
||
# ==================== MCP协议数据模型 ====================
|
||
|
||
class ToolParameter(BaseModel):
|
||
"""工具参数定义"""
|
||
type: str
|
||
description: str
|
||
enum: Optional[List[str]] = None
|
||
default: Optional[Any] = None
|
||
|
||
class ToolDefinition(BaseModel):
|
||
"""工具定义"""
|
||
name: str
|
||
description: str
|
||
parameters: Dict[str, Any] # 支持完整的 JSON Schema 格式
|
||
|
||
class ToolCallRequest(BaseModel):
|
||
"""工具调用请求"""
|
||
tool: str
|
||
arguments: Dict[str, Any] = {}
|
||
|
||
class ToolCallResponse(BaseModel):
|
||
"""工具调用响应"""
|
||
success: bool
|
||
data: Optional[Any] = None
|
||
error: Optional[str] = None
|
||
metadata: Optional[Dict[str, Any]] = None
|
||
|
||
# ==================== Agent系统数据模型 ====================
|
||
|
||
class ToolCall(BaseModel):
|
||
"""工具调用"""
|
||
tool: str
|
||
arguments: Dict[str, Any]
|
||
reason: str
|
||
|
||
class ExecutionPlan(BaseModel):
|
||
"""执行计划"""
|
||
goal: str
|
||
steps: List[ToolCall]
|
||
reasoning: str
|
||
|
||
class StepResult(BaseModel):
|
||
"""单步执行结果"""
|
||
step_index: int
|
||
tool: str
|
||
arguments: Dict[str, Any]
|
||
status: Literal["success", "failed", "skipped"]
|
||
result: Optional[Any] = None
|
||
error: Optional[str] = None
|
||
execution_time: float = 0
|
||
|
||
class AgentResponse(BaseModel):
|
||
"""Agent响应"""
|
||
success: bool
|
||
message: str
|
||
plan: Optional[ExecutionPlan] = None
|
||
step_results: List[StepResult] = []
|
||
final_summary: Optional[str] = None
|
||
metadata: Optional[Dict[str, Any]] = None
|
||
|
||
class ConversationMessage(BaseModel):
|
||
"""对话历史消息"""
|
||
isUser: bool
|
||
content: str
|
||
|
||
class AgentChatRequest(BaseModel):
|
||
"""聊天请求"""
|
||
message: str
|
||
conversation_history: List[ConversationMessage] = []
|
||
user_id: Optional[str] = None # 用户ID
|
||
user_nickname: Optional[str] = None # 用户昵称
|
||
user_avatar: Optional[str] = None # 用户头像URL
|
||
subscription_type: Optional[str] = None # 用户订阅类型(free/pro/max)
|
||
session_id: Optional[str] = None # 会话ID(如果为空则创建新会话)
|
||
|
||
# ==================== MCP工具定义 ====================
|
||
|
||
TOOLS: List[ToolDefinition] = [
|
||
ToolDefinition(
|
||
name="search_news",
|
||
description="搜索全球新闻,支持关键词搜索和日期过滤。适用于查找国际新闻、行业动态等。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词,例如:'人工智能'、'新能源汽车'"
|
||
},
|
||
"source": {
|
||
"type": "string",
|
||
"description": "新闻来源筛选,可选"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"top_k": {
|
||
"type": "integer",
|
||
"description": "返回结果数量,默认20",
|
||
"default": 20
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_china_news",
|
||
description="搜索中国新闻,使用KNN语义搜索。支持精确匹配模式,适合查找股票、公司相关新闻。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词"
|
||
},
|
||
"exact_match": {
|
||
"type": "boolean",
|
||
"description": "是否精确匹配(用于股票代码、公司名称等),默认false",
|
||
"default": False
|
||
},
|
||
"source": {
|
||
"type": "string",
|
||
"description": "新闻来源筛选"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"top_k": {
|
||
"type": "integer",
|
||
"description": "返回结果数量,默认20",
|
||
"default": 20
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_medical_news",
|
||
description="搜索医疗健康类新闻,包括医药、医疗设备、生物技术等领域。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词"
|
||
},
|
||
"source": {
|
||
"type": "string",
|
||
"description": "新闻来源"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"top_k": {
|
||
"type": "integer",
|
||
"description": "返回结果数量",
|
||
"default": 10
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_roadshows",
|
||
description="搜索上市公司路演、投资者交流活动记录。可按公司代码、日期范围搜索。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词,可以是公司名称、主题等"
|
||
},
|
||
"company_code": {
|
||
"type": "string",
|
||
"description": "公司股票代码,例如:'600519.SH'"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS"
|
||
},
|
||
"size": {
|
||
"type": "integer",
|
||
"description": "返回结果数量",
|
||
"default": 10
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_concepts",
|
||
description="搜索股票概念板块,支持按涨跌幅、股票数量排序。返回概念详情及相关股票列表。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词,例如:'新能源'、'人工智能'"
|
||
},
|
||
"size": {
|
||
"type": "integer",
|
||
"description": "每页结果数量",
|
||
"default": 10
|
||
},
|
||
"page": {
|
||
"type": "integer",
|
||
"description": "页码",
|
||
"default": 1
|
||
},
|
||
"sort_by": {
|
||
"type": "string",
|
||
"description": "排序方式:change_pct(涨跌幅), _score(相关度), stock_count(股票数), concept_name(名称)",
|
||
"enum": ["change_pct", "_score", "stock_count", "concept_name"],
|
||
"default": "change_pct"
|
||
},
|
||
"trade_date": {
|
||
"type": "string",
|
||
"description": "交易日期,格式:YYYY-MM-DD,默认最新"
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_concept_details",
|
||
description="根据概念ID获取详细信息,包括描述、相关股票、涨跌幅数据等。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"concept_id": {
|
||
"type": "string",
|
||
"description": "概念ID"
|
||
},
|
||
"trade_date": {
|
||
"type": "string",
|
||
"description": "交易日期,格式:YYYY-MM-DD"
|
||
}
|
||
},
|
||
"required": ["concept_id"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_concepts",
|
||
description="查询指定股票的所有相关概念板块,包括涨跌幅信息。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"stock_code": {
|
||
"type": "string",
|
||
"description": "股票代码或名称"
|
||
},
|
||
"size": {
|
||
"type": "integer",
|
||
"description": "返回概念数量",
|
||
"default": 50
|
||
},
|
||
"sort_by": {
|
||
"type": "string",
|
||
"description": "排序方式",
|
||
"enum": ["stock_count", "concept_name", "recent"],
|
||
"default": "stock_count"
|
||
},
|
||
"trade_date": {
|
||
"type": "string",
|
||
"description": "交易日期,格式:YYYY-MM-DD"
|
||
}
|
||
},
|
||
"required": ["stock_code"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_concept_statistics",
|
||
description="获取概念板块统计数据,包括涨幅榜、跌幅榜、活跃榜、波动榜、连涨榜。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"days": {
|
||
"type": "integer",
|
||
"description": "统计天数(与start_date/end_date互斥)"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"min_stock_count": {
|
||
"type": "integer",
|
||
"description": "最少股票数量过滤",
|
||
"default": 3
|
||
}
|
||
},
|
||
"required": []
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_limit_up_stocks",
|
||
description="搜索涨停股票,支持按日期、关键词、板块等条件搜索。包括混合语义搜索。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词(涨停原因、公司名称等)"
|
||
},
|
||
"date": {
|
||
"type": "string",
|
||
"description": "日期,格式:YYYYMMDD"
|
||
},
|
||
"mode": {
|
||
"type": "string",
|
||
"description": "搜索模式",
|
||
"enum": ["hybrid", "text", "vector"],
|
||
"default": "hybrid"
|
||
},
|
||
"sectors": {
|
||
"type": "array",
|
||
"items": {"type": "string"},
|
||
"description": "板块筛选"
|
||
},
|
||
"page_size": {
|
||
"type": "integer",
|
||
"description": "每页结果数",
|
||
"default": 20
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_daily_stock_analysis",
|
||
description="获取指定日期的涨停股票分析,包括板块分析、词云、趋势图表等。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"date": {
|
||
"type": "string",
|
||
"description": "日期,格式:YYYYMMDD"
|
||
}
|
||
},
|
||
"required": ["date"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_research_reports",
|
||
description="搜索研究报告,支持文本和语义混合搜索。可按作者、证券、日期等筛选。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"query": {
|
||
"type": "string",
|
||
"description": "搜索关键词"
|
||
},
|
||
"mode": {
|
||
"type": "string",
|
||
"description": "搜索模式",
|
||
"enum": ["hybrid", "text", "vector"],
|
||
"default": "hybrid"
|
||
},
|
||
"exact_match": {
|
||
"type": "string",
|
||
"description": "是否精确匹配:0=模糊,1=精确",
|
||
"enum": ["0", "1"],
|
||
"default": "0"
|
||
},
|
||
"security_code": {
|
||
"type": "string",
|
||
"description": "证券代码筛选"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"size": {
|
||
"type": "integer",
|
||
"description": "返回结果数量",
|
||
"default": 10
|
||
}
|
||
},
|
||
"required": ["query"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_basic_info",
|
||
description="获取股票基本信息,包括公司名称、行业、地址、主营业务、高管等基础数据。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccode": {
|
||
"type": "string",
|
||
"description": "股票代码,例如:600519"
|
||
}
|
||
},
|
||
"required": ["seccode"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_financial_index",
|
||
description="获取股票财务指标,包括每股收益、净资产收益率、营收增长率等关键财务数据。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccode": {
|
||
"type": "string",
|
||
"description": "股票代码"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"limit": {
|
||
"type": "integer",
|
||
"description": "返回条数,默认10",
|
||
"default": 10
|
||
}
|
||
},
|
||
"required": ["seccode"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_trade_data",
|
||
description="获取股票交易数据,包括价格、成交量、涨跌幅、换手率等日线行情数据。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccode": {
|
||
"type": "string",
|
||
"description": "股票代码"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"limit": {
|
||
"type": "integer",
|
||
"description": "返回条数,默认30",
|
||
"default": 30
|
||
}
|
||
},
|
||
"required": ["seccode"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_balance_sheet",
|
||
description="获取股票资产负债表,包括资产、负债、所有者权益等财务状况数据。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccode": {
|
||
"type": "string",
|
||
"description": "股票代码"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"limit": {
|
||
"type": "integer",
|
||
"description": "返回条数,默认8",
|
||
"default": 8
|
||
}
|
||
},
|
||
"required": ["seccode"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_cashflow",
|
||
description="获取股票现金流量表,包括经营、投资、筹资活动现金流数据。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccode": {
|
||
"type": "string",
|
||
"description": "股票代码"
|
||
},
|
||
"start_date": {
|
||
"type": "string",
|
||
"description": "开始日期,格式:YYYY-MM-DD"
|
||
},
|
||
"end_date": {
|
||
"type": "string",
|
||
"description": "结束日期,格式:YYYY-MM-DD"
|
||
},
|
||
"limit": {
|
||
"type": "integer",
|
||
"description": "返回条数,默认8",
|
||
"default": 8
|
||
}
|
||
},
|
||
"required": ["seccode"]
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="search_stocks_by_criteria",
|
||
description="按条件搜索股票,支持按行业、地区、市值等条件筛选股票列表。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"industry": {
|
||
"type": "string",
|
||
"description": "行业名称,支持模糊匹配"
|
||
},
|
||
"province": {
|
||
"type": "string",
|
||
"description": "省份名称"
|
||
},
|
||
"min_market_cap": {
|
||
"type": "number",
|
||
"description": "最小市值(亿元)"
|
||
},
|
||
"max_market_cap": {
|
||
"type": "number",
|
||
"description": "最大市值(亿元)"
|
||
},
|
||
"limit": {
|
||
"type": "integer",
|
||
"description": "返回条数,默认50",
|
||
"default": 50
|
||
}
|
||
},
|
||
"required": []
|
||
}
|
||
),
|
||
ToolDefinition(
|
||
name="get_stock_comparison",
|
||
description="股票对比分析,支持多只股票的财务指标或交易数据对比。",
|
||
parameters={
|
||
"type": "object",
|
||
"properties": {
|
||
"seccodes": {
|
||
"type": "array",
|
||
"items": {"type": "string"},
|
||
"description": "股票代码列表,至少2个"
|
||
},
|
||
"metric": {
|
||
"type": "string",
|
||
"description": "对比指标类型",
|
||
"enum": ["financial", "trade"],
|
||
"default": "financial"
|
||
}
|
||
},
|
||
"required": ["seccodes"]
|
||
}
|
||
),
|
||
]
|
||
|
||
# ==================== MCP协议端点 ====================
|
||
|
||
@app.get("/")
|
||
async def root():
|
||
"""服务根端点"""
|
||
return {
|
||
"name": "Financial Data MCP Server",
|
||
"version": "1.0.0",
|
||
"protocol": "MCP",
|
||
"description": "Model Context Protocol server for financial data search and analysis"
|
||
}
|
||
|
||
@app.get("/tools")
|
||
async def list_tools():
|
||
"""列出所有可用工具"""
|
||
return {
|
||
"tools": [tool.dict() for tool in TOOLS]
|
||
}
|
||
|
||
@app.get("/tools/{tool_name}")
|
||
async def get_tool(tool_name: str):
|
||
"""获取特定工具的定义"""
|
||
tool = next((t for t in TOOLS if t.name == tool_name), None)
|
||
if not tool:
|
||
raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
|
||
return tool.dict()
|
||
|
||
@app.post("/tools/call")
|
||
async def call_tool(request: ToolCallRequest):
|
||
"""调用工具"""
|
||
logger.info(f"Tool call: {request.tool} with args: {request.arguments}")
|
||
|
||
try:
|
||
# 路由到对应的工具处理函数
|
||
handler = TOOL_HANDLERS.get(request.tool)
|
||
if not handler:
|
||
raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")
|
||
|
||
result = await handler(request.arguments)
|
||
|
||
return ToolCallResponse(
|
||
success=True,
|
||
data=result,
|
||
metadata={
|
||
"tool": request.tool,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Tool call error: {str(e)}", exc_info=True)
|
||
return ToolCallResponse(
|
||
success=False,
|
||
error=str(e),
|
||
metadata={
|
||
"tool": request.tool,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
)
|
||
|
||
# ==================== 工具处理函数 ====================
|
||
|
||
async def handle_search_news(args: Dict[str, Any]) -> Any:
|
||
"""处理新闻搜索"""
|
||
params = {
|
||
"query": args.get("query"),
|
||
"source": args.get("source"),
|
||
"start_date": args.get("start_date"),
|
||
"end_date": args.get("end_date"),
|
||
"top_k": args.get("top_k", 20)
|
||
}
|
||
# 移除None值
|
||
params = {k: v for k, v in params.items() if v is not None}
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_china_news(args: Dict[str, Any]) -> Any:
|
||
"""处理中国新闻搜索"""
|
||
params = {
|
||
"query": args.get("query"),
|
||
"exact_match": args.get("exact_match", False),
|
||
"source": args.get("source"),
|
||
"start_date": args.get("start_date"),
|
||
"end_date": args.get("end_date"),
|
||
"top_k": args.get("top_k", 20)
|
||
}
|
||
params = {k: v for k, v in params.items() if v is not None}
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_china_news", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_medical_news(args: Dict[str, Any]) -> Any:
|
||
"""处理医疗新闻搜索"""
|
||
params = {
|
||
"query": args["query"],
|
||
"source": args.get("source"),
|
||
"start_date": args.get("start_date"),
|
||
"end_date": args.get("end_date"),
|
||
"top_k": args.get("top_k", 10)
|
||
}
|
||
params = {k: v for k, v in params.items() if v is not None}
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_medical_news", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_roadshows(args: Dict[str, Any]) -> Any:
|
||
"""处理路演搜索"""
|
||
params = {
|
||
"query": args["query"],
|
||
"company_code": args.get("company_code"),
|
||
"start_date": args.get("start_date"),
|
||
"end_date": args.get("end_date"),
|
||
"size": args.get("size", 10)
|
||
}
|
||
params = {k: v for k, v in params.items() if v is not None}
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.ROADSHOW_API}/search", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_concepts(args: Dict[str, Any]) -> Any:
|
||
"""处理概念搜索"""
|
||
payload = {
|
||
"query": args["query"],
|
||
"size": args.get("size", 10),
|
||
"page": args.get("page", 1),
|
||
"search_size": 100,
|
||
"sort_by": args.get("sort_by", "change_pct"),
|
||
"use_knn": True
|
||
}
|
||
if args.get("trade_date"):
|
||
payload["trade_date"] = args["trade_date"]
|
||
|
||
response = await HTTP_CLIENT.post(f"{ServiceEndpoints.CONCEPT_API}/search", json=payload)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_get_concept_details(args: Dict[str, Any]) -> Any:
|
||
"""处理概念详情获取"""
|
||
concept_id = args["concept_id"]
|
||
params = {}
|
||
if args.get("trade_date"):
|
||
params["trade_date"] = args["trade_date"]
|
||
|
||
response = await HTTP_CLIENT.get(
|
||
f"{ServiceEndpoints.CONCEPT_API}/concept/{concept_id}",
|
||
params=params
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_get_stock_concepts(args: Dict[str, Any]) -> Any:
|
||
"""处理股票概念获取"""
|
||
stock_code = args["stock_code"]
|
||
params = {
|
||
"size": args.get("size", 50),
|
||
"sort_by": args.get("sort_by", "stock_count"),
|
||
"include_description": True
|
||
}
|
||
if args.get("trade_date"):
|
||
params["trade_date"] = args["trade_date"]
|
||
|
||
response = await HTTP_CLIENT.get(
|
||
f"{ServiceEndpoints.CONCEPT_API}/stock/{stock_code}/concepts",
|
||
params=params
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_get_concept_statistics(args: Dict[str, Any]) -> Any:
|
||
"""处理概念统计获取"""
|
||
params = {}
|
||
if args.get("days"):
|
||
params["days"] = args["days"]
|
||
if args.get("start_date"):
|
||
params["start_date"] = args["start_date"]
|
||
if args.get("end_date"):
|
||
params["end_date"] = args["end_date"]
|
||
if args.get("min_stock_count"):
|
||
params["min_stock_count"] = args["min_stock_count"]
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/statistics", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_limit_up_stocks(args: Dict[str, Any]) -> Any:
|
||
"""处理涨停股票搜索"""
|
||
payload = {
|
||
"query": args["query"],
|
||
"mode": args.get("mode", "hybrid"),
|
||
"page_size": args.get("page_size", 20)
|
||
}
|
||
if args.get("date"):
|
||
payload["date"] = args["date"]
|
||
if args.get("sectors"):
|
||
payload["sectors"] = args["sectors"]
|
||
|
||
response = await HTTP_CLIENT.post(
|
||
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/stocks/search/hybrid",
|
||
json=payload
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_get_daily_stock_analysis(args: Dict[str, Any]) -> Any:
|
||
"""处理每日股票分析获取"""
|
||
date = args["date"]
|
||
response = await HTTP_CLIENT.get(
|
||
f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/analysis/daily/{date}"
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_search_research_reports(args: Dict[str, Any]) -> Any:
|
||
"""处理研报搜索"""
|
||
params = {
|
||
"query": args["query"],
|
||
"mode": args.get("mode", "hybrid"),
|
||
"exact_match": args.get("exact_match", "0"),
|
||
"size": args.get("size", 10)
|
||
}
|
||
if args.get("security_code"):
|
||
params["security_code"] = args["security_code"]
|
||
if args.get("start_date"):
|
||
params["start_date"] = args["start_date"]
|
||
if args.get("end_date"):
|
||
params["end_date"] = args["end_date"]
|
||
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/search", params=params)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
async def handle_get_stock_basic_info(args: Dict[str, Any]) -> Any:
|
||
"""处理股票基本信息查询"""
|
||
seccode = args["seccode"]
|
||
result = await db.get_stock_basic_info(seccode)
|
||
if result:
|
||
return {"success": True, "data": result}
|
||
else:
|
||
return {"success": False, "error": f"未找到股票代码 {seccode} 的信息"}
|
||
|
||
async def handle_get_stock_financial_index(args: Dict[str, Any]) -> Any:
|
||
"""处理股票财务指标查询"""
|
||
seccode = args["seccode"]
|
||
start_date = args.get("start_date")
|
||
end_date = args.get("end_date")
|
||
limit = args.get("limit", 10)
|
||
|
||
result = await db.get_stock_financial_index(seccode, start_date, end_date, limit)
|
||
return {
|
||
"success": True,
|
||
"data": result,
|
||
"count": len(result)
|
||
}
|
||
|
||
async def handle_get_stock_trade_data(args: Dict[str, Any]) -> Any:
|
||
"""处理股票交易数据查询"""
|
||
seccode = args["seccode"]
|
||
start_date = args.get("start_date")
|
||
end_date = args.get("end_date")
|
||
limit = args.get("limit", 30)
|
||
|
||
result = await db.get_stock_trade_data(seccode, start_date, end_date, limit)
|
||
return {
|
||
"success": True,
|
||
"data": result,
|
||
"count": len(result)
|
||
}
|
||
|
||
async def handle_get_stock_balance_sheet(args: Dict[str, Any]) -> Any:
|
||
"""处理资产负债表查询"""
|
||
seccode = args["seccode"]
|
||
start_date = args.get("start_date")
|
||
end_date = args.get("end_date")
|
||
limit = args.get("limit", 8)
|
||
|
||
result = await db.get_stock_balance_sheet(seccode, start_date, end_date, limit)
|
||
return {
|
||
"success": True,
|
||
"data": result,
|
||
"count": len(result)
|
||
}
|
||
|
||
async def handle_get_stock_cashflow(args: Dict[str, Any]) -> Any:
|
||
"""处理现金流量表查询"""
|
||
seccode = args["seccode"]
|
||
start_date = args.get("start_date")
|
||
end_date = args.get("end_date")
|
||
limit = args.get("limit", 8)
|
||
|
||
result = await db.get_stock_cashflow(seccode, start_date, end_date, limit)
|
||
return {
|
||
"success": True,
|
||
"data": result,
|
||
"count": len(result)
|
||
}
|
||
|
||
async def handle_search_stocks_by_criteria(args: Dict[str, Any]) -> Any:
|
||
"""处理按条件搜索股票"""
|
||
industry = args.get("industry")
|
||
province = args.get("province")
|
||
min_market_cap = args.get("min_market_cap")
|
||
max_market_cap = args.get("max_market_cap")
|
||
limit = args.get("limit", 50)
|
||
|
||
result = await db.search_stocks_by_criteria(
|
||
industry, province, min_market_cap, max_market_cap, limit
|
||
)
|
||
return {
|
||
"success": True,
|
||
"data": result,
|
||
"count": len(result)
|
||
}
|
||
|
||
async def handle_get_stock_comparison(args: Dict[str, Any]) -> Any:
|
||
"""处理股票对比分析"""
|
||
seccodes = args["seccodes"]
|
||
metric = args.get("metric", "financial")
|
||
|
||
result = await db.get_stock_comparison(seccodes, metric)
|
||
return {
|
||
"success": True,
|
||
"data": result
|
||
}
|
||
|
||
# 工具处理函数映射
|
||
TOOL_HANDLERS = {
|
||
"search_news": handle_search_news,
|
||
"search_china_news": handle_search_china_news,
|
||
"search_medical_news": handle_search_medical_news,
|
||
"search_roadshows": handle_search_roadshows,
|
||
"search_concepts": handle_search_concepts,
|
||
"get_concept_details": handle_get_concept_details,
|
||
"get_stock_concepts": handle_get_stock_concepts,
|
||
"get_concept_statistics": handle_get_concept_statistics,
|
||
"search_limit_up_stocks": handle_search_limit_up_stocks,
|
||
"get_daily_stock_analysis": handle_get_daily_stock_analysis,
|
||
"search_research_reports": handle_search_research_reports,
|
||
"get_stock_basic_info": handle_get_stock_basic_info,
|
||
"get_stock_financial_index": handle_get_stock_financial_index,
|
||
"get_stock_trade_data": handle_get_stock_trade_data,
|
||
"get_stock_balance_sheet": handle_get_stock_balance_sheet,
|
||
"get_stock_cashflow": handle_get_stock_cashflow,
|
||
"search_stocks_by_criteria": handle_search_stocks_by_criteria,
|
||
"get_stock_comparison": handle_get_stock_comparison,
|
||
}
|
||
|
||
# ==================== Agent系统实现 ====================
|
||
|
||
class MCPAgentIntegrated:
|
||
"""集成版 MCP Agent - 使用 Kimi 和 DeepMoney"""
|
||
|
||
def __init__(self):
|
||
# 初始化 Kimi 客户端(计划制定)
|
||
self.kimi_client = OpenAI(
|
||
api_key=KIMI_CONFIG["api_key"],
|
||
base_url=KIMI_CONFIG["base_url"],
|
||
)
|
||
self.kimi_model = KIMI_CONFIG["model"]
|
||
|
||
# 初始化 DeepMoney 客户端(新闻总结)
|
||
self.deepmoney_client = OpenAI(
|
||
api_key=DEEPMONEY_CONFIG["api_key"],
|
||
base_url=DEEPMONEY_CONFIG["base_url"],
|
||
)
|
||
self.deepmoney_model = DEEPMONEY_CONFIG["model"]
|
||
|
||
def get_planning_prompt(self, tools: List[dict]) -> str:
|
||
"""获取计划制定的系统提示词"""
|
||
tools_desc = "\n\n".join([
|
||
f"**{tool['name']}**\n"
|
||
f"描述:{tool['description']}\n"
|
||
f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
|
||
for tool in tools
|
||
])
|
||
|
||
return f"""你是"价小前",北京价值前沿科技公司的AI投研聊天助手。
|
||
|
||
## 你的人格特征
|
||
- **名字**: 价小前
|
||
- **身份**: 北京价值前沿科技公司的专业AI投研助手
|
||
- **专业领域**: 股票投资研究、市场分析、新闻解读、财务分析
|
||
- **性格**: 专业、严谨、友好,擅长用简洁的语言解释复杂的金融概念
|
||
- **服务宗旨**: 帮助投资者做出更明智的投资决策,提供数据驱动的研究支持
|
||
|
||
## 可用工具
|
||
|
||
{tools_desc}
|
||
|
||
## 特殊工具
|
||
- **summarize_news**: 使用 DeepMoney 模型总结新闻数据
|
||
- 参数: {{"data": "新闻列表JSON", "focus": "关注点"}}
|
||
- 适用场景: 当需要总结新闻、研报等文本数据时
|
||
|
||
## 重要知识
|
||
- 贵州茅台: 600519
|
||
- 涨停: 涨幅约10%
|
||
- 概念板块: 相同题材股票分类
|
||
|
||
## 任务
|
||
分析用户问题,制定详细的执行计划。返回 JSON:
|
||
|
||
```json
|
||
{{
|
||
"goal": "用户目标",
|
||
"reasoning": "分析思路",
|
||
"steps": [
|
||
{{
|
||
"tool": "工具名",
|
||
"arguments": {{"参数": "值"}},
|
||
"reason": "原因"
|
||
}}
|
||
]
|
||
}}
|
||
```
|
||
|
||
## 规划原则
|
||
1. 先收集数据,再分析总结
|
||
2. 使用 summarize_news 总结新闻类数据
|
||
3. 不超过5个步骤
|
||
4. 最后一步通常是总结
|
||
|
||
## 示例
|
||
|
||
用户:"贵州茅台最近有什么新闻"
|
||
|
||
计划:
|
||
```json
|
||
{{
|
||
"goal": "查询并总结贵州茅台最新新闻",
|
||
"reasoning": "先搜索新闻,再用 DeepMoney 总结",
|
||
"steps": [
|
||
{{
|
||
"tool": "search_china_news",
|
||
"arguments": {{"query": "贵州茅台", "top_k": 10}},
|
||
"reason": "搜索贵州茅台相关新闻"
|
||
}},
|
||
{{
|
||
"tool": "summarize_news",
|
||
"arguments": {{
|
||
"data": "前面的新闻数据",
|
||
"focus": "贵州茅台的重要动态和市场影响"
|
||
}},
|
||
"reason": "使用DeepMoney总结新闻要点"
|
||
}}
|
||
]
|
||
}}
|
||
```
|
||
|
||
只返回JSON,不要其他内容。"""
|
||
|
||
async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan:
|
||
"""阶段1: 使用 Kimi 创建执行计划(带思考过程)"""
|
||
logger.info(f"[Planning] Kimi开始制定计划: {user_query}")
|
||
|
||
messages = [
|
||
{"role": "system", "content": self.get_planning_prompt(tools)},
|
||
{"role": "user", "content": user_query},
|
||
]
|
||
|
||
# 使用 Kimi 思考模型
|
||
response = self.kimi_client.chat.completions.create(
|
||
model=self.kimi_model,
|
||
messages=messages,
|
||
temperature=1.0, # Kimi 推荐
|
||
max_tokens=16000, # 足够容纳 reasoning_content
|
||
)
|
||
|
||
choice = response.choices[0]
|
||
message = choice.message
|
||
|
||
# 提取思考过程
|
||
reasoning_content = ""
|
||
if hasattr(message, "reasoning_content"):
|
||
reasoning_content = getattr(message, "reasoning_content")
|
||
logger.info(f"[Planning] Kimi思考过程: {reasoning_content[:200]}...")
|
||
|
||
# 提取计划内容
|
||
plan_json = message.content.strip()
|
||
|
||
# 清理可能的代码块标记
|
||
if "```json" in plan_json:
|
||
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
|
||
elif "```" in plan_json:
|
||
plan_json = plan_json.split("```")[1].split("```")[0].strip()
|
||
|
||
plan_data = json.loads(plan_json)
|
||
|
||
plan = ExecutionPlan(
|
||
goal=plan_data["goal"],
|
||
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
|
||
steps=[ToolCall(**step) for step in plan_data["steps"]],
|
||
)
|
||
|
||
logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步")
|
||
return plan
|
||
|
||
async def execute_tool(
|
||
self,
|
||
tool_name: str,
|
||
arguments: Dict[str, Any],
|
||
tool_handlers: Dict[str, Any],
|
||
) -> Dict[str, Any]:
|
||
"""执行单个工具"""
|
||
|
||
# 特殊工具:summarize_news(使用 DeepMoney)
|
||
if tool_name == "summarize_news":
|
||
return await self.summarize_news_with_deepmoney(
|
||
data=arguments.get("data", ""),
|
||
focus=arguments.get("focus", "关键信息"),
|
||
)
|
||
|
||
# 调用 MCP 工具
|
||
handler = tool_handlers.get(tool_name)
|
||
if not handler:
|
||
raise ValueError(f"Tool '{tool_name}' not found")
|
||
|
||
result = await handler(arguments)
|
||
return result
|
||
|
||
async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str:
|
||
"""使用 DeepMoney 模型总结新闻"""
|
||
logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}")
|
||
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。"
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}"
|
||
},
|
||
]
|
||
|
||
try:
|
||
response = self.deepmoney_client.chat.completions.create(
|
||
model=self.deepmoney_model,
|
||
messages=messages,
|
||
temperature=0.7,
|
||
max_tokens=1000,
|
||
)
|
||
|
||
summary = response.choices[0].message.content
|
||
logger.info(f"[DeepMoney] 总结完成")
|
||
return summary
|
||
|
||
except Exception as e:
|
||
logger.error(f"[DeepMoney] 总结失败: {str(e)}")
|
||
# 降级:返回简化摘要
|
||
return f"新闻总结失败,原始数据:{data[:500]}..."
|
||
|
||
async def execute_plan(
|
||
self,
|
||
plan: ExecutionPlan,
|
||
tool_handlers: Dict[str, Any],
|
||
) -> List[StepResult]:
|
||
"""阶段2: 执行计划"""
|
||
logger.info(f"[Execution] 开始执行: {len(plan.steps)} 步")
|
||
|
||
results = []
|
||
collected_data = {}
|
||
|
||
for i, step in enumerate(plan.steps):
|
||
logger.info(f"[Execution] 步骤 {i+1}/{len(plan.steps)}: {step.tool}")
|
||
|
||
start_time = datetime.now()
|
||
|
||
try:
|
||
# 替换占位符
|
||
arguments = step.arguments.copy()
|
||
|
||
# 如果参数值是 "前面的新闻数据" 或 "前面收集的所有数据"
|
||
if step.tool == "summarize_news":
|
||
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
|
||
# 将收集的数据传递
|
||
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
|
||
|
||
# 执行工具
|
||
result = await self.execute_tool(step.tool, arguments, tool_handlers)
|
||
|
||
execution_time = (datetime.now() - start_time).total_seconds()
|
||
|
||
step_result = StepResult(
|
||
step_index=i,
|
||
tool=step.tool,
|
||
arguments=arguments,
|
||
status="success",
|
||
result=result,
|
||
execution_time=execution_time,
|
||
)
|
||
results.append(step_result)
|
||
|
||
# 收集数据
|
||
collected_data[f"step_{i+1}_{step.tool}"] = result
|
||
|
||
logger.info(f"[Execution] 步骤 {i+1} 完成: {execution_time:.2f}s")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Execution] 步骤 {i+1} 失败: {str(e)}")
|
||
|
||
execution_time = (datetime.now() - start_time).total_seconds()
|
||
|
||
step_result = StepResult(
|
||
step_index=i,
|
||
tool=step.tool,
|
||
arguments=step.arguments,
|
||
status="failed",
|
||
error=str(e),
|
||
execution_time=execution_time,
|
||
)
|
||
results.append(step_result)
|
||
|
||
# 继续执行其他步骤
|
||
continue
|
||
|
||
logger.info(f"[Execution] 执行完成")
|
||
return results
|
||
|
||
async def generate_final_summary(
|
||
self,
|
||
user_query: str,
|
||
plan: ExecutionPlan,
|
||
step_results: List[StepResult],
|
||
) -> str:
|
||
"""阶段3: 使用 Kimi 生成最终总结"""
|
||
logger.info("[Summary] Kimi生成最终总结")
|
||
|
||
# 收集成功的结果
|
||
successful_results = [r for r in step_results if r.status == "success"]
|
||
|
||
if not successful_results:
|
||
return "很抱歉,所有步骤都执行失败,无法生成分析报告。"
|
||
|
||
# 构建结果文本(精简版)
|
||
results_text = "\n\n".join([
|
||
f"**步骤 {r.step_index + 1}: {r.tool}**\n"
|
||
f"结果: {str(r.result)[:800]}..."
|
||
for r in successful_results[:3] # 只取前3个,避免超长
|
||
])
|
||
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。
|
||
|
||
## 数据可视化能力
|
||
如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。
|
||
|
||
支持的图表类型:
|
||
- 折线图(line):适合时间序列数据(如股价走势、财务指标趋势)
|
||
- 柱状图(bar):适合对比数据(如不同年份的收入、利润对比)
|
||
- 饼图(pie):适合占比数据(如业务结构、资产分布)
|
||
|
||
### 图表格式(使用 Markdown 代码块)
|
||
在报告中插入图表时,使用以下格式:
|
||
|
||
```echarts
|
||
{
|
||
"title": {"text": "图表标题"},
|
||
"tooltip": {},
|
||
"xAxis": {"type": "category", "data": ["类别1", "类别2"]},
|
||
"yAxis": {"type": "value"},
|
||
"series": [{"name": "数据系列", "type": "line", "data": [100, 200]}]
|
||
}
|
||
```
|
||
|
||
### 示例
|
||
如果有股价数据,可以这样呈现:
|
||
|
||
**股价走势分析**
|
||
|
||
近30日股价呈现上涨趋势,最高达到1850元。
|
||
|
||
```echarts
|
||
{
|
||
"title": {"text": "近30日股价走势", "left": "center"},
|
||
"tooltip": {"trigger": "axis"},
|
||
"xAxis": {"type": "category", "data": ["2024-01-01", "2024-01-02", "2024-01-03"]},
|
||
"yAxis": {"type": "value", "name": "股价(元)"},
|
||
"series": [{"name": "收盘价", "type": "line", "data": [1800, 1820, 1850], "smooth": true}]
|
||
}
|
||
```
|
||
|
||
**重要提示**:
|
||
- ECharts 配置必须是合法的 JSON 格式
|
||
- 只在有明确数值数据时才生成图表
|
||
- 不要凭空捏造数据"""
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": f"""用户问题:{user_query}
|
||
|
||
执行计划:{plan.goal}
|
||
|
||
执行结果:
|
||
{results_text}
|
||
|
||
请生成专业的分析报告(500字以内)。如果结果中包含数值型数据,请使用 ECharts 图表进行可视化展示。"""
|
||
},
|
||
]
|
||
|
||
try:
|
||
response = self.kimi_client.chat.completions.create(
|
||
model="kimi-k2-turbo-preview", # 使用非思考模型,更快
|
||
messages=messages,
|
||
temperature=0.7,
|
||
max_tokens=2000, # 增加 token 限制以支持图表配置
|
||
)
|
||
|
||
summary = response.choices[0].message.content
|
||
logger.info("[Summary] 总结完成")
|
||
return summary
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Summary] 总结失败: {str(e)}")
|
||
# 降级:返回最后一步的结果
|
||
if successful_results:
|
||
last_result = successful_results[-1]
|
||
if isinstance(last_result.result, str):
|
||
return last_result.result
|
||
else:
|
||
return json.dumps(last_result.result, ensure_ascii=False, indent=2)
|
||
return "总结生成失败"
|
||
|
||
async def process_query(
|
||
self,
|
||
user_query: str,
|
||
tools: List[dict],
|
||
tool_handlers: Dict[str, Any],
|
||
) -> AgentResponse:
|
||
"""主流程(非流式)"""
|
||
logger.info(f"[Agent] 处理查询: {user_query}")
|
||
|
||
try:
|
||
# 阶段1: Kimi 制定计划
|
||
plan = await self.create_plan(user_query, tools)
|
||
|
||
# 阶段2: 执行工具
|
||
step_results = await self.execute_plan(plan, tool_handlers)
|
||
|
||
# 阶段3: Kimi 生成总结
|
||
final_summary = await self.generate_final_summary(
|
||
user_query, plan, step_results
|
||
)
|
||
|
||
return AgentResponse(
|
||
success=True,
|
||
message=final_summary,
|
||
plan=plan,
|
||
step_results=step_results,
|
||
final_summary=final_summary,
|
||
metadata={
|
||
"total_steps": len(plan.steps),
|
||
"successful_steps": len([r for r in step_results if r.status == "success"]),
|
||
"failed_steps": len([r for r in step_results if r.status == "failed"]),
|
||
"total_execution_time": sum(r.execution_time for r in step_results),
|
||
"model_used": {
|
||
"planning": self.kimi_model,
|
||
"summarization": "kimi-k2-turbo-preview",
|
||
"news_summary": self.deepmoney_model,
|
||
},
|
||
},
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Agent] 错误: {str(e)}", exc_info=True)
|
||
return AgentResponse(
|
||
success=False,
|
||
message=f"处理失败: {str(e)}",
|
||
)
|
||
|
||
async def process_query_stream(
|
||
self,
|
||
user_query: str,
|
||
tools: List[dict],
|
||
tool_handlers: Dict[str, Any],
|
||
) -> AsyncGenerator[str, None]:
|
||
"""主流程(流式输出)- 逐步返回执行结果"""
|
||
logger.info(f"[Agent Stream] 处理查询: {user_query}")
|
||
|
||
try:
|
||
# 发送开始事件
|
||
yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"})
|
||
|
||
# 阶段1: Kimi 制定计划
|
||
yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."})
|
||
|
||
plan = await self.create_plan(user_query, tools)
|
||
|
||
# 发送计划
|
||
yield self._format_sse("plan", {
|
||
"goal": plan.goal,
|
||
"reasoning": plan.reasoning,
|
||
"steps": [
|
||
{"tool": step.tool, "arguments": step.arguments, "reason": step.reason}
|
||
for step in plan.steps
|
||
],
|
||
})
|
||
|
||
# 阶段2: 执行工具(逐步返回)
|
||
yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(plan.steps)} 个步骤"})
|
||
|
||
step_results = []
|
||
collected_data = {}
|
||
|
||
for i, step in enumerate(plan.steps):
|
||
# 发送步骤开始事件
|
||
yield self._format_sse("step_start", {
|
||
"step_index": i,
|
||
"tool": step.tool,
|
||
"arguments": step.arguments,
|
||
"reason": step.reason,
|
||
})
|
||
|
||
start_time = datetime.now()
|
||
|
||
try:
|
||
# 替换占位符
|
||
arguments = step.arguments.copy()
|
||
if step.tool == "summarize_news":
|
||
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
|
||
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
|
||
|
||
# 执行工具
|
||
result = await self.execute_tool(step.tool, arguments, tool_handlers)
|
||
execution_time = (datetime.now() - start_time).total_seconds()
|
||
|
||
step_result = StepResult(
|
||
step_index=i,
|
||
tool=step.tool,
|
||
arguments=arguments,
|
||
status="success",
|
||
result=result,
|
||
execution_time=execution_time,
|
||
)
|
||
step_results.append(step_result)
|
||
collected_data[f"step_{i+1}_{step.tool}"] = result
|
||
|
||
# 发送步骤完成事件(包含结果)
|
||
yield self._format_sse("step_complete", {
|
||
"step_index": i,
|
||
"tool": step.tool,
|
||
"status": "success",
|
||
"result": result,
|
||
"execution_time": execution_time,
|
||
})
|
||
|
||
except Exception as e:
|
||
execution_time = (datetime.now() - start_time).total_seconds()
|
||
|
||
step_result = StepResult(
|
||
step_index=i,
|
||
tool=step.tool,
|
||
arguments=step.arguments,
|
||
status="failed",
|
||
error=str(e),
|
||
execution_time=execution_time,
|
||
)
|
||
step_results.append(step_result)
|
||
|
||
# 发送步骤失败事件
|
||
yield self._format_sse("step_complete", {
|
||
"step_index": i,
|
||
"tool": step.tool,
|
||
"status": "failed",
|
||
"error": str(e),
|
||
"execution_time": execution_time,
|
||
})
|
||
|
||
# 阶段3: Kimi 生成总结
|
||
yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."})
|
||
|
||
final_summary = await self.generate_final_summary(user_query, plan, step_results)
|
||
|
||
# 发送最终总结
|
||
yield self._format_sse("summary", {
|
||
"content": final_summary,
|
||
"metadata": {
|
||
"total_steps": len(plan.steps),
|
||
"successful_steps": len([r for r in step_results if r.status == "success"]),
|
||
"failed_steps": len([r for r in step_results if r.status == "failed"]),
|
||
"total_execution_time": sum(r.execution_time for r in step_results),
|
||
},
|
||
})
|
||
|
||
# 发送完成事件
|
||
yield self._format_sse("done", {"message": "处理完成"})
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True)
|
||
yield self._format_sse("error", {"message": f"处理失败: {str(e)}"})
|
||
|
||
def _format_sse(self, event: str, data: dict) -> str:
|
||
"""格式化 SSE 消息"""
|
||
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
||
|
||
# 创建 Agent 实例(全局)
|
||
agent = MCPAgentIntegrated()
|
||
|
||
# ==================== Web聊天接口 ====================
|
||
|
||
class ChatMessage(BaseModel):
|
||
"""聊天消息"""
|
||
role: Literal["user", "assistant", "system"]
|
||
content: str
|
||
|
||
class ChatRequest(BaseModel):
|
||
"""聊天请求"""
|
||
messages: List[ChatMessage]
|
||
stream: bool = False
|
||
|
||
@app.post("/chat")
|
||
async def chat(request: ChatRequest):
|
||
"""
|
||
Web聊天接口
|
||
|
||
这是一个简化的接口,实际应该集成LLM API(如OpenAI、Claude等)
|
||
这里只是演示如何使用工具
|
||
"""
|
||
# TODO: 集成实际的LLM API
|
||
# 1. 将消息发送给LLM
|
||
# 2. LLM返回需要调用的工具
|
||
# 3. 调用工具并获取结果
|
||
# 4. 将工具结果返回给LLM
|
||
# 5. LLM生成最终回复
|
||
|
||
return {
|
||
"message": "Chat endpoint placeholder - integrate with your LLM provider",
|
||
"available_tools": len(TOOLS),
|
||
"hint": "Use POST /tools/call to invoke tools"
|
||
}
|
||
|
||
@app.post("/agent/chat", response_model=AgentResponse)
|
||
async def agent_chat(request: AgentChatRequest):
|
||
"""智能代理对话端点(非流式)"""
|
||
logger.info(f"Agent chat: {request.message} (user: {request.user_id})")
|
||
|
||
# ==================== 权限检查 ====================
|
||
# 订阅等级判断函数(与 app.py 保持一致)
|
||
def subscription_level(sub_type: str) -> int:
|
||
"""将订阅类型映射到等级数值,free=0, pro=1, max=2"""
|
||
mapping = {'free': 0, 'pro': 1, 'max': 2}
|
||
return mapping.get((sub_type or 'free').lower(), 0)
|
||
|
||
# 获取用户订阅类型(默认为 free)
|
||
user_subscription = (request.subscription_type or 'free').lower()
|
||
required_level = 'max'
|
||
|
||
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
|
||
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
|
||
|
||
if not has_access:
|
||
logger.warning(
|
||
f"权限检查失败 - user_id: {request.user_id}, "
|
||
f"nickname: {request.user_nickname}, "
|
||
f"subscription_type: {user_subscription}, "
|
||
f"required: {required_level}"
|
||
)
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
|
||
)
|
||
|
||
logger.info(
|
||
f"权限检查通过 - user_id: {request.user_id}, "
|
||
f"nickname: {request.user_nickname}, "
|
||
f"subscription_type: {user_subscription}"
|
||
)
|
||
|
||
# ==================== 会话管理 ====================
|
||
# 如果没有提供 session_id,创建新会话
|
||
session_id = request.session_id or str(uuid.uuid4())
|
||
|
||
# 保存用户消息到 ES
|
||
try:
|
||
es_client.save_chat_message(
|
||
session_id=session_id,
|
||
user_id=request.user_id or "anonymous",
|
||
user_nickname=request.user_nickname or "匿名用户",
|
||
user_avatar=request.user_avatar or "",
|
||
message_type="user",
|
||
message=request.message,
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"保存用户消息失败: {e}")
|
||
|
||
# 获取工具列表
|
||
tools = [tool.dict() for tool in TOOLS]
|
||
|
||
# 添加特殊工具:summarize_news
|
||
tools.append({
|
||
"name": "summarize_news",
|
||
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"data": {
|
||
"type": "string",
|
||
"description": "要总结的新闻数据(JSON格式)"
|
||
},
|
||
"focus": {
|
||
"type": "string",
|
||
"description": "关注点,例如:'市场影响'、'投资机会'等"
|
||
}
|
||
},
|
||
"required": ["data"]
|
||
}
|
||
})
|
||
|
||
# 处理查询
|
||
response = await agent.process_query(
|
||
user_query=request.message,
|
||
tools=tools,
|
||
tool_handlers=TOOL_HANDLERS,
|
||
)
|
||
|
||
# 保存 Agent 回复到 ES
|
||
try:
|
||
# 将执行步骤转换为JSON字符串
|
||
steps_json = json.dumps(
|
||
[{"tool": step.tool, "status": step.status, "result": step.result} for step in response.step_results],
|
||
ensure_ascii=False
|
||
)
|
||
|
||
# 将 plan 转换为 JSON 字符串(ES 中 plan 字段是 text 类型)
|
||
plan_json = json.dumps(response.plan.dict(), ensure_ascii=False) if response.plan else None
|
||
|
||
es_client.save_chat_message(
|
||
session_id=session_id,
|
||
user_id=request.user_id or "anonymous",
|
||
user_nickname=request.user_nickname or "匿名用户",
|
||
user_avatar=request.user_avatar or "",
|
||
message_type="assistant",
|
||
message=response.final_summary, # 使用 final_summary 而不是 final_answer
|
||
plan=plan_json, # 传递 JSON 字符串而不是字典
|
||
steps=steps_json,
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"保存 Agent 回复失败: {e}", exc_info=True)
|
||
|
||
# 在响应中返回 session_id
|
||
response_dict = response.dict()
|
||
response_dict["session_id"] = session_id
|
||
return response_dict
|
||
|
||
@app.post("/agent/chat/stream")
|
||
async def agent_chat_stream(request: AgentChatRequest):
|
||
"""智能代理对话端点(流式 SSE)"""
|
||
logger.info(f"Agent chat stream: {request.message}")
|
||
|
||
# ==================== 权限检查 ====================
|
||
# 订阅等级判断函数(与 app.py 保持一致)
|
||
def subscription_level(sub_type: str) -> int:
|
||
"""将订阅类型映射到等级数值,free=0, pro=1, max=2"""
|
||
mapping = {'free': 0, 'pro': 1, 'max': 2}
|
||
return mapping.get((sub_type or 'free').lower(), 0)
|
||
|
||
# 获取用户订阅类型(默认为 free)
|
||
user_subscription = (request.subscription_type or 'free').lower()
|
||
required_level = 'max'
|
||
|
||
# 权限检查:仅允许 max 用户访问(与传导链分析权限保持一致)
|
||
has_access = subscription_level(user_subscription) >= subscription_level(required_level)
|
||
|
||
if not has_access:
|
||
logger.warning(
|
||
f"[Stream] 权限检查失败 - user_id: {request.user_id}, "
|
||
f"nickname: {request.user_nickname}, "
|
||
f"subscription_type: {user_subscription}, "
|
||
f"required: {required_level}"
|
||
)
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="很抱歉,「价小前投研」功能仅对 Max 订阅用户开放。请升级您的订阅以使用此功能。"
|
||
)
|
||
|
||
logger.info(
|
||
f"[Stream] 权限检查通过 - user_id: {request.user_id}, "
|
||
f"nickname: {request.user_nickname}, "
|
||
f"subscription_type: {user_subscription}"
|
||
)
|
||
|
||
# 获取工具列表
|
||
tools = [tool.dict() for tool in TOOLS]
|
||
|
||
# 添加特殊工具:summarize_news
|
||
tools.append({
|
||
"name": "summarize_news",
|
||
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"data": {
|
||
"type": "string",
|
||
"description": "要总结的新闻数据(JSON格式)"
|
||
},
|
||
"focus": {
|
||
"type": "string",
|
||
"description": "关注点,例如:'市场影响'、'投资机会'等"
|
||
}
|
||
},
|
||
"required": ["data"]
|
||
}
|
||
})
|
||
|
||
# 返回流式响应
|
||
return StreamingResponse(
|
||
agent.process_query_stream(
|
||
user_query=request.message,
|
||
tools=tools,
|
||
tool_handlers=TOOL_HANDLERS,
|
||
),
|
||
media_type="text/event-stream",
|
||
headers={
|
||
"Cache-Control": "no-cache",
|
||
"Connection": "keep-alive",
|
||
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
|
||
},
|
||
)
|
||
|
||
# ==================== 聊天记录管理 API ====================
|
||
|
||
@app.get("/agent/sessions")
|
||
async def get_chat_sessions(user_id: str, limit: int = 50):
|
||
"""
|
||
获取用户的聊天会话列表
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
limit: 返回数量(默认50)
|
||
|
||
Returns:
|
||
会话列表
|
||
"""
|
||
try:
|
||
sessions = es_client.get_chat_sessions(user_id, limit)
|
||
return {
|
||
"success": True,
|
||
"data": sessions,
|
||
"count": len(sessions)
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取会话列表失败: {e}")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@app.get("/agent/history/{session_id}")
|
||
async def get_chat_history(session_id: str, limit: int = 100):
|
||
"""
|
||
获取指定会话的聊天历史
|
||
|
||
Args:
|
||
session_id: 会话ID
|
||
limit: 返回数量(默认100)
|
||
|
||
Returns:
|
||
聊天记录列表
|
||
"""
|
||
try:
|
||
messages = es_client.get_chat_history(session_id, limit)
|
||
return {
|
||
"success": True,
|
||
"data": messages,
|
||
"count": len(messages)
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取聊天历史失败: {e}")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@app.post("/agent/search")
|
||
async def search_chat_history(user_id: str, query: str, top_k: int = 10):
|
||
"""
|
||
向量搜索聊天历史
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
query: 查询文本
|
||
top_k: 返回数量(默认10)
|
||
|
||
Returns:
|
||
相关聊天记录列表
|
||
"""
|
||
try:
|
||
results = es_client.search_chat_history(user_id, query, top_k)
|
||
return {
|
||
"success": True,
|
||
"data": results,
|
||
"count": len(results)
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"向量搜索失败: {e}")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
# ==================== 健康检查 ====================
|
||
|
||
@app.get("/health")
|
||
async def health_check():
|
||
"""健康检查"""
|
||
# 检查各个后端服务的健康状态
|
||
services_status = {}
|
||
|
||
try:
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.NEWS_API}/search_news?query=test&top_k=1", timeout=5.0)
|
||
services_status["news_api"] = "healthy" if response.status_code == 200 else "unhealthy"
|
||
except:
|
||
services_status["news_api"] = "unhealthy"
|
||
|
||
try:
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.CONCEPT_API}/", timeout=5.0)
|
||
services_status["concept_api"] = "healthy" if response.status_code == 200 else "unhealthy"
|
||
except:
|
||
services_status["concept_api"] = "unhealthy"
|
||
|
||
try:
|
||
response = await HTTP_CLIENT.get(f"{ServiceEndpoints.STOCK_ANALYSIS_API}/api/v1/health", timeout=5.0)
|
||
services_status["stock_analysis_api"] = "healthy" if response.status_code == 200 else "unhealthy"
|
||
except:
|
||
services_status["stock_analysis_api"] = "unhealthy"
|
||
|
||
return {
|
||
"status": "healthy",
|
||
"timestamp": datetime.now().isoformat(),
|
||
"services": services_status
|
||
}
|
||
|
||
# ==================== 错误处理 ====================
|
||
|
||
@app.exception_handler(HTTPException)
|
||
async def http_exception_handler(request: Request, exc: HTTPException):
|
||
"""HTTP异常处理"""
|
||
return JSONResponse(
|
||
status_code=exc.status_code,
|
||
content={
|
||
"success": False,
|
||
"error": exc.detail,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
)
|
||
|
||
@app.exception_handler(Exception)
|
||
async def general_exception_handler(request: Request, exc: Exception):
|
||
"""通用异常处理"""
|
||
logger.error(f"Unexpected error: {str(exc)}", exc_info=True)
|
||
return JSONResponse(
|
||
status_code=500,
|
||
content={
|
||
"success": False,
|
||
"error": "Internal server error",
|
||
"detail": str(exc),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
)
|
||
|
||
# ==================== 应用启动/关闭 ====================
|
||
|
||
@app.on_event("startup")
|
||
async def startup_event():
|
||
"""应用启动"""
|
||
logger.info("MCP Server starting up...")
|
||
logger.info(f"Registered {len(TOOLS)} tools")
|
||
# 初始化数据库连接池
|
||
try:
|
||
await db.get_pool()
|
||
logger.info("MySQL connection pool initialized")
|
||
except Exception as e:
|
||
logger.error(f"Failed to initialize MySQL pool: {e}")
|
||
|
||
@app.on_event("shutdown")
|
||
async def shutdown_event():
|
||
"""应用关闭"""
|
||
logger.info("MCP Server shutting down...")
|
||
await HTTP_CLIENT.aclose()
|
||
# 关闭数据库连接池
|
||
try:
|
||
await db.close_pool()
|
||
logger.info("MySQL connection pool closed")
|
||
except Exception as e:
|
||
logger.error(f"Failed to close MySQL pool: {e}")
|
||
|
||
# ==================== 主程序 ====================
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
|
||
uvicorn.run(
|
||
"mcp_server:app",
|
||
host="0.0.0.0",
|
||
port=8900,
|
||
reload=True,
|
||
log_level="info"
|
||
)
|