agent功能开发增加MCP后端

This commit is contained in:
2025-11-07 19:55:05 +08:00
parent 463ca7cf60
commit 4dc27a35ff
5 changed files with 0 additions and 1866 deletions

View File

@@ -1,361 +0,0 @@
"""
Kimi API 集成示例
演示如何将MCP工具与Kimi大模型结合使用
"""
from openai import OpenAI
import json
from typing import List, Dict, Any
from mcp_client_example import MCPClient
# Kimi API configuration.
# SECURITY: this API key was previously hard-coded here and has been committed
# to version control — it must be considered leaked and should be rotated.
# Prefer the MOONSHOT_API_KEY environment variable; the inline value remains
# only as a backward-compatible fallback.
import os

KIMI_API_KEY = os.getenv("MOONSHOT_API_KEY", "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5")
KIMI_BASE_URL = "https://api.moonshot.cn/v1"
# NOTE(review): "kimi-k2-turbpreview" looks like a typo of "kimi-k2-turbo-preview"
# — confirm against Moonshot's published model list before relying on it.
KIMI_MODEL = "kimi-k2-turbpreview"
# Initialize the Kimi client (OpenAI-compatible chat-completions API).
kimi_client = OpenAI(
    api_key=KIMI_API_KEY,
    base_url=KIMI_BASE_URL,
)
# Initialize the MCP client (connects to the local MCP tool server).
mcp_client = MCPClient()
def convert_mcp_tools_to_kimi_format() -> tuple[List[Dict], Dict]:
    """Translate the MCP tool catalogue into Kimi's function-calling format.

    Returns:
        A pair ``(tools, tool_map)``: ``tools`` is the list of tool specs in
        Kimi's ``{"type": "function", ...}`` shape, and ``tool_map`` maps each
        tool name to a callable that executes it through the MCP client.
    """
    catalogue = mcp_client.list_tools()["tools"]
    kimi_tools = [
        {
            "type": "function",
            "function": {
                "name": spec["name"],
                "description": spec["description"],
                "parameters": spec["parameters"],
            },
        }
        for spec in catalogue
    ]
    # Bind each tool name as a default argument so the closure is not late-bound.
    tool_map = {
        spec["name"]: (lambda args, name=spec["name"]: execute_mcp_tool(name, args))
        for spec in catalogue
    }
    return kimi_tools, tool_map
def execute_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Invoke a single MCP tool, echoing the call and outcome to stdout.

    Args:
        tool_name: Name of the MCP tool to run.
        arguments: JSON-serializable arguments for the tool.

    Returns:
        The raw result dictionary returned by the MCP server.
    """
    print(f"[工具调用] {tool_name}")
    print(f"[参数] {json.dumps(arguments, ensure_ascii=False, indent=2)}")
    outcome = mcp_client.call_tool(tool_name, arguments)
    print(f"[结果] 成功: {outcome.get('success', False)}")
    return outcome
def chat_with_kimi(user_message: str, verbose: bool = True) -> str:
    """Run one conversation with Kimi, transparently executing tool calls.

    Args:
        user_message: The user's natural-language request.
        verbose: When True, print progress information to stdout.

    Returns:
        Kimi's final text reply. NOTE(review): if the iteration cap is hit
        while the model is still requesting tools, the last message's content
        may be None — callers should tolerate that.
    """
    # Fetch the current tool catalogue and its name → executor dispatch table.
    tools, tool_map = convert_mcp_tools_to_kimi_format()
    if verbose:
        print(f"\n{'='*60}")
        print(f"加载了 {len(tools)} 个工具")
        print(f"{'='*60}\n")
    # Seed the conversation with the assistant persona and the user's message.
    messages = [
        {
            "role": "system",
            "content": """你是一个专业的金融数据分析助手,由 Moonshot AI 提供支持。
你可以使用各种工具来帮助用户查询和分析金融数据,包括:
- 新闻搜索(全球新闻、中国新闻、医疗新闻)
- 公司研究(路演信息、研究报告)
- 概念板块分析
- 股票分析(涨停分析、财务数据、交易数据)
- 财务报表(资产负债表、现金流量表)
请根据用户的问题,选择合适的工具来获取信息,并提供专业的分析和建议。"""
        },
        {
            "role": "user",
            "content": user_message
        }
    ]
    if verbose:
        print(f"[用户]: {user_message}\n")
    # Conversation loop: keep calling the model while it requests tools.
    finish_reason = None
    iteration = 0
    max_iterations = 10  # guard against infinite tool-call loops
    while finish_reason is None or finish_reason == "tool_calls":
        iteration += 1
        if iteration > max_iterations:
            print("[警告] 达到最大迭代次数")
            break
        if verbose and iteration > 1:
            print(f"\n[轮次 {iteration}]")
        # Call the Kimi chat-completions API with the tool specs attached.
        completion = kimi_client.chat.completions.create(
            model=KIMI_MODEL,
            messages=messages,
            temperature=0.6,  # temperature value recommended by Kimi
            tools=tools,
        )
        choice = completion.choices[0]
        finish_reason = choice.finish_reason
        if verbose:
            print(f"[Kimi] finish_reason: {finish_reason}")
        # If the model asked for tools, run each one and feed results back.
        if finish_reason == "tool_calls":
            # Append the assistant message containing the tool_call stubs.
            messages.append(choice.message)
            # Execute every requested tool call in order.
            for tool_call in choice.message.tool_calls:
                tool_name = tool_call.function.name
                tool_arguments = json.loads(tool_call.function.arguments)
                # Dispatch through the map built by convert_mcp_tools_to_kimi_format.
                tool_result = mcp_tool_map_call = tool_map[tool_name](tool_arguments)
                # Report the result back to the model, keyed by tool_call_id.
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": json.dumps(tool_result, ensure_ascii=False),
                })
            if verbose:
                print()  # blank line separating rounds
    # The last non-tool message holds the final answer.
    final_response = choice.message.content
    if verbose:
        print(f"\n[Kimi]: {final_response}\n")
        print(f"{'='*60}")
    return final_response
def demo_simple_query():
    """Demo 1: a simple news search routed through the agent."""
    rule = "=" * 60
    print("\n" + rule)
    print("演示1: 简单新闻查询")
    print(rule)
    return chat_with_kimi("帮我查找关于人工智能的最新新闻")
def demo_stock_analysis():
    """Demo 2: financial analysis of a single stock."""
    rule = "=" * 60
    print("\n" + rule)
    print("演示2: 股票财务分析")
    print(rule)
    return chat_with_kimi("帮我分析贵州茅台600519的财务状况")
def demo_concept_research():
    """Demo 3: concept-sector research."""
    rule = "=" * 60
    print("\n" + rule)
    print("演示3: 概念板块研究")
    print(rule)
    return chat_with_kimi("查找新能源汽车相关的概念板块,并告诉我涨幅最高的是哪些")
def demo_industry_comparison():
    """Demo 4: compare stocks within one industry."""
    rule = "=" * 60
    print("\n" + rule)
    print("演示4: 行业内股票对比")
    print(rule)
    return chat_with_kimi("帮我找出半导体行业的龙头股票,并对比它们的财务指标")
def demo_comprehensive_analysis():
    """Demo 5: a multi-step investment analysis request."""
    rule = "=" * 60
    print("\n" + rule)
    print("演示5: 综合分析")
    print(rule)
    return chat_with_kimi("""
我想投资白酒行业,请帮我:
1. 搜索白酒行业的主要上市公司
2. 对比贵州茅台和五粮液的财务数据
3. 查看最近的行业新闻
4. 给出投资建议
""")
def interactive_chat():
    """Run an interactive chat session until the user quits.

    Exits on 'quit'/'exit'/'退出' input or Ctrl-C; other exceptions are
    printed and the loop continues.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Kimi 金融助手 - 交互模式")
    print(rule)
    print("提示:输入 'quit''exit' 退出")
    print(rule + "\n")
    quit_words = {'quit', 'exit', '退出'}
    while True:
        try:
            user_input = input("你: ").strip()
            if not user_input:
                continue
            if user_input.lower() in quit_words:
                print("\n再见!")
                break
            response = chat_with_kimi(user_input)
        except KeyboardInterrupt:
            print("\n\n再见!")
            break
        except Exception as e:
            print(f"\n[错误] {str(e)}\n")
def test_kimi_connection():
    """Smoke-test the Kimi API with one minimal chat request.

    Returns:
        True when the request succeeds, False otherwise.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("测试 Kimi API 连接")
    print(rule + "\n")
    try:
        # A single small request proves both the key and the endpoint work.
        reply = kimi_client.chat.completions.create(
            model=KIMI_MODEL,
            messages=[{"role": "user", "content": "你好,请介绍一下你自己"}],
            temperature=0.6,
        )
        print("[✓] 连接成功!")
        print(f"[✓] 模型: {KIMI_MODEL}")
        print(f"[✓] 回复: {reply.choices[0].message.content}\n")
        return True
    except Exception as e:
        print(f"[✗] 连接失败: {str(e)}\n")
        return False
def show_available_tools():
    """Print a numbered summary of every tool exposed to Kimi."""
    rule = "=" * 60
    print("\n" + rule)
    print("可用工具列表")
    print(rule + "\n")
    tools, _ = convert_mcp_tools_to_kimi_format()
    for index, spec in enumerate(tools, 1):
        meta = spec["function"]
        print(f"{index}. {meta['name']}")
        print(f" 描述: {meta['description'][:80]}...")
        print()
    print(f"总计: {len(tools)} 个工具\n")
if __name__ == "__main__":
    import sys

    # Verify connectivity before anything else.
    if not test_kimi_connection():
        print("请检查API Key和网络连接")
        sys.exit(1)
    # Show the tools the model will be able to call.
    show_available_tools()
    # Menu of run modes.
    print("\n选择运行模式:")
    print("1. 简单查询演示")
    print("2. 股票分析演示")
    print("3. 概念研究演示")
    print("4. 行业对比演示")
    print("5. 综合分析演示")
    print("6. 交互式对话")
    print("7. 运行所有演示")
    # Dispatch table: each choice maps to the callables it runs, in order.
    actions = {
        "1": [demo_simple_query],
        "2": [demo_stock_analysis],
        "3": [demo_concept_research],
        "4": [demo_industry_comparison],
        "5": [demo_comprehensive_analysis],
        "6": [interactive_chat],
        "7": [
            demo_simple_query,
            demo_stock_analysis,
            demo_concept_research,
            demo_industry_comparison,
            demo_comprehensive_analysis,
        ],
    }
    try:
        choice = input("\n请选择 (1-7): ").strip()
        for action in actions.get(choice, []):
            action()
        if choice not in actions:
            print("无效选择")
    except KeyboardInterrupt:
        print("\n\n程序已退出")
    finally:
        mcp_client.close()

View File

@@ -1,470 +0,0 @@
"""
MCP Agent System - 基于 DeepResearch 逻辑的智能代理系统
三阶段流程:计划制定 → 工具执行 → 结果总结
"""
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime
import json
import logging
from openai import OpenAI
import asyncio
import os
logger = logging.getLogger(__name__)
# ==================== 数据模型 ====================
class ToolCall(BaseModel):
    """One planned tool invocation within an execution plan."""
    tool: str  # name of the MCP tool to call
    arguments: Dict[str, Any]  # arguments to pass to the tool
    reason: str  # why this tool call is part of the plan
class ExecutionPlan(BaseModel):
    """The plan produced in phase 1 (planning)."""
    goal: str  # one-sentence statement of the user's goal
    steps: List[ToolCall]  # ordered tool calls to execute
    reasoning: str  # the planner's rationale for this plan
class StepResult(BaseModel):
    """Outcome of executing one plan step."""
    step_index: int  # zero-based index of the step within the plan
    tool: str  # tool that was executed
    arguments: Dict[str, Any]  # arguments actually used (after substitution)
    status: Literal["success", "failed", "skipped"]
    result: Optional[Any] = None  # tool output when status == "success"
    error: Optional[str] = None  # error message when status == "failed"
    execution_time: float = 0  # wall-clock seconds spent on this step
class AgentResponse(BaseModel):
    """Top-level response returned by the agent."""
    success: bool
    message: str  # natural-language summary for the caller
    plan: Optional[ExecutionPlan] = None  # the plan that was executed, if any
    # Mutable default is safe here: pydantic copies field defaults per instance.
    step_results: List[StepResult] = []  # per-step execution outcomes
    final_summary: Optional[str] = None  # phase-3 summary text
    metadata: Optional[Dict[str, Any]] = None  # execution statistics
class ChatRequest(BaseModel):
    """Incoming chat request payload."""
    message: str  # the user's message
    conversation_history: List[Dict[str, str]] = []  # prior turns, oldest first
    stream: bool = False  # streaming flag; no consumer visible in this file
# ==================== Agent 系统 ====================
class MCPAgent:
    """MCP intelligent agent with a three-phase flow.

    Phase 1 asks an LLM to produce a JSON execution plan, phase 2 runs the
    planned tool calls against the MCP tool handlers, and phase 3 asks the
    LLM to summarize the collected results in natural language.
    """
    def __init__(self, provider: str = "qwen"):
        """Create an agent backed by the given OpenAI-compatible provider.

        Args:
            provider: One of "qwen", "deepseek" or "openai"; the API key is
                read from the provider's environment variable.

        Raises:
            ValueError: If the provider is unknown or its API key is unset.
        """
        self.provider = provider
        # Per-provider endpoint configuration (all OpenAI-compatible).
        config = {
            "qwen": {
                "api_key": os.getenv("DASHSCOPE_API_KEY", ""),
                "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
                "model": "qwen-plus",
            },
            "deepseek": {
                "api_key": os.getenv("DEEPSEEK_API_KEY", ""),
                "base_url": "https://api.deepseek.com/v1",
                "model": "deepseek-chat",
            },
            "openai": {
                "api_key": os.getenv("OPENAI_API_KEY", ""),
                "base_url": "https://api.openai.com/v1",
                "model": "gpt-4o-mini",
            },
        }.get(provider)
        if not config or not config["api_key"]:
            raise ValueError(f"Provider '{provider}' not configured. Please set API key.")
        self.client = OpenAI(
            api_key=config["api_key"],
            base_url=config["base_url"],
        )
        self.model = config["model"]

    # ==================== Phase 1: planning ====================
    def get_planning_prompt(self, tools: List[dict]) -> str:
        """Build the system prompt that asks the LLM for a JSON plan."""
        # Render each tool as name + description + JSON parameter schema.
        tools_desc = "\n\n".join([
            f"**{tool['name']}**\n"
            f"描述:{tool['description']}\n"
            f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
            for tool in tools
        ])
        return f"""你是一个专业的金融研究助手。你需要根据用户的问题,制定一个详细的执行计划。
## 可用工具
{tools_desc}
## 重要知识
- 贵州茅台股票代码: 600519
- 涨停: 股价单日涨幅约10%
- 概念板块: 相同题材的股票分类
## 特殊工具说明
- **summarize_with_llm**: 这是一个特殊工具,用于让你总结和分析收集到的数据
- 当需要对多个数据源进行综合分析时使用
- 当需要生成研究报告时使用
- 参数: {{"data": "要分析的数据", "task": "分析任务描述"}}
## 任务
分析用户问题,制定执行计划。返回 JSON 格式:
```json
{{
"goal": "用户的目标(一句话概括)",
"reasoning": "你的分析思路(为什么这样规划)",
"steps": [
{{
"tool": "工具名称",
"arguments": {{"参数名": "参数值"}},
"reason": "为什么要执行这一步"
}}
]
}}
```
## 规划原则
1. **从简到繁**: 先获取基础信息,再深入分析
2. **数据先行**: 先收集数据,再总结分析
3. **合理组合**: 可以调用多个工具但不要超过5个
4. **包含总结**: 最后一步通常是 summarize_with_llm
## 示例
用户:"帮我全面分析一下贵州茅台这只股票"
你的计划:
```json
{{
"goal": "全面分析贵州茅台股票",
"reasoning": "需要获取基本信息、财务指标、交易数据,然后综合分析",
"steps": [
{{
"tool": "get_stock_basic_info",
"arguments": {{"seccode": "600519"}},
"reason": "获取股票基本信息(公司名称、行业、市值等)"
}},
{{
"tool": "get_stock_financial_index",
"arguments": {{"seccode": "600519", "limit": 5}},
"reason": "获取最近5期财务指标营收、利润、ROE等"
}},
{{
"tool": "get_stock_trade_data",
"arguments": {{"seccode": "600519", "limit": 30}},
"reason": "获取最近30天交易数据价格走势、成交量"
}},
{{
"tool": "search_china_news",
"arguments": {{"query": "贵州茅台", "top_k": 5}},
"reason": "获取最新新闻,了解市场动态"
}},
{{
"tool": "summarize_with_llm",
"arguments": {{
"data": "前面收集的所有数据",
"task": "综合分析贵州茅台的投资价值,包括基本面、财务状况、股价走势、市场情绪"
}},
"reason": "综合所有数据,生成投资分析报告"
}}
]
}}
```
只返回JSON不要额外解释。"""

    async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan:
        """Phase 1: ask the LLM for an execution plan and parse it.

        Raises:
            json.JSONDecodeError / KeyError: if the model's reply is not the
                expected JSON shape.
        """
        logger.info(f"[Planning] Creating plan for: {user_query}")
        messages = [
            {"role": "system", "content": self.get_planning_prompt(tools)},
            {"role": "user", "content": user_query},
        ]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.3,
            max_tokens=1500,
        )
        plan_json = response.choices[0].message.content.strip()
        logger.info(f"[Planning] Raw response: {plan_json}")
        # Strip Markdown code fences the model may wrap around the JSON.
        if "```json" in plan_json:
            plan_json = plan_json.split("```json")[1].split("```")[0].strip()
        elif "```" in plan_json:
            plan_json = plan_json.split("```")[1].split("```")[0].strip()
        plan_data = json.loads(plan_json)
        plan = ExecutionPlan(
            goal=plan_data["goal"],
            reasoning=plan_data.get("reasoning", ""),
            steps=[
                ToolCall(**step) for step in plan_data["steps"]
            ],
        )
        logger.info(f"[Planning] Plan created: {len(plan.steps)} steps")
        return plan

    # ==================== Phase 2: tool execution ====================
    async def execute_tool(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        tool_handlers: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Execute a single tool.

        The pseudo-tool "summarize_with_llm" is handled in-process by the
        LLM; everything else dispatches to the MCP tool handlers.

        Raises:
            ValueError: If the tool name has no registered handler.
        """
        # Special case: summarization is done by the LLM, not an MCP handler.
        if tool_name == "summarize_with_llm":
            return await self.summarize_with_llm(
                data=arguments.get("data", ""),
                task=arguments.get("task", "总结数据"),
            )
        # Regular MCP tool dispatch.
        handler = tool_handlers.get(tool_name)
        if not handler:
            raise ValueError(f"Tool '{tool_name}' not found")
        result = await handler(arguments)
        return result

    async def execute_plan(
        self,
        plan: ExecutionPlan,
        tool_handlers: Dict[str, Any],
    ) -> List[StepResult]:
        """Phase 2: run every step in the plan, collecting per-step results."""
        logger.info(f"[Execution] Starting execution: {len(plan.steps)} steps")
        results = []
        collected_data = {}  # data gathered so far, fed to later summary steps
        for i, step in enumerate(plan.steps):
            logger.info(f"[Execution] Step {i+1}/{len(plan.steps)}: {step.tool}")
            start_time = datetime.now()
            try:
                # Substitute the planner's placeholder with the real collected data.
                arguments = step.arguments.copy()
                if step.tool == "summarize_with_llm" and arguments.get("data") == "前面收集的所有数据":
                    # Hand everything collected so far to the summary tool.
                    arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
                # Run the tool.
                result = await self.execute_tool(step.tool, arguments, tool_handlers)
                execution_time = (datetime.now() - start_time).total_seconds()
                # Record the successful outcome.
                step_result = StepResult(
                    step_index=i,
                    tool=step.tool,
                    arguments=arguments,
                    status="success",
                    result=result,
                    execution_time=execution_time,
                )
                results.append(step_result)
                # Stash the result so later steps (e.g. summaries) can use it.
                collected_data[f"step_{i+1}_{step.tool}"] = result
                logger.info(f"[Execution] Step {i+1} completed in {execution_time:.2f}s")
            except Exception as e:
                logger.error(f"[Execution] Step {i+1} failed: {str(e)}")
                execution_time = (datetime.now() - start_time).total_seconds()
                step_result = StepResult(
                    step_index=i,
                    tool=step.tool,
                    arguments=step.arguments,
                    status="failed",
                    error=str(e),
                    execution_time=execution_time,
                )
                results.append(step_result)
                # A missing tool is unrecoverable; other errors are skipped over.
                if "not found" in str(e).lower():
                    logger.warning(f"[Execution] Stopping due to critical error")
                    break
                else:
                    logger.warning(f"[Execution] Continuing despite error")
                    continue
        logger.info(f"[Execution] Execution completed: {len(results)} steps")
        return results

    async def summarize_with_llm(self, data: str, task: str) -> str:
        """Pseudo-tool: have the LLM analyze *data* according to *task*."""
        logger.info(f"[LLM Summary] Task: {task}")
        messages = [
            {
                "role": "system",
                "content": "你是一个专业的金融分析师。根据提供的数据,完成指定的分析任务。"
            },
            {
                "role": "user",
                "content": f"## 任务\n{task}\n\n## 数据\n{data}\n\n请根据数据完成分析任务,用专业且易懂的语言呈现。"
            },
        ]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=2000,
        )
        summary = response.choices[0].message.content
        return summary

    # ==================== Phase 3: result summary ====================
    async def generate_final_summary(
        self,
        user_query: str,
        plan: ExecutionPlan,
        step_results: List[StepResult],
    ) -> str:
        """Phase 3: produce the final natural-language report."""
        logger.info("[Summary] Generating final summary")
        # Keep only the steps that succeeded.
        successful_results = [r for r in step_results if r.status == "success"]
        if not successful_results:
            return "很抱歉,所有步骤都执行失败,无法生成分析报告。"
        # Build the summary prompt; each result is truncated to 1000 chars.
        results_text = "\n\n".join([
            f"**步骤 {r.step_index + 1}: {r.tool}**\n"
            f"结果: {json.dumps(r.result, ensure_ascii=False, indent=2)[:1000]}..."
            for r in successful_results
        ])
        messages = [
            {
                "role": "system",
                "content": "你是一个专业的金融研究助手。根据执行结果,生成一份简洁清晰的报告。"
            },
            {
                "role": "user",
                "content": f"""
用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请根据以上信息生成一份专业的分析报告300字以内
"""
            },
        ]
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=1000,
        )
        summary = response.choices[0].message.content
        logger.info("[Summary] Final summary generated")
        return summary

    # ==================== Main flow ====================
    async def process_query(
        self,
        user_query: str,
        tools: List[dict],
        tool_handlers: Dict[str, Any],
    ) -> AgentResponse:
        """Main entry point: plan, execute, then summarize a user query.

        Never raises: any failure is converted into an unsuccessful
        AgentResponse.
        """
        logger.info(f"[Agent] Processing query: {user_query}")
        try:
            # Phase 1: plan.
            plan = await self.create_plan(user_query, tools)
            # Phase 2: execute.
            step_results = await self.execute_plan(plan, tool_handlers)
            # Phase 3: summarize.
            final_summary = await self.generate_final_summary(
                user_query, plan, step_results
            )
            return AgentResponse(
                success=True,
                message=final_summary,
                plan=plan,
                step_results=step_results,
                final_summary=final_summary,
                metadata={
                    "total_steps": len(plan.steps),
                    "successful_steps": len([r for r in step_results if r.status == "success"]),
                    "failed_steps": len([r for r in step_results if r.status == "failed"]),
                    "total_execution_time": sum(r.execution_time for r in step_results),
                },
            )
        except Exception as e:
            logger.error(f"[Agent] Error: {str(e)}", exc_info=True)
            return AgentResponse(
                success=False,
                message=f"处理失败: {str(e)}",
            )
# ==================== FastAPI 端点 ====================
"""
在 mcp_server.py 中添加:
from mcp_agent_system import MCPAgent, ChatRequest, AgentResponse
# 创建 Agent 实例
agent = MCPAgent(provider="qwen")
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(request: ChatRequest):
\"\"\"智能代理对话端点\"\"\"
logger.info(f"Agent chat: {request.message}")
# 获取工具列表和处理器
tools = [tool.dict() for tool in TOOLS]
# 处理查询
response = await agent.process_query(
user_query=request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
)
return response
"""

View File

@@ -1,295 +0,0 @@
"""
MCP Chat Endpoint - 添加到 mcp_server.py
集成LLM实现智能对话自动调用MCP工具并总结结果
"""
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import os
import json
from openai import OpenAI
import logging
logger = logging.getLogger(__name__)
# ==================== LLM configuration ====================
# Several OpenAI-compatible providers are supported; keys come from env vars.
LLM_PROVIDERS = {
    "openai": {
        "api_key": os.getenv("OPENAI_API_KEY", ""),
        "base_url": "https://api.openai.com/v1",
        "model": "gpt-4o-mini",  # cheap and fast
    },
    "qwen": {
        "api_key": os.getenv("DASHSCOPE_API_KEY", ""),
        "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
        "model": "qwen-plus",
    },
    "deepseek": {
        "api_key": os.getenv("DEEPSEEK_API_KEY", ""),
        "base_url": "https://api.deepseek.com/v1",
        "model": "deepseek-chat",
    },
}
# Default LLM provider (Qwen — recommended for its low price).
DEFAULT_PROVIDER = "qwen"
# ==================== 数据模型 ====================
class Message(BaseModel):
    """A single chat message."""
    role: str  # one of: system, user, assistant
    content: str  # message text
class ChatRequest(BaseModel):
    """Incoming chat request payload."""
    message: str  # the user's message
    conversation_history: List[Dict[str, str]] = []  # prior turns, oldest first
    provider: Optional[str] = DEFAULT_PROVIDER  # which LLM provider to use
class ChatResponse(BaseModel):
    """Chat response payload."""
    success: bool
    message: str  # natural-language reply for the user
    tool_used: Optional[str] = None  # name of the tool invoked, if any
    raw_data: Optional[Any] = None  # raw tool output, if a tool was called
    error: Optional[str] = None  # error detail when success is False
# ==================== LLM助手类 ====================
class MCPChatAssistant:
    """MCP chat assistant — couples an LLM with MCP tool calling.

    The LLM first emits a JSON tool-call instruction, the tool is executed,
    and the LLM then summarizes the tool output for the user. When no LLM
    provider is configured, a keyword-matching fallback path is used instead.
    """
    def __init__(self, provider: str = DEFAULT_PROVIDER):
        """Set up the LLM client, or enable fallback mode when the provider
        has no API key configured.

        NOTE(review): self.model is only assigned when a client exists, so
        fallback-mode code must never touch it.
        """
        self.provider = provider
        config = LLM_PROVIDERS.get(provider)
        if not config or not config["api_key"]:
            logger.warning(f"LLM provider '{provider}' not configured, using fallback mode")
            self.client = None
        else:
            self.client = OpenAI(
                api_key=config["api_key"],
                base_url=config["base_url"],
            )
            self.model = config["model"]

    def get_system_prompt(self, tools: List[dict]) -> str:
        """Build the system prompt that asks the LLM for a tool-call JSON."""
        # Render each tool as name + description + JSON parameter schema.
        tools_desc = "\n\n".join([
            f"**{tool['name']}**\n描述:{tool['description']}\n参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
            for tool in tools
        ])
        return f"""你是一个专业的金融投资助手。你可以使用以下工具来帮助用户查询信息:
{tools_desc}
## 工作流程
1. **理解用户意图**:分析用户问题,确定需要什么信息
2. **选择工具**:从上面的工具中选择最合适的一个或多个
3. **提取参数**:从用户输入中提取工具需要的参数
4. **返回工具调用指令**JSON格式
{{"tool": "工具名", "arguments": {{...}}}}
## 重要规则
- 贵州茅台的股票代码是 **600519**
- 如果用户提到股票名称,尝试推断股票代码
- 如果不确定需要什么信息,使用 search_china_news 搜索相关新闻
- 涨停是指股票当日涨幅达到10%左右
- 只返回工具调用指令,不要额外解释
## 示例
用户:"查询贵州茅台的股票信息"
你:{{"tool": "get_stock_basic_info", "arguments": {{"seccode": "600519"}}}}
用户:"今日涨停的股票有哪些"
你:{{"tool": "search_limit_up_stocks", "arguments": {{"query": "", "mode": "hybrid", "page_size": 10}}}}
用户:"新能源概念板块表现如何"
你:{{"tool": "search_concepts", "arguments": {{"query": "新能源", "size": 10, "sort_by": "change_pct"}}}}
"""

    async def chat(self, user_message: str, conversation_history: List[Dict[str, str]], tools: List[dict]) -> ChatResponse:
        """Handle one user message: pick a tool via the LLM, run it, summarize.

        Never raises: all failures are converted into an unsuccessful
        ChatResponse.
        """
        try:
            if not self.client:
                # No LLM configured — degrade to simple keyword matching.
                return await self.fallback_chat(user_message)
            # 1. Build the message history.
            messages = [
                {"role": "system", "content": self.get_system_prompt(tools)},
            ]
            # Keep at most the last 20 history entries (~10 exchanges).
            for msg in conversation_history[-20:]:
                messages.append({
                    "role": "user" if msg.get("isUser") else "assistant",
                    "content": msg.get("content", ""),
                })
            messages.append({"role": "user", "content": user_message})
            # 2. Ask the LLM for a tool-call instruction.
            logger.info(f"Calling LLM with {len(messages)} messages")
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.3,  # low temperature for more deterministic output
                max_tokens=500,
            )
            tool_call_instruction = response.choices[0].message.content.strip()
            logger.info(f"LLM response: {tool_call_instruction}")
            # 3. Parse the instruction as JSON.
            try:
                tool_call = json.loads(tool_call_instruction)
                tool_name = tool_call.get("tool")
                tool_args = tool_call.get("arguments", {})
                if not tool_name:
                    raise ValueError("No tool specified")
                # 4. Execute the tool (handlers live in mcp_server; imported
                # lazily to avoid a circular import at module load).
                from mcp_server import TOOL_HANDLERS
                handler = TOOL_HANDLERS.get(tool_name)
                if not handler:
                    raise ValueError(f"Tool '{tool_name}' not found")
                tool_result = await handler(tool_args)
                # 5. Have the LLM summarize the tool output for the user.
                summary_messages = messages + [
                    {"role": "assistant", "content": tool_call_instruction},
                    {"role": "system", "content": f"工具 {tool_name} 返回的数据:\n{json.dumps(tool_result, ensure_ascii=False, indent=2)}\n\n请用自然语言总结这些数据给用户一个简洁清晰的回复不超过200字"}
                ]
                summary_response = self.client.chat.completions.create(
                    model=self.model,
                    messages=summary_messages,
                    temperature=0.7,
                    max_tokens=300,
                )
                summary = summary_response.choices[0].message.content
                return ChatResponse(
                    success=True,
                    message=summary,
                    tool_used=tool_name,
                    raw_data=tool_result,
                )
            except json.JSONDecodeError:
                # The LLM did not return JSON — pass its reply through as-is.
                return ChatResponse(
                    success=True,
                    message=tool_call_instruction,
                )
            except Exception as tool_error:
                logger.error(f"Tool execution error: {str(tool_error)}")
                return ChatResponse(
                    success=False,
                    message="工具调用失败",
                    error=str(tool_error),
                )
        except Exception as e:
            logger.error(f"Chat error: {str(e)}", exc_info=True)
            return ChatResponse(
                success=False,
                message="对话处理失败",
                error=str(e),
            )

    async def fallback_chat(self, user_message: str) -> ChatResponse:
        """Fallback path: route the message to a tool via keyword matching."""
        from mcp_server import TOOL_HANDLERS
        try:
            # Special-case Moutai, the most common demo query.
            if "茅台" in user_message or "贵州茅台" in user_message:
                handler = TOOL_HANDLERS.get("get_stock_basic_info")
                result = await handler({"seccode": "600519"})
                return ChatResponse(
                    success=True,
                    message="已为您查询贵州茅台(600519)的股票信息:",
                    tool_used="get_stock_basic_info",
                    raw_data=result,
                )
            # Limit-up stock analysis.
            elif "涨停" in user_message:
                handler = TOOL_HANDLERS.get("search_limit_up_stocks")
                query = user_message.replace("涨停", "").strip()
                result = await handler({"query": query, "mode": "hybrid", "page_size": 10})
                return ChatResponse(
                    success=True,
                    message="已为您查询涨停股票信息:",
                    tool_used="search_limit_up_stocks",
                    raw_data=result,
                )
            # Concept / sector queries.
            elif "概念" in user_message or "板块" in user_message:
                handler = TOOL_HANDLERS.get("search_concepts")
                query = user_message.replace("概念", "").replace("板块", "").strip()
                result = await handler({"query": query, "size": 10, "sort_by": "change_pct"})
                return ChatResponse(
                    success=True,
                    message=f"已为您查询'{query}'相关概念板块:",
                    tool_used="search_concepts",
                    raw_data=result,
                )
            # Default: treat the whole message as a news search query.
            else:
                handler = TOOL_HANDLERS.get("search_china_news")
                result = await handler({"query": user_message, "top_k": 5})
                return ChatResponse(
                    success=True,
                    message="已为您搜索相关新闻:",
                    tool_used="search_china_news",
                    raw_data=result,
                )
        except Exception as e:
            logger.error(f"Fallback chat error: {str(e)}")
            return ChatResponse(
                success=False,
                message="查询失败",
                error=str(e),
            )
# ==================== FastAPI端点 ====================
# 在 mcp_server.py 中添加以下代码:
"""
from mcp_chat_endpoint import MCPChatAssistant, ChatRequest, ChatResponse
# 创建聊天助手实例
chat_assistant = MCPChatAssistant(provider="qwen") # 或 "openai", "deepseek"
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
\"\"\"智能对话端点 - 使用LLM理解意图并调用工具\"\"\"
logger.info(f"Chat request: {request.message}")
# 获取可用工具列表
tools = [tool.dict() for tool in TOOLS]
# 调用聊天助手
response = await chat_assistant.chat(
user_message=request.message,
conversation_history=request.conversation_history,
tools=tools,
)
return response
"""

View File

@@ -1,248 +0,0 @@
"""
MCP客户端使用示例
演示如何调用MCP服务器的各种工具
"""
import httpx
import json
from typing import Dict, Any
class MCPClient:
    """Thin HTTP client for the MCP server's REST endpoints."""

    def __init__(self, base_url: str = "http://localhost:8900"):
        """Bind the client to *base_url* with a 60-second request timeout."""
        self.base_url = base_url
        self.client = httpx.Client(timeout=60.0)

    def list_tools(self):
        """Return the catalogue of available tools (GET /tools)."""
        resp = self.client.get(f"{self.base_url}/tools")
        resp.raise_for_status()
        return resp.json()

    def get_tool(self, tool_name: str):
        """Return one tool's definition (GET /tools/{name})."""
        resp = self.client.get(f"{self.base_url}/tools/{tool_name}")
        resp.raise_for_status()
        return resp.json()

    def call_tool(self, tool_name: str, arguments: Dict[str, Any]):
        """Invoke a tool (POST /tools/call) and return its JSON result."""
        resp = self.client.post(
            f"{self.base_url}/tools/call",
            json={
                "tool": tool_name,
                "arguments": arguments
            },
        )
        resp.raise_for_status()
        return resp.json()

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()
def print_result(title: str, result: Dict[str, Any]):
    """Pretty-print *result* as indented JSON under a banner titled *title*."""
    rule = "=" * 60
    print(f"\n{rule}")
    print(f"{title}")
    print(f"{rule}")
    print(json.dumps(result, ensure_ascii=False, indent=2))
def main():
    """Walk through every MCP tool with one sample call each, printing results.

    Requires a running MCP server at the client's default base URL; any
    failure aborts the walkthrough and is printed.
    """
    client = MCPClient()
    try:
        # 1. List all tools.
        print("\n示例1: 列出所有可用工具")
        tools = client.list_tools()
        print(f"可用工具数量: {len(tools['tools'])}")
        for tool in tools['tools']:
            print(f" - {tool['name']}: {tool['description'][:50]}...")
        # 2. Search Chinese news.
        print("\n示例2: 搜索中国新闻(关键词:人工智能)")
        result = client.call_tool(
            "search_china_news",
            {
                "query": "人工智能",
                "top_k": 5
            }
        )
        if result['success']:
            print_result("中国新闻搜索结果", result['data'])
        # 3. Search concept sectors, sorted by percentage change.
        print("\n示例3: 搜索概念板块(关键词:新能源,按涨跌幅排序)")
        result = client.call_tool(
            "search_concepts",
            {
                "query": "新能源",
                "size": 5,
                "sort_by": "change_pct"
            }
        )
        if result['success']:
            print_result("概念搜索结果", result['data'])
        # 4. Concepts associated with one stock.
        print("\n示例4: 获取股票相关概念股票代码600519")
        result = client.call_tool(
            "get_stock_concepts",
            {
                "stock_code": "600519",
                "size": 10
            }
        )
        if result['success']:
            print_result("股票概念结果", result['data'])
        # 5. Search limit-up stocks.
        print("\n示例5: 搜索涨停股票(关键词:锂电池)")
        result = client.call_tool(
            "search_limit_up_stocks",
            {
                "query": "锂电池",
                "mode": "hybrid",
                "page_size": 5
            }
        )
        if result['success']:
            print_result("涨停股票搜索结果", result['data'])
        # 6. Search research reports.
        print("\n示例6: 搜索研究报告(关键词:投资策略)")
        result = client.call_tool(
            "search_research_reports",
            {
                "query": "投资策略",
                "mode": "hybrid",
                "size": 3
            }
        )
        if result['success']:
            print_result("研究报告搜索结果", result['data'])
        # 7. Concept statistics over a recent window.
        print("\n示例7: 获取概念统计最近7天")
        result = client.call_tool(
            "get_concept_statistics",
            {
                "days": 7,
                "min_stock_count": 3
            }
        )
        if result['success']:
            print_result("概念统计结果", result['data'])
        # 8. Search roadshow information.
        print("\n示例8: 搜索路演信息(关键词:业绩)")
        result = client.call_tool(
            "search_roadshows",
            {
                "query": "业绩",
                "size": 3
            }
        )
        if result['success']:
            print_result("路演搜索结果", result['data'])
        # 9. Stock basic info.
        print("\n示例9: 获取股票基本信息股票600519")
        result = client.call_tool(
            "get_stock_basic_info",
            {
                "seccode": "600519"
            }
        )
        if result['success']:
            print_result("股票基本信息", result['data'])
        # 10. Stock financial indicators.
        print("\n示例10: 获取股票财务指标股票600519最近5期")
        result = client.call_tool(
            "get_stock_financial_index",
            {
                "seccode": "600519",
                "limit": 5
            }
        )
        if result['success']:
            print_result("财务指标", result['data'])
        # 11. Stock trading data.
        print("\n示例11: 获取股票交易数据股票600519最近10天")
        result = client.call_tool(
            "get_stock_trade_data",
            {
                "seccode": "600519",
                "limit": 10
            }
        )
        if result['success']:
            print_result("交易数据", result['data'])
        # 12. Search stocks by industry.
        print("\n示例12: 按行业搜索股票(行业:半导体)")
        result = client.call_tool(
            "search_stocks_by_criteria",
            {
                "industry": "半导体",
                "limit": 10
            }
        )
        if result['success']:
            print_result("行业股票", result['data'])
        # 13. Compare two stocks.
        print("\n示例13: 股票对比分析600519 vs 000858")
        result = client.call_tool(
            "get_stock_comparison",
            {
                "seccodes": ["600519", "000858"],
                "metric": "financial"
            }
        )
        if result['success']:
            print_result("股票对比", result['data'])
    except Exception as e:
        print(f"\n错误: {str(e)}")
    finally:
        client.close()
def test_single_tool():
    """Quickly exercise one tool (edit the call below to try a different one)."""
    client = MCPClient()
    try:
        outcome = client.call_tool(
            "search_china_news",
            {
                "query": "芯片",
                "exact_match": True,
                "top_k": 3
            },
        )
        print_result("测试结果", outcome)
    except Exception as e:
        print(f"错误: {str(e)}")
    finally:
        client.close()
if __name__ == "__main__":
    # Run the full walkthrough of every tool.
    main()
    # Or test a single tool instead:
    # test_single_tool()

View File

@@ -1,492 +0,0 @@
"""
集成到 mcp_server.py 的 Agent 系统
使用 Kimi (kimi-k2-thinking) 和 DeepMoney 两个模型
"""
from openai import OpenAI
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Literal
from datetime import datetime
import json
import logging
logger = logging.getLogger(__name__)
# ==================== Model configuration ====================
# Kimi — used for plan generation and deep reasoning.
# SECURITY: this API key was previously hard-coded here and committed to
# version control — treat it as leaked and rotate it. Prefer the
# MOONSHOT_API_KEY environment variable; the inline value remains only as a
# backward-compatible fallback.
import os

KIMI_CONFIG = {
    "api_key": os.getenv("MOONSHOT_API_KEY", "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5"),
    "base_url": "https://api.moonshot.cn/v1",
    "model": "kimi-k2-thinking",  # reasoning ("thinking") model
}
# DeepMoney — used for news summarization; the endpoint accepts an empty key,
# but an environment override is now supported as well.
DEEPMONEY_CONFIG = {
    "api_key": os.getenv("DEEPMONEY_API_KEY", ""),
    "base_url": "http://111.62.35.50:8000/v1",
    "model": "deepmoney",
}
# ==================== 数据模型 ====================
class ToolCall(BaseModel):
    """One planned tool invocation within an execution plan."""
    tool: str  # name of the tool to call
    arguments: Dict[str, Any]  # arguments to pass to the tool
    reason: str  # why this call is part of the plan
class ExecutionPlan(BaseModel):
    """The plan produced in phase 1 (planning)."""
    goal: str  # one-sentence statement of the user's goal
    steps: List[ToolCall]  # ordered tool calls to execute
    reasoning: str  # the planner's rationale
class StepResult(BaseModel):
    """Outcome of executing one plan step."""
    step_index: int  # zero-based index within the plan
    tool: str  # tool that was executed
    arguments: Dict[str, Any]  # arguments actually used
    status: Literal["success", "failed", "skipped"]
    result: Optional[Any] = None  # tool output when status == "success"
    error: Optional[str] = None  # error message when status == "failed"
    execution_time: float = 0  # wall-clock seconds spent on this step
class AgentResponse(BaseModel):
    """Top-level response returned by the agent."""
    success: bool
    message: str  # natural-language summary for the caller
    plan: Optional[ExecutionPlan] = None  # the executed plan, if any
    # Mutable default is safe: pydantic copies field defaults per instance.
    step_results: List[StepResult] = []  # per-step outcomes
    final_summary: Optional[str] = None  # final summary text
    metadata: Optional[Dict[str, Any]] = None  # execution statistics
class ChatRequest(BaseModel):
    """Incoming chat request payload."""
    message: str  # the user's message
    conversation_history: List[Dict[str, str]] = []  # prior turns, oldest first
# ==================== Agent 系统 ====================
class MCPAgentIntegrated:
"""集成版 MCP Agent - 使用 Kimi 和 DeepMoney"""
def __init__(self):
    """Create the two LLM clients: Kimi (planning) and DeepMoney (summaries)."""
    # Kimi client — plan generation with the reasoning model.
    self.kimi_client = OpenAI(
        api_key=KIMI_CONFIG["api_key"],
        base_url=KIMI_CONFIG["base_url"],
    )
    self.kimi_model = KIMI_CONFIG["model"]
    # DeepMoney client — summarization of news-style text.
    self.deepmoney_client = OpenAI(
        api_key=DEEPMONEY_CONFIG["api_key"],
        base_url=DEEPMONEY_CONFIG["base_url"],
    )
    self.deepmoney_model = DEEPMONEY_CONFIG["model"]
def get_planning_prompt(self, tools: List[dict]) -> str:
    """Build the system prompt that asks Kimi for a JSON execution plan."""
    # Render each tool as name + description + JSON parameter schema.
    tools_desc = "\n\n".join([
        f"**{tool['name']}**\n"
        f"描述:{tool['description']}\n"
        f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}"
        for tool in tools
    ])
    return f"""你是一个专业的金融研究助手。根据用户问题,制定详细的执行计划。
## 可用工具
{tools_desc}
## 特殊工具
- **summarize_news**: 使用 DeepMoney 模型总结新闻数据
- 参数: {{"data": "新闻列表JSON", "focus": "关注点"}}
- 适用场景: 当需要总结新闻、研报等文本数据时
## 重要知识
- 贵州茅台: 600519
- 涨停: 涨幅约10%
- 概念板块: 相同题材股票分类
## 任务
分析用户问题,制定执行计划。返回 JSON
```json
{{
"goal": "用户目标",
"reasoning": "分析思路",
"steps": [
{{
"tool": "工具名",
"arguments": {{"参数": ""}},
"reason": "原因"
}}
]
}}
```
## 规划原则
1. 先收集数据,再分析总结
2. 使用 summarize_news 总结新闻类数据
3. 不超过5个步骤
4. 最后一步通常是总结
## 示例
用户:"贵州茅台最近有什么新闻"
计划:
```json
{{
"goal": "查询并总结贵州茅台最新新闻",
"reasoning": "先搜索新闻,再用 DeepMoney 总结",
"steps": [
{{
"tool": "search_china_news",
"arguments": {{"query": "贵州茅台", "top_k": 10}},
"reason": "搜索贵州茅台相关新闻"
}},
{{
"tool": "summarize_news",
"arguments": {{
"data": "前面的新闻数据",
"focus": "贵州茅台的重要动态和市场影响"
}},
"reason": "使用DeepMoney总结新闻要点"
}}
]
}}
```
只返回JSON不要其他内容。"""
async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan:
    """Phase 1: use Kimi's reasoning model to produce an execution plan.

    The model's chain-of-thought (reasoning_content), when exposed by the
    SDK, is appended to the plan's reasoning field.

    Raises:
        json.JSONDecodeError / KeyError: if the reply is not the expected
            JSON shape.
    """
    logger.info(f"[Planning] Kimi开始制定计划: {user_query}")
    messages = [
        {"role": "system", "content": self.get_planning_prompt(tools)},
        {"role": "user", "content": user_query},
    ]
    # Call the Kimi "thinking" model.
    response = self.kimi_client.chat.completions.create(
        model=self.kimi_model,
        messages=messages,
        temperature=1.0,  # value recommended by Kimi
        max_tokens=16000,  # large enough to hold reasoning_content
    )
    choice = response.choices[0]
    message = choice.message
    # Extract the thinking trace, when present on the message object.
    reasoning_content = ""
    if hasattr(message, "reasoning_content"):
        reasoning_content = getattr(message, "reasoning_content")
        logger.info(f"[Planning] Kimi思考过程: {reasoning_content[:200]}...")
    # Extract the plan JSON itself.
    plan_json = message.content.strip()
    # Strip Markdown code fences the model may wrap around the JSON.
    if "```json" in plan_json:
        plan_json = plan_json.split("```json")[1].split("```")[0].strip()
    elif "```" in plan_json:
        plan_json = plan_json.split("```")[1].split("```")[0].strip()
    plan_data = json.loads(plan_json)
    plan = ExecutionPlan(
        goal=plan_data["goal"],
        reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
        steps=[ToolCall(**step) for step in plan_data["steps"]],
    )
    logger.info(f"[Planning] 计划制定完成: {len(plan.steps)}")
    return plan
async def execute_tool(
    self,
    tool_name: str,
    arguments: Dict[str, Any],
    tool_handlers: Dict[str, Any],
) -> Dict[str, Any]:
    """Run a single tool by name and return its raw result.

    The special tool ``summarize_news`` bypasses the MCP handler table and
    is routed to the DeepMoney-backed summarizer instead.
    """
    # Special case: news summarization goes through DeepMoney.
    if tool_name == "summarize_news":
        return await self.summarize_news_with_deepmoney(
            data=arguments.get("data", ""),
            focus=arguments.get("focus", "关键信息"),
        )
    # Regular MCP tool: look up its async handler and await it.
    dispatch = tool_handlers.get(tool_name)
    if not dispatch:
        raise ValueError(f"Tool '{tool_name}' not found")
    return await dispatch(arguments)
async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str:
    """Summarize news text with the DeepMoney model.

    On any model-call failure this degrades gracefully and returns a
    truncated echo of the raw data instead of raising, so the surrounding
    pipeline keeps running.
    """
    logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}")
    prompt = [
        {
            "role": "system",
            "content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。"
        },
        {
            "role": "user",
            # Cap the payload at 3000 chars to stay inside the context window.
            "content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}"
        },
    ]
    try:
        completion = self.deepmoney_client.chat.completions.create(
            model=self.deepmoney_model,
            messages=prompt,
            temperature=0.7,
            max_tokens=1000,
        )
        digest = completion.choices[0].message.content
        logger.info(f"[DeepMoney] 总结完成")
        return digest
    except Exception as e:
        logger.error(f"[DeepMoney] 总结失败: {str(e)}")
        # Degraded mode: hand back a truncated view of the raw input.
        return f"新闻总结失败,原始数据:{data[:500]}..."
async def execute_plan(
    self,
    plan: ExecutionPlan,
    tool_handlers: Dict[str, Any],
) -> List[StepResult]:
    """Stage 2: run every step of the plan in order.

    A failing step is recorded with status "failed" and execution moves on
    to the next step, so one error does not abort the whole plan.
    """
    logger.info(f"[Execution] 开始执行: {len(plan.steps)}步")
    step_results: List[StepResult] = []
    gathered: Dict[str, Any] = {}  # earlier results keyed by "step_<n>_<tool>"
    for idx, step in enumerate(plan.steps):
        logger.info(f"[Execution] 步骤 {idx+1}/{len(plan.steps)}: {step.tool}")
        started = datetime.now()
        try:
            call_args = step.arguments.copy()
            # Resolve placeholder arguments that refer to earlier step output.
            if step.tool == "summarize_news" and call_args.get("data") in [
                "前面的新闻数据",
                "前面收集的所有数据",
            ]:
                call_args["data"] = json.dumps(
                    gathered, ensure_ascii=False, indent=2
                )
            outcome = await self.execute_tool(step.tool, call_args, tool_handlers)
            elapsed = (datetime.now() - started).total_seconds()
            step_results.append(
                StepResult(
                    step_index=idx,
                    tool=step.tool,
                    arguments=call_args,
                    status="success",
                    result=outcome,
                    execution_time=elapsed,
                )
            )
            # Keep the result around for later placeholder substitution.
            gathered[f"step_{idx+1}_{step.tool}"] = outcome
            logger.info(f"[Execution] 步骤 {idx+1} 完成: {elapsed:.2f}s")
        except Exception as e:
            logger.error(f"[Execution] 步骤 {idx+1} 失败: {str(e)}")
            elapsed = (datetime.now() - started).total_seconds()
            step_results.append(
                StepResult(
                    step_index=idx,
                    tool=step.tool,
                    arguments=step.arguments,
                    status="failed",
                    error=str(e),
                    execution_time=elapsed,
                )
            )
    logger.info(f"[Execution] 执行完成")
    return step_results
async def generate_final_summary(
    self,
    user_query: str,
    plan: ExecutionPlan,
    step_results: List[StepResult],
) -> str:
    """Stage 3: have Kimi compose the final report from the step results.

    Falls back to the last successful step's raw result if the model call
    fails; returns a fixed apology if no step succeeded at all.
    """
    logger.info("[Summary] Kimi生成最终总结")
    succeeded = [r for r in step_results if r.status == "success"]
    if not succeeded:
        return "很抱歉,所有步骤都执行失败,无法生成分析报告。"
    # Compact digest: at most the first three successful steps, each result
    # truncated to 800 chars to keep the prompt short.
    results_text = "\n\n".join(
        f"**步骤 {r.step_index + 1}: {r.tool}**\n"
        f"结果: {str(r.result)[:800]}..."
        for r in succeeded[:3]
    )
    messages = [
        {
            "role": "system",
            "content": "你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。"
        },
        {
            "role": "user",
            "content": f"""用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请生成专业的分析报告300字以内"""
        },
    ]
    try:
        # Non-thinking model here: faster, and reasoning is not needed.
        completion = self.kimi_client.chat.completions.create(
            model="kimi-k2-turbpreview",
            messages=messages,
            temperature=0.7,
            max_tokens=1000,
        )
        report = completion.choices[0].message.content
        logger.info("[Summary] 总结完成")
        return report
    except Exception as e:
        logger.error(f"[Summary] 总结失败: {str(e)}")
        # Degrade gracefully: hand back the last successful step's result.
        last = succeeded[-1]
        if isinstance(last.result, str):
            return last.result
        return json.dumps(last.result, ensure_ascii=False, indent=2)
async def process_query(
    self,
    user_query: str,
    tools: List[dict],
    tool_handlers: Dict[str, Any],
) -> AgentResponse:
    """Main pipeline: plan with Kimi, execute tools, summarize with Kimi.

    Any exception from the three stages is caught and converted into a
    failed AgentResponse rather than propagated to the caller.
    """
    logger.info(f"[Agent] 处理查询: {user_query}")
    try:
        # Stage 1: Kimi drafts the execution plan.
        plan = await self.create_plan(user_query, tools)
        # Stage 2: run the planned tool calls.
        step_results = await self.execute_plan(plan, tool_handlers)
        # Stage 3: Kimi writes the final report.
        final_summary = await self.generate_final_summary(
            user_query, plan, step_results
        )
        ok = [r for r in step_results if r.status == "success"]
        bad = [r for r in step_results if r.status == "failed"]
        return AgentResponse(
            success=True,
            message=final_summary,
            plan=plan,
            step_results=step_results,
            final_summary=final_summary,
            metadata={
                "total_steps": len(plan.steps),
                "successful_steps": len(ok),
                "failed_steps": len(bad),
                "total_execution_time": sum(
                    r.execution_time for r in step_results
                ),
                "model_used": {
                    "planning": self.kimi_model,
                    "summarization": "kimi-k2-turbpreview",
                    "news_summary": self.deepmoney_model,
                },
            },
        )
    except Exception as e:
        logger.error(f"[Agent] 错误: {str(e)}", exc_info=True)
        return AgentResponse(
            success=False,
            message=f"处理失败: {str(e)}",
        )
# ==================== Snippet to add to mcp_server.py ====================
# NOTE(review): the block below is a bare triple-quoted string, i.e. inert
# documentation that is never executed; it holds the code to paste into
# mcp_server.py to expose the agent as a FastAPI endpoint.
"""
在 mcp_server.py 中添加以下代码:
# 导入 Agent 系统
from mcp_server_agent_integration import MCPAgentIntegrated, ChatRequest, AgentResponse
# 创建 Agent 实例(全局)
agent = MCPAgentIntegrated()
# 添加端点
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(request: ChatRequest):
\"\"\"智能代理对话端点\"\"\"
logger.info(f"Agent chat: {request.message}")
# 获取工具列表
tools = [tool.dict() for tool in TOOLS]
# 添加特殊工具summarize_news
tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据JSON格式"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响''投资机会'"
}
},
"required": ["data"]
}
})
# 处理查询
response = await agent.process_query(
user_query=request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
)
return response
"""