""" 集成到 mcp_server.py 的 Agent 系统 使用 Kimi (kimi-k2-thinking) 和 DeepMoney 两个模型 """ from openai import OpenAI from pydantic import BaseModel from typing import List, Dict, Any, Optional, Literal from datetime import datetime import json import logging logger = logging.getLogger(__name__) # ==================== 模型配置 ==================== # Kimi 配置 - 用于计划制定和深度推理 KIMI_CONFIG = { "api_key": "sk-TzB4VYJfCoXGcGrGMiewukVRzjuDsbVCkaZXi2LvkS8s60E5", "base_url": "https://api.moonshot.cn/v1", "model": "kimi-k2-thinking", # 思考模型 } # DeepMoney 配置 - 用于新闻总结 DEEPMONEY_CONFIG = { "api_key": "", # 空值 "base_url": "http://111.62.35.50:8000/v1", "model": "deepmoney", } # ==================== 数据模型 ==================== class ToolCall(BaseModel): """工具调用""" tool: str arguments: Dict[str, Any] reason: str class ExecutionPlan(BaseModel): """执行计划""" goal: str steps: List[ToolCall] reasoning: str class StepResult(BaseModel): """单步执行结果""" step_index: int tool: str arguments: Dict[str, Any] status: Literal["success", "failed", "skipped"] result: Optional[Any] = None error: Optional[str] = None execution_time: float = 0 class AgentResponse(BaseModel): """Agent响应""" success: bool message: str plan: Optional[ExecutionPlan] = None step_results: List[StepResult] = [] final_summary: Optional[str] = None metadata: Optional[Dict[str, Any]] = None class ChatRequest(BaseModel): """聊天请求""" message: str conversation_history: List[Dict[str, str]] = [] # ==================== Agent 系统 ==================== class MCPAgentIntegrated: """集成版 MCP Agent - 使用 Kimi 和 DeepMoney""" def __init__(self): # 初始化 Kimi 客户端(计划制定) self.kimi_client = OpenAI( api_key=KIMI_CONFIG["api_key"], base_url=KIMI_CONFIG["base_url"], ) self.kimi_model = KIMI_CONFIG["model"] # 初始化 DeepMoney 客户端(新闻总结) self.deepmoney_client = OpenAI( api_key=DEEPMONEY_CONFIG["api_key"], base_url=DEEPMONEY_CONFIG["base_url"], ) self.deepmoney_model = DEEPMONEY_CONFIG["model"] def get_planning_prompt(self, tools: List[dict]) -> str: """获取计划制定的系统提示词""" tools_desc = "\n\n".join([ f"**{tool['name']}**\n" f"描述:{tool['description']}\n" f"参数:{json.dumps(tool['parameters'], ensure_ascii=False, indent=2)}" for tool in tools ]) return f"""你是一个专业的金融研究助手。根据用户问题,制定详细的执行计划。 ## 可用工具 {tools_desc} ## 特殊工具 - **summarize_news**: 使用 DeepMoney 模型总结新闻数据 - 参数: {{"data": "新闻列表JSON", "focus": "关注点"}} - 适用场景: 当需要总结新闻、研报等文本数据时 ## 重要知识 - 贵州茅台: 600519 - 涨停: 涨幅约10% - 概念板块: 相同题材股票分类 ## 任务 分析用户问题,制定执行计划。返回 JSON: ```json {{ "goal": "用户目标", "reasoning": "分析思路", "steps": [ {{ "tool": "工具名", "arguments": {{"参数": "值"}}, "reason": "原因" }} ] }} ``` ## 规划原则 1. 先收集数据,再分析总结 2. 使用 summarize_news 总结新闻类数据 3. 不超过5个步骤 4. 最后一步通常是总结 ## 示例 用户:"贵州茅台最近有什么新闻" 计划: ```json {{ "goal": "查询并总结贵州茅台最新新闻", "reasoning": "先搜索新闻,再用 DeepMoney 总结", "steps": [ {{ "tool": "search_china_news", "arguments": {{"query": "贵州茅台", "top_k": 10}}, "reason": "搜索贵州茅台相关新闻" }}, {{ "tool": "summarize_news", "arguments": {{ "data": "前面的新闻数据", "focus": "贵州茅台的重要动态和市场影响" }}, "reason": "使用DeepMoney总结新闻要点" }} ] }} ``` 只返回JSON,不要其他内容。""" async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan: """阶段1: 使用 Kimi 创建执行计划(带思考过程)""" logger.info(f"[Planning] Kimi开始制定计划: {user_query}") messages = [ {"role": "system", "content": self.get_planning_prompt(tools)}, {"role": "user", "content": user_query}, ] # 使用 Kimi 思考模型 response = self.kimi_client.chat.completions.create( model=self.kimi_model, messages=messages, temperature=1.0, # Kimi 推荐 max_tokens=16000, # 足够容纳 reasoning_content ) choice = response.choices[0] message = choice.message # 提取思考过程 reasoning_content = "" if hasattr(message, "reasoning_content"): reasoning_content = getattr(message, "reasoning_content") logger.info(f"[Planning] Kimi思考过程: {reasoning_content[:200]}...") # 提取计划内容 plan_json = message.content.strip() # 清理可能的代码块标记 if "```json" in plan_json: plan_json = plan_json.split("```json")[1].split("```")[0].strip() elif "```" in plan_json: plan_json = plan_json.split("```")[1].split("```")[0].strip() plan_data = json.loads(plan_json) plan = ExecutionPlan( goal=plan_data["goal"], reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""), steps=[ToolCall(**step) for step in plan_data["steps"]], ) logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步") return plan async def execute_tool( self, tool_name: str, arguments: Dict[str, Any], tool_handlers: Dict[str, Any], ) -> Dict[str, Any]: """执行单个工具""" # 特殊工具:summarize_news(使用 DeepMoney) if tool_name == "summarize_news": return await self.summarize_news_with_deepmoney( data=arguments.get("data", ""), focus=arguments.get("focus", "关键信息"), ) # 调用 MCP 工具 handler = tool_handlers.get(tool_name) if not handler: raise ValueError(f"Tool '{tool_name}' not found") result = await handler(arguments) return result async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str: """使用 DeepMoney 模型总结新闻""" logger.info(f"[DeepMoney] 总结新闻,关注点: {focus}") messages = [ { "role": "system", "content": "你是一个专业的金融新闻分析师,擅长提取关键信息并进行总结。" }, { "role": "user", "content": f"请总结以下新闻数据,关注点:{focus}\n\n数据:\n{data[:3000]}" }, ] try: response = self.deepmoney_client.chat.completions.create( model=self.deepmoney_model, messages=messages, temperature=0.7, max_tokens=1000, ) summary = response.choices[0].message.content logger.info(f"[DeepMoney] 总结完成") return summary except Exception as e: logger.error(f"[DeepMoney] 总结失败: {str(e)}") # 降级:返回简化摘要 return f"新闻总结失败,原始数据:{data[:500]}..." async def execute_plan( self, plan: ExecutionPlan, tool_handlers: Dict[str, Any], ) -> List[StepResult]: """阶段2: 执行计划""" logger.info(f"[Execution] 开始执行: {len(plan.steps)} 步") results = [] collected_data = {} for i, step in enumerate(plan.steps): logger.info(f"[Execution] 步骤 {i+1}/{len(plan.steps)}: {step.tool}") start_time = datetime.now() try: # 替换占位符 arguments = step.arguments.copy() # 如果参数值是 "前面的新闻数据" 或 "前面收集的所有数据" if step.tool == "summarize_news": if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]: # 将收集的数据传递 arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) # 执行工具 result = await self.execute_tool(step.tool, arguments, tool_handlers) execution_time = (datetime.now() - start_time).total_seconds() step_result = StepResult( step_index=i, tool=step.tool, arguments=arguments, status="success", result=result, execution_time=execution_time, ) results.append(step_result) # 收集数据 collected_data[f"step_{i+1}_{step.tool}"] = result logger.info(f"[Execution] 步骤 {i+1} 完成: {execution_time:.2f}s") except Exception as e: logger.error(f"[Execution] 步骤 {i+1} 失败: {str(e)}") execution_time = (datetime.now() - start_time).total_seconds() step_result = StepResult( step_index=i, tool=step.tool, arguments=step.arguments, status="failed", error=str(e), execution_time=execution_time, ) results.append(step_result) # 继续执行其他步骤 continue logger.info(f"[Execution] 执行完成") return results async def generate_final_summary( self, user_query: str, plan: ExecutionPlan, step_results: List[StepResult], ) -> str: """阶段3: 使用 Kimi 生成最终总结""" logger.info("[Summary] Kimi生成最终总结") # 收集成功的结果 successful_results = [r for r in step_results if r.status == "success"] if not successful_results: return "很抱歉,所有步骤都执行失败,无法生成分析报告。" # 构建结果文本(精简版) results_text = "\n\n".join([ f"**步骤 {r.step_index + 1}: {r.tool}**\n" f"结果: {str(r.result)[:800]}..." for r in successful_results[:3] # 只取前3个,避免超长 ]) messages = [ { "role": "system", "content": "你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。" }, { "role": "user", "content": f"""用户问题:{user_query} 执行计划:{plan.goal} 执行结果: {results_text} 请生成专业的分析报告(300字以内)。""" }, ] try: response = self.kimi_client.chat.completions.create( model="kimi-k2-turbpreview", # 使用非思考模型,更快 messages=messages, temperature=0.7, max_tokens=1000, ) summary = response.choices[0].message.content logger.info("[Summary] 总结完成") return summary except Exception as e: logger.error(f"[Summary] 总结失败: {str(e)}") # 降级:返回最后一步的结果 if successful_results: last_result = successful_results[-1] if isinstance(last_result.result, str): return last_result.result else: return json.dumps(last_result.result, ensure_ascii=False, indent=2) return "总结生成失败" async def process_query( self, user_query: str, tools: List[dict], tool_handlers: Dict[str, Any], ) -> AgentResponse: """主流程""" logger.info(f"[Agent] 处理查询: {user_query}") try: # 阶段1: Kimi 制定计划 plan = await self.create_plan(user_query, tools) # 阶段2: 执行工具 step_results = await self.execute_plan(plan, tool_handlers) # 阶段3: Kimi 生成总结 final_summary = await self.generate_final_summary( user_query, plan, step_results ) return AgentResponse( success=True, message=final_summary, plan=plan, step_results=step_results, final_summary=final_summary, metadata={ "total_steps": len(plan.steps), "successful_steps": len([r for r in step_results if r.status == "success"]), "failed_steps": len([r for r in step_results if r.status == "failed"]), "total_execution_time": sum(r.execution_time for r in step_results), "model_used": { "planning": self.kimi_model, "summarization": "kimi-k2-turbpreview", "news_summary": self.deepmoney_model, }, }, ) except Exception as e: logger.error(f"[Agent] 错误: {str(e)}", exc_info=True) return AgentResponse( success=False, message=f"处理失败: {str(e)}", ) # ==================== 添加到 mcp_server.py ==================== """ 在 mcp_server.py 中添加以下代码: # 导入 Agent 系统 from mcp_server_agent_integration import MCPAgentIntegrated, ChatRequest, AgentResponse # 创建 Agent 实例(全局) agent = MCPAgentIntegrated() # 添加端点 @app.post("/agent/chat", response_model=AgentResponse) async def agent_chat(request: ChatRequest): \"\"\"智能代理对话端点\"\"\" logger.info(f"Agent chat: {request.message}") # 获取工具列表 tools = [tool.dict() for tool in TOOLS] # 添加特殊工具:summarize_news tools.append({ "name": "summarize_news", "description": "使用 DeepMoney 模型总结新闻数据,提取关键信息", "parameters": { "type": "object", "properties": { "data": { "type": "string", "description": "要总结的新闻数据(JSON格式)" }, "focus": { "type": "string", "description": "关注点,例如:'市场影响'、'投资机会'等" } }, "required": ["data"] } }) # 处理查询 response = await agent.process_query( user_query=request.message, tools=tools, tool_handlers=TOOL_HANDLERS, ) return response """