From 4dc27a35ff9dd3d64032596a5635277a311ce760 Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Fri, 7 Nov 2025 19:55:05 +0800 Subject: [PATCH] =?UTF-8?q?agent=E5=8A=9F=E8=83=BD=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0MCP=E5=90=8E=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kimi_integration.py | 361 ----------------------- mcp_agent_system.py | 470 ------------------------------ mcp_chat_endpoint.py | 295 ------------------- mcp_client_example.py | 248 ---------------- mcp_server_agent_integration.py | 492 -------------------------------- 5 files changed, 1866 deletions(-) delete mode 100644 kimi_integration.py delete mode 100644 mcp_agent_system.py delete mode 100644 mcp_chat_endpoint.py delete mode 100644 mcp_client_example.py delete mode 100644 mcp_server_agent_integration.py diff --git a/kimi_integration.py b/kimi_integration.py deleted file mode 100644 index 80969666..00000000 --- a/kimi_integration.py +++ /dev/null @@ -1,361 +0,0 @@ -""" -Kimi API 集成示例 -演示如何将MCP工具与Kimi大模型结合使用 -""" - -from openai import OpenAI -import json -from typing import List, Dict, Any -from mcp_client_example import MCPClient - -# Kimi API配置 -KIMI_API_KEY = "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5" -KIMI_BASE_URL = "https://api.moonshot.cn/v1" -KIMI_MODEL = "kimi-k2-turbpreview" - -# 初始化Kimi客户端 -kimi_client = OpenAI( - api_key=KIMI_API_KEY, - base_url=KIMI_BASE_URL, -) - -# 初始化MCP客户端 -mcp_client = MCPClient() - - -def convert_mcp_tools_to_kimi_format() -> tuple[List[Dict], Dict]: - """ - 将MCP工具转换为Kimi API的tools格式 - - Returns: - tools: Kimi格式的工具列表 - tool_map: 工具名称到执行函数的映射 - """ - # 获取所有MCP工具 - mcp_tools_response = mcp_client.list_tools() - mcp_tools = mcp_tools_response["tools"] - - # 转换为Kimi格式 - kimi_tools = [] - tool_map = {} - - for tool in mcp_tools: - # Kimi工具格式 - kimi_tool = { - "type": "function", - "function": { - "name": tool["name"], - "description": tool["description"], - "parameters": tool["parameters"] - } - } - kimi_tools.append(kimi_tool) - - # 创建工具执行函数 - tool_name = tool["name"] - tool_map[tool_name] = lambda args, name=tool_name: execute_mcp_tool(name, args) - - return kimi_tools, tool_map - - -def execute_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: - """ - 执行MCP工具 - - Args: - tool_name: 工具名称 - arguments: 工具参数 - - Returns: - 工具执行结果 - """ - print(f"[工具调用] {tool_name}") - print(f"[参数] {json.dumps(arguments, ensure_ascii=False, indent=2)}") - - result = mcp_client.call_tool(tool_name, arguments) - - print(f"[结果] 成功: {result.get('success', False)}") - - return result - - -def chat_with_kimi(user_message: str, verbose: bool = True) -> str: - """ - 与Kimi进行对话,支持工具调用 - - Args: - user_message: 用户消息 - verbose: 是否打印详细信息 - - Returns: - Kimi的回复 - """ - # 获取Kimi格式的工具 - tools, tool_map = convert_mcp_tools_to_kimi_format() - - if verbose: - print(f"\n{'='*60}") - print(f"加载了 {len(tools)} 个工具") - print(f"{'='*60}\n") - - # 初始化对话 - messages = [ - { - "role": "system", - "content": """你是一个专业的金融数据分析助手,由 Moonshot AI 提供支持。 -你可以使用各种工具来帮助用户查询和分析金融数据,包括: -- 新闻搜索(全球新闻、中国新闻、医疗新闻) -- 公司研究(路演信息、研究报告) -- 概念板块分析 -- 股票分析(涨停分析、财务数据、交易数据) -- 财务报表(资产负债表、现金流量表) - -请根据用户的问题,选择合适的工具来获取信息,并提供专业的分析和建议。""" - }, - { - "role": "user", - "content": user_message - } - ] - - if verbose: - print(f"[用户]: {user_message}\n") - - # 对话循环,处理工具调用 - finish_reason = None - iteration = 0 - max_iterations = 10 # 防止无限循环 - - while finish_reason is None or finish_reason == "tool_calls": - iteration += 1 - if iteration > max_iterations: - print("[警告] 达到最大迭代次数") - break - - if verbose and iteration > 1: - print(f"\n[轮次 {iteration}]") - - # 调用Kimi API - completion = kimi_client.chat.completions.create( - model=KIMI_MODEL, - messages=messages, - temperature=0.6, # Kimi推荐的temperature值 - tools=tools, - ) - - choice = completion.choices[0] - finish_reason = choice.finish_reason - - if verbose: - print(f"[Kimi] finish_reason: {finish_reason}") - - # 处理工具调用 - if finish_reason == "tool_calls": - # 将Kimi的消息添加到上下文 - messages.append(choice.message) - - # 执行每个工具调用 - for tool_call in choice.message.tool_calls: - tool_name = tool_call.function.name - tool_arguments = json.loads(tool_call.function.arguments) - - # 执行工具 - tool_result = tool_map[tool_name](tool_arguments) - - # 将工具结果添加到消息中 - messages.append({ - "role": "tool", - "tool_call_id": tool_call.id, - "name": tool_name, - "content": json.dumps(tool_result, ensure_ascii=False), - }) - - if verbose: - print() # 空行分隔 - - # 返回最终回复 - final_response = choice.message.content - - if verbose: - print(f"\n[Kimi]: {final_response}\n") - print(f"{'='*60}") - - return final_response - - -def demo_simple_query(): - """演示1: 简单查询""" - print("\n" + "="*60) - print("演示1: 简单新闻查询") - print("="*60) - - response = chat_with_kimi("帮我查找关于人工智能的最新新闻") - return response - - -def demo_stock_analysis(): - """演示2: 股票分析""" - print("\n" + "="*60) - print("演示2: 股票财务分析") - print("="*60) - - response = chat_with_kimi("帮我分析贵州茅台(600519)的财务状况") - return response - - -def demo_concept_research(): - """演示3: 概念研究""" - print("\n" + "="*60) - print("演示3: 概念板块研究") - print("="*60) - - response = chat_with_kimi("查找新能源汽车相关的概念板块,并告诉我涨幅最高的是哪些") - return response - - -def demo_industry_comparison(): - """演示4: 行业对比""" - print("\n" + "="*60) - print("演示4: 行业内股票对比") - print("="*60) - - response = chat_with_kimi("帮我找出半导体行业的龙头股票,并对比它们的财务指标") - return response - - -def demo_comprehensive_analysis(): - """演示5: 综合分析""" - print("\n" + "="*60) - print("演示5: 综合分析") - print("="*60) - - response = chat_with_kimi(""" - 我想投资白酒行业,请帮我: - 1. 搜索白酒行业的主要上市公司 - 2. 对比贵州茅台和五粮液的财务数据 - 3. 查看最近的行业新闻 - 4. 给出投资建议 - """) - return response - - -def interactive_chat(): - """交互式对话""" - print("\n" + "="*60) - print("Kimi 金融助手 - 交互模式") - print("="*60) - print("提示:输入 'quit' 或 'exit' 退出") - print("="*60 + "\n") - - while True: - try: - user_input = input("你: ").strip() - - if not user_input: - continue - - if user_input.lower() in ['quit', 'exit', '退出']: - print("\n再见!") - break - - response = chat_with_kimi(user_input) - - except KeyboardInterrupt: - print("\n\n再见!") - break - except Exception as e: - print(f"\n[错误] {str(e)}\n") - - -def test_kimi_connection(): - """测试Kimi API连接""" - print("\n" + "="*60) - print("测试 Kimi API 连接") - print("="*60 + "\n") - - try: - # 简单的测试请求 - response = kimi_client.chat.completions.create( - model=KIMI_MODEL, - messages=[ - {"role": "user", "content": "你好,请介绍一下你自己"} - ], - temperature=0.6 - ) - - print("[✓] 连接成功!") - print(f"[✓] 模型: {KIMI_MODEL}") - print(f"[✓] 回复: {response.choices[0].message.content}\n") - - return True - except Exception as e: - print(f"[✗] 连接失败: {str(e)}\n") - return False - - -def show_available_tools(): - """显示所有可用工具""" - print("\n" + "="*60) - print("可用工具列表") - print("="*60 + "\n") - - tools, _ = convert_mcp_tools_to_kimi_format() - - for i, tool in enumerate(tools, 1): - func = tool["function"] - print(f"{i}. {func['name']}") - print(f" 描述: {func['description'][:80]}...") - print() - - print(f"总计: {len(tools)} 个工具\n") - - -if __name__ == "__main__": - import sys - - # 首先测试连接 - if not test_kimi_connection(): - print("请检查API Key和网络连接") - sys.exit(1) - - # 显示可用工具 - show_available_tools() - - # 运行演示 - print("\n选择运行模式:") - print("1. 简单查询演示") - print("2. 股票分析演示") - print("3. 概念研究演示") - print("4. 行业对比演示") - print("5. 综合分析演示") - print("6. 交互式对话") - print("7. 运行所有演示") - - try: - choice = input("\n请选择 (1-7): ").strip() - - if choice == "1": - demo_simple_query() - elif choice == "2": - demo_stock_analysis() - elif choice == "3": - demo_concept_research() - elif choice == "4": - demo_industry_comparison() - elif choice == "5": - demo_comprehensive_analysis() - elif choice == "6": - interactive_chat() - elif choice == "7": - demo_simple_query() - demo_stock_analysis() - demo_concept_research() - demo_industry_comparison() - demo_comprehensive_analysis() - else: - print("无效选择") - - except KeyboardInterrupt: - print("\n\n程序已退出") - finally: - mcp_client.close() diff --git a/mcp_agent_system.py b/mcp_agent_system.py deleted file mode 100644 index 2dfa836d..00000000 --- a/mcp_agent_system.py +++ /dev/null @@ -1,470 +0,0 @@ -""" -MCP Agent System - 基于 DeepResearch 逻辑的智能代理系统 -三阶段流程:计划制定 → 工具执行 → 结果总结 -""" - -from pydantic import BaseModel -from typing import List, Dict, Any, Optional, Literal -from datetime import datetime -import json -import logging -from openai import OpenAI -import asyncio -import os - -logger = logging.getLogger(__name__) - -# ==================== 数据模型 ==================== - -class ToolCall(BaseModel): - """工具调用""" - tool: str - arguments: Dict[str, Any] - reason: str # 为什么要调用这个工具 - -class ExecutionPlan(BaseModel): - """执行计划""" - goal: str # 用户的目标 - steps: List[ToolCall] # 执行步骤 - reasoning: str # 规划reasoning - -class StepResult(BaseModel): - """单步执行结果""" - step_index: int - tool: str - arguments: Dict[str, Any] - status: Literal["success", "failed", "skipped"] - result: Optional[Any] = None - error: Optional[str] = None - execution_time: float = 0 - -class AgentResponse(BaseModel): - """Agent响应""" - success: bool - message: str # 自然语言总结 - plan: Optional[ExecutionPlan] = None # 执行计划 - step_results: List[StepResult] = [] # 每步的结果 - final_summary: Optional[str] = None # 最终总结 - metadata: Optional[Dict[str, Any]] = None - -class ChatRequest(BaseModel): - """聊天请求""" - message: str - conversation_history: List[Dict[str, str]] = [] - stream: bool = False # 是否流式输出 - -# ==================== Agent 系统 ==================== - -class MCPAgent: - """MCP 智能代理 - 三阶段执行""" - - def __init__(self, provider: str = "qwen"): - self.provider = provider - - # LLM 配置 - config = { - "qwen": { - "api_key": os.getenv("DASHSCOPE_API_KEY", ""), - "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", - "model": "qwen-plus", - }, - "deepseek": { - "api_key": os.getenv("DEEPSEEK_API_KEY", ""), - "base_url": "https://api.deepseek.com/v1", - "model": "deepseek-chat", - }, - "openai": { - "api_key": os.getenv("OPENAI_API_KEY", ""), - "base_url": "https://api.openai.com/v1", - "model": "gpt-4o-mini", - }, - }.get(provider) - - if not config or not config["api_key"]: - raise ValueError(f"Provider '{provider}' not configured. Please set API key.") - - self.client = OpenAI( - api_key=config["api_key"], - base_url=config["base_url"], - ) - self.model = config["model"] - - # ==================== 阶段 1: 计划制定 ==================== - - def get_planning_prompt(self, tools: List[dict]) -> str: - """获取计划制定的系统提示词""" - tools_desc = "\n\n".join([ - f"**{tool['name']}**\n" - f"描述:{tool['description']}\n" - f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}" - for tool in tools - ]) - - return f"""你是一个专业的金融研究助手。你需要根据用户的问题,制定一个详细的执行计划。 - -## 可用工具 - -{tools_desc} - -## 重要知识 -- 贵州茅台股票代码: 600519 -- 涨停: 股价单日涨幅约10% -- 概念板块: 相同题材的股票分类 - -## 特殊工具说明 -- **summarize_with_llm**: 这是一个特殊工具,用于让你总结和分析收集到的数据 - - 当需要对多个数据源进行综合分析时使用 - - 当需要生成研究报告时使用 - - 参数: {{"data": "要分析的数据", "task": "分析任务描述"}} - -## 任务 -分析用户问题,制定执行计划。返回 JSON 格式: - -```json -{{ - "goal": "用户的目标(一句话概括)", - "reasoning": "你的分析思路(为什么这样规划)", - "steps": [ - {{ - "tool": "工具名称", - "arguments": {{"参数名": "参数值"}}, - "reason": "为什么要执行这一步" - }} - ] -}} -``` - -## 规划原则 -1. **从简到繁**: 先获取基础信息,再深入分析 -2. **数据先行**: 先收集数据,再总结分析 -3. **合理组合**: 可以调用多个工具,但不要超过5个 -4. **包含总结**: 最后一步通常是 summarize_with_llm - -## 示例 - -用户:"帮我全面分析一下贵州茅台这只股票" - -你的计划: -```json -{{ - "goal": "全面分析贵州茅台股票", - "reasoning": "需要获取基本信息、财务指标、交易数据,然后综合分析", - "steps": [ - {{ - "tool": "get_stock_basic_info", - "arguments": {{"seccode": "600519"}}, - "reason": "获取股票基本信息(公司名称、行业、市值等)" - }}, - {{ - "tool": "get_stock_financial_index", - "arguments": {{"seccode": "600519", "limit": 5}}, - "reason": "获取最近5期财务指标(营收、利润、ROE等)" - }}, - {{ - "tool": "get_stock_trade_data", - "arguments": {{"seccode": "600519", "limit": 30}}, - "reason": "获取最近30天交易数据(价格走势、成交量)" - }}, - {{ - "tool": "search_china_news", - "arguments": {{"query": "贵州茅台", "top_k": 5}}, - "reason": "获取最新新闻,了解市场动态" - }}, - {{ - "tool": "summarize_with_llm", - "arguments": {{ - "data": "前面收集的所有数据", - "task": "综合分析贵州茅台的投资价值,包括基本面、财务状况、股价走势、市场情绪" - }}, - "reason": "综合所有数据,生成投资分析报告" - }} - ] -}} -``` - -只返回JSON,不要额外解释。""" - - async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan: - """阶段1: 创建执行计划""" - logger.info(f"[Planning] Creating plan for: {user_query}") - - messages = [ - {"role": "system", "content": self.get_planning_prompt(tools)}, - {"role": "user", "content": user_query}, - ] - - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - temperature=0.3, - max_tokens=1500, - ) - - plan_json = response.choices[0].message.content.strip() - logger.info(f"[Planning] Raw response: {plan_json}") - - # 清理可能的代码块标记 - if "```json" in plan_json: - plan_json = plan_json.split("```json")[1].split("```")[0].strip() - elif "```" in plan_json: - plan_json = plan_json.split("```")[1].split("```")[0].strip() - - plan_data = json.loads(plan_json) - - plan = ExecutionPlan( - goal=plan_data["goal"], - reasoning=plan_data.get("reasoning", ""), - steps=[ - ToolCall(**step) for step in plan_data["steps"] - ], - ) - - logger.info(f"[Planning] Plan created: {len(plan.steps)} steps") - return plan - - # ==================== 阶段 2: 工具执行 ==================== - - async def execute_tool( - self, - tool_name: str, - arguments: Dict[str, Any], - tool_handlers: Dict[str, Any], - ) -> Dict[str, Any]: - """执行单个工具""" - - # 特殊处理:summarize_with_llm - if tool_name == "summarize_with_llm": - return await self.summarize_with_llm( - data=arguments.get("data", ""), - task=arguments.get("task", "总结数据"), - ) - - # 调用 MCP 工具 - handler = tool_handlers.get(tool_name) - if not handler: - raise ValueError(f"Tool '{tool_name}' not found") - - result = await handler(arguments) - return result - - async def execute_plan( - self, - plan: ExecutionPlan, - tool_handlers: Dict[str, Any], - ) -> List[StepResult]: - """阶段2: 执行计划中的所有步骤""" - logger.info(f"[Execution] Starting execution: {len(plan.steps)} steps") - - results = [] - collected_data = {} # 收集的数据,供后续步骤使用 - - for i, step in enumerate(plan.steps): - logger.info(f"[Execution] Step {i+1}/{len(plan.steps)}: {step.tool}") - - start_time = datetime.now() - - try: - # 替换 arguments 中的占位符 - arguments = step.arguments.copy() - if step.tool == "summarize_with_llm" and arguments.get("data") == "前面收集的所有数据": - # 将收集的数据传递给总结工具 - arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) - - # 执行工具 - result = await self.execute_tool(step.tool, arguments, tool_handlers) - - execution_time = (datetime.now() - start_time).total_seconds() - - # 保存结果 - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=arguments, - status="success", - result=result, - execution_time=execution_time, - ) - results.append(step_result) - - # 收集数据 - collected_data[f"step_{i+1}_{step.tool}"] = result - - logger.info(f"[Execution] Step {i+1} completed in {execution_time:.2f}s") - - except Exception as e: - logger.error(f"[Execution] Step {i+1} failed: {str(e)}") - - execution_time = (datetime.now() - start_time).total_seconds() - - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=step.arguments, - status="failed", - error=str(e), - execution_time=execution_time, - ) - results.append(step_result) - - # 根据错误类型决定是否继续 - if "not found" in str(e).lower(): - logger.warning(f"[Execution] Stopping due to critical error") - break - else: - logger.warning(f"[Execution] Continuing despite error") - continue - - logger.info(f"[Execution] Execution completed: {len(results)} steps") - return results - - async def summarize_with_llm(self, data: str, task: str) -> str: - """特殊工具:使用 LLM 总结数据""" - logger.info(f"[LLM Summary] Task: {task}") - - messages = [ - { - "role": "system", - "content": "你是一个专业的金融分析师。根据提供的数据,完成指定的分析任务。" - }, - { - "role": "user", - "content": f"## 任务\n{task}\n\n## 数据\n{data}\n\n请根据数据完成分析任务,用专业且易懂的语言呈现。" - }, - ] - - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - temperature=0.7, - max_tokens=2000, - ) - - summary = response.choices[0].message.content - return summary - - # ==================== 阶段 3: 结果总结 ==================== - - async def generate_final_summary( - self, - user_query: str, - plan: ExecutionPlan, - step_results: List[StepResult], - ) -> str: - """阶段3: 生成最终总结""" - logger.info("[Summary] Generating final summary") - - # 收集所有成功的结果 - successful_results = [r for r in step_results if r.status == "success"] - - if not successful_results: - return "很抱歉,所有步骤都执行失败,无法生成分析报告。" - - # 构建总结提示 - results_text = "\n\n".join([ - f"**步骤 {r.step_index + 1}: {r.tool}**\n" - f"结果: {json.dumps(r.result, ensure_ascii=False, indent=2)[:1000]}..." - for r in successful_results - ]) - - messages = [ - { - "role": "system", - "content": "你是一个专业的金融研究助手。根据执行结果,生成一份简洁清晰的报告。" - }, - { - "role": "user", - "content": f""" -用户问题:{user_query} - -执行计划:{plan.goal} - -执行结果: -{results_text} - -请根据以上信息,生成一份专业的分析报告(300字以内)。 -""" - }, - ] - - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - temperature=0.7, - max_tokens=1000, - ) - - summary = response.choices[0].message.content - logger.info("[Summary] Final summary generated") - return summary - - # ==================== 主流程 ==================== - - async def process_query( - self, - user_query: str, - tools: List[dict], - tool_handlers: Dict[str, Any], - ) -> AgentResponse: - """主流程:处理用户查询""" - logger.info(f"[Agent] Processing query: {user_query}") - - try: - # 阶段 1: 创建计划 - plan = await self.create_plan(user_query, tools) - - # 阶段 2: 执行计划 - step_results = await self.execute_plan(plan, tool_handlers) - - # 阶段 3: 生成总结 - final_summary = await self.generate_final_summary( - user_query, plan, step_results - ) - - return AgentResponse( - success=True, - message=final_summary, - plan=plan, - step_results=step_results, - final_summary=final_summary, - metadata={ - "total_steps": len(plan.steps), - "successful_steps": len([r for r in step_results if r.status == "success"]), - "failed_steps": len([r for r in step_results if r.status == "failed"]), - "total_execution_time": sum(r.execution_time for r in step_results), - }, - ) - - except Exception as e: - logger.error(f"[Agent] Error: {str(e)}", exc_info=True) - return AgentResponse( - success=False, - message=f"处理失败: {str(e)}", - ) - -# ==================== FastAPI 端点 ==================== - -""" -在 mcp_server.py 中添加: - -from mcp_agent_system import MCPAgent, ChatRequest, AgentResponse - -# 创建 Agent 实例 -agent = MCPAgent(provider="qwen") - -@app.post("/agent/chat", response_model=AgentResponse) -async def agent_chat(request: ChatRequest): - \"\"\"智能代理对话端点\"\"\" - logger.info(f"Agent chat: {request.message}") - - # 获取工具列表和处理器 - tools = [tool.dict() for tool in TOOLS] - - # 处理查询 - response = await agent.process_query( - user_query=request.message, - tools=tools, - tool_handlers=TOOL_HANDLERS, - ) - - return response -""" diff --git a/mcp_chat_endpoint.py b/mcp_chat_endpoint.py deleted file mode 100644 index 11da0ad8..00000000 --- a/mcp_chat_endpoint.py +++ /dev/null @@ -1,295 +0,0 @@ -""" -MCP Chat Endpoint - 添加到 mcp_server.py -集成LLM实现智能对话,自动调用MCP工具并总结结果 -""" - -from pydantic import BaseModel -from typing import List, Dict, Any, Optional -import os -import json -from openai import OpenAI -import logging - -logger = logging.getLogger(__name__) - -# ==================== LLM配置 ==================== - -# 支持多种LLM提供商 -LLM_PROVIDERS = { - "openai": { - "api_key": os.getenv("OPENAI_API_KEY", ""), - "base_url": "https://api.openai.com/v1", - "model": "gpt-4o-mini", # 便宜且快速 - }, - "qwen": { - "api_key": os.getenv("DASHSCOPE_API_KEY", ""), - "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", - "model": "qwen-plus", - }, - "deepseek": { - "api_key": os.getenv("DEEPSEEK_API_KEY", ""), - "base_url": "https://api.deepseek.com/v1", - "model": "deepseek-chat", - }, -} - -# 默认使用的LLM提供商 -DEFAULT_PROVIDER = "qwen" # 推荐使用通义千问,价格便宜 - -# ==================== 数据模型 ==================== - -class Message(BaseModel): - """消息""" - role: str # system, user, assistant - content: str - -class ChatRequest(BaseModel): - """聊天请求""" - message: str - conversation_history: List[Dict[str, str]] = [] - provider: Optional[str] = DEFAULT_PROVIDER - -class ChatResponse(BaseModel): - """聊天响应""" - success: bool - message: str - tool_used: Optional[str] = None - raw_data: Optional[Any] = None - error: Optional[str] = None - -# ==================== LLM助手类 ==================== - -class MCPChatAssistant: - """MCP聊天助手 - 集成LLM和工具调用""" - - def __init__(self, provider: str = DEFAULT_PROVIDER): - self.provider = provider - config = LLM_PROVIDERS.get(provider) - - if not config or not config["api_key"]: - logger.warning(f"LLM provider '{provider}' not configured, using fallback mode") - self.client = None - else: - self.client = OpenAI( - api_key=config["api_key"], - base_url=config["base_url"], - ) - self.model = config["model"] - - def get_system_prompt(self, tools: List[dict]) -> str: - """构建系统提示词""" - tools_desc = "\n\n".join([ - f"**{tool['name']}**\n描述:{tool['description']}\n参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}" - for tool in tools - ]) - - return f"""你是一个专业的金融投资助手。你可以使用以下工具来帮助用户查询信息: - -{tools_desc} - -## 工作流程 -1. **理解用户意图**:分析用户问题,确定需要什么信息 -2. **选择工具**:从上面的工具中选择最合适的一个或多个 -3. **提取参数**:从用户输入中提取工具需要的参数 -4. **返回工具调用指令**(JSON格式): - {{"tool": "工具名", "arguments": {{...}}}} - -## 重要规则 -- 贵州茅台的股票代码是 **600519** -- 如果用户提到股票名称,尝试推断股票代码 -- 如果不确定需要什么信息,使用 search_china_news 搜索相关新闻 -- 涨停是指股票当日涨幅达到10%左右 -- 只返回工具调用指令,不要额外解释 - -## 示例 -用户:"查询贵州茅台的股票信息" -你:{{"tool": "get_stock_basic_info", "arguments": {{"seccode": "600519"}}}} - -用户:"今日涨停的股票有哪些" -你:{{"tool": "search_limit_up_stocks", "arguments": {{"query": "", "mode": "hybrid", "page_size": 10}}}} - -用户:"新能源概念板块表现如何" -你:{{"tool": "search_concepts", "arguments": {{"query": "新能源", "size": 10, "sort_by": "change_pct"}}}} -""" - - async def chat(self, user_message: str, conversation_history: List[Dict[str, str]], tools: List[dict]) -> ChatResponse: - """智能对话""" - try: - if not self.client: - # 降级到简单匹配 - return await self.fallback_chat(user_message) - - # 1. 构建消息历史 - messages = [ - {"role": "system", "content": self.get_system_prompt(tools)}, - ] - - # 添加历史对话(最多保留最近10轮) - for msg in conversation_history[-20:]: - messages.append({ - "role": "user" if msg.get("isUser") else "assistant", - "content": msg.get("content", ""), - }) - - messages.append({"role": "user", "content": user_message}) - - # 2. 调用LLM获取工具调用指令 - logger.info(f"Calling LLM with {len(messages)} messages") - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - temperature=0.3, # 低温度,更确定性 - max_tokens=500, - ) - - tool_call_instruction = response.choices[0].message.content.strip() - logger.info(f"LLM response: {tool_call_instruction}") - - # 3. 解析工具调用指令 - try: - tool_call = json.loads(tool_call_instruction) - tool_name = tool_call.get("tool") - tool_args = tool_call.get("arguments", {}) - - if not tool_name: - raise ValueError("No tool specified") - - # 4. 调用工具(这里需要导入 mcp_server 的工具处理器) - from mcp_server import TOOL_HANDLERS - - handler = TOOL_HANDLERS.get(tool_name) - if not handler: - raise ValueError(f"Tool '{tool_name}' not found") - - tool_result = await handler(tool_args) - - # 5. 让LLM总结结果 - summary_messages = messages + [ - {"role": "assistant", "content": tool_call_instruction}, - {"role": "system", "content": f"工具 {tool_name} 返回的数据:\n{json.dumps(tool_result, ensure_ascii=False, indent=2)}\n\n请用自然语言总结这些数据,给用户一个简洁清晰的回复(不超过200字)。"} - ] - - summary_response = self.client.chat.completions.create( - model=self.model, - messages=summary_messages, - temperature=0.7, - max_tokens=300, - ) - - summary = summary_response.choices[0].message.content - - return ChatResponse( - success=True, - message=summary, - tool_used=tool_name, - raw_data=tool_result, - ) - - except json.JSONDecodeError: - # LLM没有返回JSON格式,直接返回其回复 - return ChatResponse( - success=True, - message=tool_call_instruction, - ) - except Exception as tool_error: - logger.error(f"Tool execution error: {str(tool_error)}") - return ChatResponse( - success=False, - message="工具调用失败", - error=str(tool_error), - ) - - except Exception as e: - logger.error(f"Chat error: {str(e)}", exc_info=True) - return ChatResponse( - success=False, - message="对话处理失败", - error=str(e), - ) - - async def fallback_chat(self, user_message: str) -> ChatResponse: - """降级方案:简单关键词匹配""" - from mcp_server import TOOL_HANDLERS - - try: - # 茅台特殊处理 - if "茅台" in user_message or "贵州茅台" in user_message: - handler = TOOL_HANDLERS.get("get_stock_basic_info") - result = await handler({"seccode": "600519"}) - return ChatResponse( - success=True, - message="已为您查询贵州茅台(600519)的股票信息:", - tool_used="get_stock_basic_info", - raw_data=result, - ) - - # 涨停分析 - elif "涨停" in user_message: - handler = TOOL_HANDLERS.get("search_limit_up_stocks") - query = user_message.replace("涨停", "").strip() - result = await handler({"query": query, "mode": "hybrid", "page_size": 10}) - return ChatResponse( - success=True, - message="已为您查询涨停股票信息:", - tool_used="search_limit_up_stocks", - raw_data=result, - ) - - # 概念板块 - elif "概念" in user_message or "板块" in user_message: - handler = TOOL_HANDLERS.get("search_concepts") - query = user_message.replace("概念", "").replace("板块", "").strip() - result = await handler({"query": query, "size": 10, "sort_by": "change_pct"}) - return ChatResponse( - success=True, - message=f"已为您查询'{query}'相关概念板块:", - tool_used="search_concepts", - raw_data=result, - ) - - # 默认:搜索新闻 - else: - handler = TOOL_HANDLERS.get("search_china_news") - result = await handler({"query": user_message, "top_k": 5}) - return ChatResponse( - success=True, - message="已为您搜索相关新闻:", - tool_used="search_china_news", - raw_data=result, - ) - - except Exception as e: - logger.error(f"Fallback chat error: {str(e)}") - return ChatResponse( - success=False, - message="查询失败", - error=str(e), - ) - -# ==================== FastAPI端点 ==================== - -# 在 mcp_server.py 中添加以下代码: - -""" -from mcp_chat_endpoint import MCPChatAssistant, ChatRequest, ChatResponse - -# 创建聊天助手实例 -chat_assistant = MCPChatAssistant(provider="qwen") # 或 "openai", "deepseek" - -@app.post("/chat", response_model=ChatResponse) -async def chat_endpoint(request: ChatRequest): - \"\"\"智能对话端点 - 使用LLM理解意图并调用工具\"\"\" - logger.info(f"Chat request: {request.message}") - - # 获取可用工具列表 - tools = [tool.dict() for tool in TOOLS] - - # 调用聊天助手 - response = await chat_assistant.chat( - user_message=request.message, - conversation_history=request.conversation_history, - tools=tools, - ) - - return response -""" diff --git a/mcp_client_example.py b/mcp_client_example.py deleted file mode 100644 index 19727435..00000000 --- a/mcp_client_example.py +++ /dev/null @@ -1,248 +0,0 @@ -""" -MCP客户端使用示例 -演示如何调用MCP服务器的各种工具 -""" - -import httpx -import json -from typing import Dict, Any - - -class MCPClient: - """MCP客户端""" - - def __init__(self, base_url: str = "http://localhost:8900"): - self.base_url = base_url - self.client = httpx.Client(timeout=60.0) - - def list_tools(self): - """列出所有可用工具""" - response = self.client.get(f"{self.base_url}/tools") - response.raise_for_status() - return response.json() - - def get_tool(self, tool_name: str): - """获取特定工具的定义""" - response = self.client.get(f"{self.base_url}/tools/{tool_name}") - response.raise_for_status() - return response.json() - - def call_tool(self, tool_name: str, arguments: Dict[str, Any]): - """调用工具""" - payload = { - "tool": tool_name, - "arguments": arguments - } - response = self.client.post(f"{self.base_url}/tools/call", json=payload) - response.raise_for_status() - return response.json() - - def close(self): - """关闭客户端""" - self.client.close() - - -def print_result(title: str, result: Dict[str, Any]): - """打印结果""" - print(f"\n{'=' * 60}") - print(f"{title}") - print(f"{'=' * 60}") - print(json.dumps(result, ensure_ascii=False, indent=2)) - - -def main(): - """主函数 - 演示各种工具的使用""" - - client = MCPClient() - - try: - # 1. 列出所有工具 - print("\n示例1: 列出所有可用工具") - tools = client.list_tools() - print(f"可用工具数量: {len(tools['tools'])}") - for tool in tools['tools']: - print(f" - {tool['name']}: {tool['description'][:50]}...") - - # 2. 搜索中国新闻 - print("\n示例2: 搜索中国新闻(关键词:人工智能)") - result = client.call_tool( - "search_china_news", - { - "query": "人工智能", - "top_k": 5 - } - ) - if result['success']: - print_result("中国新闻搜索结果", result['data']) - - # 3. 搜索概念板块(按涨跌幅排序) - print("\n示例3: 搜索概念板块(关键词:新能源,按涨跌幅排序)") - result = client.call_tool( - "search_concepts", - { - "query": "新能源", - "size": 5, - "sort_by": "change_pct" - } - ) - if result['success']: - print_result("概念搜索结果", result['data']) - - # 4. 获取股票的相关概念 - print("\n示例4: 获取股票相关概念(股票代码:600519)") - result = client.call_tool( - "get_stock_concepts", - { - "stock_code": "600519", - "size": 10 - } - ) - if result['success']: - print_result("股票概念结果", result['data']) - - # 5. 搜索涨停股票 - print("\n示例5: 搜索涨停股票(关键词:锂电池)") - result = client.call_tool( - "search_limit_up_stocks", - { - "query": "锂电池", - "mode": "hybrid", - "page_size": 5 - } - ) - if result['success']: - print_result("涨停股票搜索结果", result['data']) - - # 6. 搜索研究报告 - print("\n示例6: 搜索研究报告(关键词:投资策略)") - result = client.call_tool( - "search_research_reports", - { - "query": "投资策略", - "mode": "hybrid", - "size": 3 - } - ) - if result['success']: - print_result("研究报告搜索结果", result['data']) - - # 7. 获取概念统计数据 - print("\n示例7: 获取概念统计(最近7天)") - result = client.call_tool( - "get_concept_statistics", - { - "days": 7, - "min_stock_count": 3 - } - ) - if result['success']: - print_result("概念统计结果", result['data']) - - # 8. 搜索路演信息 - print("\n示例8: 搜索路演信息(关键词:业绩)") - result = client.call_tool( - "search_roadshows", - { - "query": "业绩", - "size": 3 - } - ) - if result['success']: - print_result("路演搜索结果", result['data']) - - # 9. 获取股票基本信息 - print("\n示例9: 获取股票基本信息(股票:600519)") - result = client.call_tool( - "get_stock_basic_info", - { - "seccode": "600519" - } - ) - if result['success']: - print_result("股票基本信息", result['data']) - - # 10. 获取股票财务指标 - print("\n示例10: 获取股票财务指标(股票:600519,最近5期)") - result = client.call_tool( - "get_stock_financial_index", - { - "seccode": "600519", - "limit": 5 - } - ) - if result['success']: - print_result("财务指标", result['data']) - - # 11. 获取股票交易数据 - print("\n示例11: 获取股票交易数据(股票:600519,最近10天)") - result = client.call_tool( - "get_stock_trade_data", - { - "seccode": "600519", - "limit": 10 - } - ) - if result['success']: - print_result("交易数据", result['data']) - - # 12. 按行业搜索股票 - print("\n示例12: 按行业搜索股票(行业:半导体)") - result = client.call_tool( - "search_stocks_by_criteria", - { - "industry": "半导体", - "limit": 10 - } - ) - if result['success']: - print_result("行业股票", result['data']) - - # 13. 股票对比分析 - print("\n示例13: 股票对比分析(600519 vs 000858)") - result = client.call_tool( - "get_stock_comparison", - { - "seccodes": ["600519", "000858"], - "metric": "financial" - } - ) - if result['success']: - print_result("股票对比", result['data']) - - except Exception as e: - print(f"\n错误: {str(e)}") - - finally: - client.close() - - -def test_single_tool(): - """测试单个工具(用于快速测试)""" - client = MCPClient() - - try: - # 修改这里来测试不同的工具 - result = client.call_tool( - "search_china_news", - { - "query": "芯片", - "exact_match": True, - "top_k": 3 - } - ) - - print_result("测试结果", result) - - except Exception as e: - print(f"错误: {str(e)}") - - finally: - client.close() - - -if __name__ == "__main__": - # 运行完整示例 - main() - - # 或者测试单个工具 - # test_single_tool() diff --git a/mcp_server_agent_integration.py b/mcp_server_agent_integration.py deleted file mode 100644 index fbb47f88..00000000 --- a/mcp_server_agent_integration.py +++ /dev/null @@ -1,492 +0,0 @@ -""" -集成到 mcp_server.py 的 Agent 系统 -使用 Kimi (kimi-k2-thinking) 和 DeepMoney 两个模型 -""" - -from openai import OpenAI -from pydantic import BaseModel -from typing import List, Dict, Any, Optional, Literal -from datetime import datetime -import json -import logging - -logger = logging.getLogger(__name__) - -# ==================== 模型配置 ==================== - -# Kimi 配置 - 用于计划制定和深度推理 -KIMI_CONFIG = { - "api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5", - "base_url": "https://api.moonshot.cn/v1", - "model": "kimi-k2-thinking", # 思考模型 -} - -# DeepMoney 配置 - 用于新闻总结 -DEEPMONEY_CONFIG = { - "api_key": "", # 空值 - "base_url": "http://111.62.35.50:8000/v1", - "model": "deepmoney", -} - -# ==================== 数据模型 ==================== - -class ToolCall(BaseModel): - """工具调用""" - tool: str - arguments: Dict[str, Any] - reason: str - -class ExecutionPlan(BaseModel): - """执行计划""" - goal: str - steps: List[ToolCall] - reasoning: str - -class StepResult(BaseModel): - """单步执行结果""" - step_index: int - tool: str - arguments: Dict[str, Any] - status: Literal["success", "failed", "skipped"] - result: Optional[Any] = None - error: Optional[str] = None - execution_time: float = 0 - -class AgentResponse(BaseModel): - """Agent响应""" - success: bool - message: str - plan: Optional[ExecutionPlan] = None - step_results: List[StepResult] = [] - final_summary: Optional[str] = None - metadata: Optional[Dict[str, Any]] = None - -class ChatRequest(BaseModel): - """聊天请求""" - message: str - conversation_history: List[Dict[str, str]] = [] - -# ==================== Agent 系统 ==================== - -class MCPAgentIntegrated: - """集成版 MCP Agent - 使用 Kimi 和 DeepMoney""" - - def __init__(self): - # 初始化 Kimi 客户端(计划制定) - self.kimi_client = OpenAI( - api_key=KIMI_CONFIG["api_key"], - base_url=KIMI_CONFIG["base_url"], - ) - self.kimi_model = KIMI_CONFIG["model"] - - # 初始化 DeepMoney 客户端(新闻总结) - self.deepmoney_client = OpenAI( - api_key=DEEPMONEY_CONFIG["api_key"], - base_url=DEEPMONEY_CONFIG["base_url"], - ) - self.deepmoney_model = DEEPMONEY_CONFIG["model"] - - def get_planning_prompt(self, tools: List[dict]) -> str: - """获取计划制定的系统提示词""" - tools_desc = "\n\n".join([ - f"**{tool['name']}**\n" - f"描述:{tool['description']}\n" - f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}" - for tool in tools - ]) - - return f"""你是一个专业的金融研究助手。根据用户问题,制定详细的执行计划。 - -## 可用工具 - -{tools_desc} - -## 特殊工具 -- **summarize_news**: 使用 DeepMoney 模型总结新闻数据 - - 参数: {{"data": "新闻列表JSON", "focus": "关注点"}} - - 适用场景: 当需要总结新闻、研报等文本数据时 - -## 重要知识 -- 贵州茅台: 600519 -- 涨停: 涨幅约10% -- 概念板块: 相同题材股票分类 - -## 任务 -分析用户问题,制定执行计划。返回 JSON: - -```json -{{ - "goal": "用户目标", - "reasoning": "分析思路", - "steps": [ - {{ - "tool": "工具名", - "arguments": {{"参数": "值"}}, - "reason": "原因" - }} - ] -}} -``` - -## 规划原则 -1. 先收集数据,再分析总结 -2. 使用 summarize_news 总结新闻类数据 -3. 不超过5个步骤 -4. 最后一步通常是总结 - -## 示例 - -用户:"贵州茅台最近有什么新闻" - -计划: -```json -{{ - "goal": "查询并总结贵州茅台最新新闻", - "reasoning": "先搜索新闻,再用 DeepMoney 总结", - "steps": [ - {{ - "tool": "search_china_news", - "arguments": {{"query": "贵州茅台", "top_k": 10}}, - "reason": "搜索贵州茅台相关新闻" - }}, - {{ - "tool": "summarize_news", - "arguments": {{ - "data": "前面的新闻数据", - "focus": "贵州茅台的重要动态和市场影响" - }}, - "reason": "使用DeepMoney总结新闻要点" - }} - ] -}} -``` - -只返回JSON,不要其他内容。""" - - async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan: - """阶段1: 使用 Kimi 创建执行计划(带思考过程)""" - logger.info(f"[Planning] Kimi开始制定计划: {user_query}") - - messages = [ - {"role": "system", "content": self.get_planning_prompt(tools)}, - {"role": "user", "content": user_query}, - ] - - # 使用 Kimi 思考模型 - response = self.kimi_client.chat.completions.create( - model=self.kimi_model, - messages=messages, - temperature=1.0, # Kimi 推荐 - max_tokens=16000, # 足够容纳 reasoning_content - ) - - choice = response.choices[0] - message = choice.message - - # 提取思考过程 - reasoning_content = "" - if hasattr(message, "reasoning_content"): - reasoning_content = getattr(message, "reasoning_content") - logger.info(f"[Planning] Kimi思考过程: {reasoning_content[:200]}...") - - # 提取计划内容 - plan_json = message.content.strip() - - # 清理可能的代码块标记 - if "```json" in plan_json: - plan_json = plan_json.split("```json")[1].split("```")[0].strip() - elif "```" in plan_json: - plan_json = plan_json.split("```")[1].split("```")[0].strip() - - plan_data = json.loads(plan_json) - - plan = ExecutionPlan( - goal=plan_data["goal"], - reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""), - steps=[ToolCall(**step) for step in plan_data["steps"]], - ) - - logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步") - return plan - - async def execute_tool( - self, - tool_name: str, - arguments: Dict[str, Any], - tool_handlers: Dict[str, Any], - ) -> Dict[str, Any]: - """执行单个工具""" - - # 特殊工具:summarize_news(使用 DeepMoney) - if tool_name == "summarize_news": - return await self.summarize_news_with_deepmoney( - data=arguments.get("data", ""), - focus=arguments.get("focus", "关键信息"), - ) - - # 调用 MCP 工具 - handler = tool_handlers.get(tool_name) - if not handler: - raise ValueError(f"Tool '{tool_name}' not found") - - result = await handler(arguments) - return result - - async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str: - """使用 DeepMoney 模型总结新闻""" - logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}") - - messages = [ - { - "role": "system", - "content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。" - }, - { - "role": "user", - "content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}" - }, - ] - - try: - response = self.deepmoney_client.chat.completions.create( - model=self.deepmoney_model, - messages=messages, - temperature=0.7, - max_tokens=1000, - ) - - summary = response.choices[0].message.content - logger.info(f"[DeepMoney] 总结完成") - return summary - - except Exception as e: - logger.error(f"[DeepMoney] 总结失败: {str(e)}") - # 降级:返回简化摘要 - return f"新闻总结失败,原始数据:{data[:500]}..." - - async def execute_plan( - self, - plan: ExecutionPlan, - tool_handlers: Dict[str, Any], - ) -> List[StepResult]: - """阶段2: 执行计划""" - logger.info(f"[Execution] 开始执行: {len(plan.steps)} 步") - - results = [] - collected_data = {} - - for i, step in enumerate(plan.steps): - logger.info(f"[Execution] 步骤 {i+1}/{len(plan.steps)}: {step.tool}") - - start_time = datetime.now() - - try: - # 替换占位符 - arguments = step.arguments.copy() - - # 如果参数值是 "前面的新闻数据" 或 "前面收集的所有数据" - if step.tool == "summarize_news": - if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]: - # 将收集的数据传递 - arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) - - # 执行工具 - result = await self.execute_tool(step.tool, arguments, tool_handlers) - - execution_time = (datetime.now() - start_time).total_seconds() - - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=arguments, - status="success", - result=result, - execution_time=execution_time, - ) - results.append(step_result) - - # 收集数据 - collected_data[f"step_{i+1}_{step.tool}"] = result - - logger.info(f"[Execution] 步骤 {i+1} 完成: {execution_time:.2f}s") - - except Exception as e: - logger.error(f"[Execution] 步骤 {i+1} 失败: {str(e)}") - - execution_time = (datetime.now() - start_time).total_seconds() - - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=step.arguments, - status="failed", - error=str(e), - execution_time=execution_time, - ) - results.append(step_result) - - # 继续执行其他步骤 - continue - - logger.info(f"[Execution] 执行完成") - return results - - async def generate_final_summary( - self, - user_query: str, - plan: ExecutionPlan, - step_results: List[StepResult], - ) -> str: - """阶段3: 使用 Kimi 生成最终总结""" - logger.info("[Summary] Kimi生成最终总结") - - # 收集成功的结果 - successful_results = [r for r in step_results if r.status == "success"] - - if not successful_results: - return "很抱歉,所有步骤都执行失败,无法生成分析报告。" - - # 构建结果文本(精简版) - results_text = "\n\n".join([ - f"**步骤 {r.step_index + 1}: {r.tool}**\n" - f"结果: {str(r.result)[:800]}..." - for r in successful_results[:3] # 只取前3个,避免超长 - ]) - - messages = [ - { - "role": "system", - "content": "你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。" - }, - { - "role": "user", - "content": f"""用户问题:{user_query} - -执行计划:{plan.goal} - -执行结果: -{results_text} - -请生成专业的分析报告(300字以内)。""" - }, - ] - - try: - response = self.kimi_client.chat.completions.create( - model="kimi-k2-turbpreview", # 使用非思考模型,更快 - messages=messages, - temperature=0.7, - max_tokens=1000, - ) - - summary = response.choices[0].message.content - logger.info("[Summary] 总结完成") - return summary - - except Exception as e: - logger.error(f"[Summary] 总结失败: {str(e)}") - # 降级:返回最后一步的结果 - if successful_results: - last_result = successful_results[-1] - if isinstance(last_result.result, str): - return last_result.result - else: - return json.dumps(last_result.result, ensure_ascii=False, indent=2) - return "总结生成失败" - - async def process_query( - self, - user_query: str, - tools: List[dict], - tool_handlers: Dict[str, Any], - ) -> AgentResponse: - """主流程""" - logger.info(f"[Agent] 处理查询: {user_query}") - - try: - # 阶段1: Kimi 制定计划 - plan = await self.create_plan(user_query, tools) - - # 阶段2: 执行工具 - step_results = await self.execute_plan(plan, tool_handlers) - - # 阶段3: Kimi 生成总结 - final_summary = await self.generate_final_summary( - user_query, plan, step_results - ) - - return AgentResponse( - success=True, - message=final_summary, - plan=plan, - step_results=step_results, - final_summary=final_summary, - metadata={ - "total_steps": len(plan.steps), - "successful_steps": len([r for r in step_results if r.status == "success"]), - "failed_steps": len([r for r in step_results if r.status == "failed"]), - "total_execution_time": sum(r.execution_time for r in step_results), - "model_used": { - "planning": self.kimi_model, - "summarization": "kimi-k2-turbpreview", - "news_summary": self.deepmoney_model, - }, - }, - ) - - except Exception as e: - logger.error(f"[Agent] 错误: {str(e)}", exc_info=True) - return AgentResponse( - success=False, - message=f"处理失败: {str(e)}", - ) - -# ==================== 添加到 mcp_server.py ==================== - -""" -在 mcp_server.py 中添加以下代码: - -# 导入 Agent 系统 -from mcp_server_agent_integration import MCPAgentIntegrated, ChatRequest, AgentResponse - -# 创建 Agent 实例(全局) -agent = MCPAgentIntegrated() - -# 添加端点 -@app.post("/agent/chat", response_model=AgentResponse) -async def agent_chat(request: ChatRequest): - \"\"\"智能代理对话端点\"\"\" - logger.info(f"Agent chat: {request.message}") - - # 获取工具列表 - tools = [tool.dict() for tool in TOOLS] - - # 添加特殊工具:summarize_news - tools.append({ - "name": "summarize_news", - "description": "使用 DeepMoney 模型总结新闻数据,提取关键信息", - "parameters": { - "type": "object", - "properties": { - "data": { - "type": "string", - "description": "要总结的新闻数据(JSON格式)" - }, - "focus": { - "type": "string", - "description": "关注点,例如:'市场影响'、'投资机会'等" - } - }, - "required": ["data"] - } - }) - - # 处理查询 - response = await agent.process_query( - user_query=request.message, - tools=tools, - tool_handlers=TOOL_HANDLERS, - ) - - return response -"""