From 95eb86c06aaef22550b0cf0c04c79b11ab5d3d7f Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Fri, 7 Nov 2025 23:51:18 +0800 Subject: [PATCH] =?UTF-8?q?agent=E5=8A=9F=E8=83=BD=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0MCP=E5=90=8E=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mcp_server.py | 173 ++++++++++++++++++++++++++++++++--- src/views/AgentChat/index.js | 167 +++++++++++++++++++++++---------- 2 files changed, 279 insertions(+), 61 deletions(-) diff --git a/mcp_server.py b/mcp_server.py index a71f2bee..fc18565d 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -1439,12 +1439,68 @@ class MCPAgentIntegrated: # 发送开始事件 yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"}) - # 阶段1: Kimi 制定计划 + # 阶段1: Kimi 制定计划(流式) yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."}) - plan = await self.create_plan(user_query, tools) + messages = [ + {"role": "system", "content": self.get_planning_prompt(tools)}, + {"role": "user", "content": user_query}, + ] - # 发送计划 + # 使用流式 API 调用 Kimi + stream = self.kimi_client.chat.completions.create( + model=self.kimi_model, + messages=messages, + temperature=1.0, + max_tokens=16000, + stream=True, # 启用流式输出 + ) + + reasoning_content = "" + plan_content = "" + + # 逐块接收 Kimi 的响应 + for chunk in stream: + if chunk.choices[0].delta.content: + content_chunk = chunk.choices[0].delta.content + plan_content += content_chunk + + # 发送思考过程片段 + yield self._format_sse("thinking", { + "content": content_chunk, + "stage": "planning" + }) + + # 提取 reasoning_content(如果有) + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'reasoning_content'): + reasoning_chunk = chunk.choices[0].delta.reasoning_content + if reasoning_chunk: + reasoning_content += reasoning_chunk + # 发送推理过程片段 + yield self._format_sse("reasoning", { + "content": reasoning_chunk + }) + + # 解析完整的计划 + plan_json = plan_content.strip() + + # 清理可能的代码块标记 + if "```json" in plan_json: + plan_json = plan_json.split("```json")[1].split("```")[0].strip() + elif "```" in plan_json: + plan_json = plan_json.split("```")[1].split("```")[0].strip() + + plan_data = json.loads(plan_json) + + plan = ExecutionPlan( + goal=plan_data["goal"], + reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""), + steps=[ToolCall(**step) for step in plan_data["steps"]], + ) + + logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步") + + # 发送完整计划 yield self._format_sse("plan", { "goal": plan.goal, "reasoning": plan.reasoning, @@ -1524,21 +1580,108 @@ class MCPAgentIntegrated: "execution_time": execution_time, }) - # 阶段3: Kimi 生成总结 + # 阶段3: Kimi 生成总结(流式) yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."}) - final_summary = await self.generate_final_summary(user_query, plan, step_results) + # 收集成功的结果 + successful_results = [r for r in step_results if r.status == "success"] - # 发送最终总结 - yield self._format_sse("summary", { - "content": final_summary, - "metadata": { - "total_steps": len(plan.steps), - "successful_steps": len([r for r in step_results if r.status == "success"]), - "failed_steps": len([r for r in step_results if r.status == "failed"]), - "total_execution_time": sum(r.execution_time for r in step_results), - }, - }) + if not successful_results: + yield self._format_sse("summary", { + "content": "很抱歉,所有步骤都执行失败,无法生成分析报告。", + "metadata": { + "total_steps": len(plan.steps), + "successful_steps": 0, + "failed_steps": len(step_results), + "total_execution_time": sum(r.execution_time for r in step_results), + }, + }) + else: + # 构建结果文本(精简版) + results_text = "\n\n".join([ + f"**步骤 {r.step_index + 1}: {r.tool}**\n" + f"结果: {str(r.result)[:800]}..." + for r in successful_results[:3] # 只取前3个,避免超长 + ]) + + messages = [ + { + "role": "system", + "content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。 + +## 数据可视化能力 +如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。 + +支持的图表类型: +- 折线图(line):适合时间序列数据(如股价走势、财务指标趋势) +- 柱状图(bar):适合对比数据(如不同年份的收入、利润对比) +- 饼图(pie):适合占比数据(如业务结构、资产分布) + +### 图表格式(使用 Markdown 代码块) +在报告中插入图表时,使用以下格式: + +```echarts +{ + "title": {"text": "图表标题"}, + "tooltip": {}, + "xAxis": {"type": "category", "data": ["类别1", "类别2"]}, + "yAxis": {"type": "value"}, + "series": [{"name": "数据系列", "type": "line", "data": [100, 200]}] +} +``` + +**重要提示**: +- ECharts 配置必须是合法的 JSON 格式 +- 只在有明确数值数据时才生成图表 +- 不要凭空捏造数据""" + }, + { + "role": "user", + "content": f"""用户问题:{user_query} + +执行计划:{plan.goal} + +执行结果: +{results_text} + +请生成专业的分析报告(500字以内)。如果结果中包含数值型数据,请使用 ECharts 图表进行可视化展示。""" + }, + ] + + # 使用流式 API 生成总结 + summary_stream = self.kimi_client.chat.completions.create( + model="kimi-k2-turbo-preview", + messages=messages, + temperature=0.7, + max_tokens=2000, + stream=True, # 启用流式输出 + ) + + final_summary = "" + + # 逐块发送总结内容 + for chunk in summary_stream: + if chunk.choices[0].delta.content: + content_chunk = chunk.choices[0].delta.content + final_summary += content_chunk + + # 发送总结片段 + yield self._format_sse("summary_chunk", { + "content": content_chunk + }) + + logger.info("[Summary] 流式总结完成") + + # 发送完整的总结和元数据 + yield self._format_sse("summary", { + "content": final_summary, + "metadata": { + "total_steps": len(plan.steps), + "successful_steps": len(successful_results), + "failed_steps": len([r for r in step_results if r.status == "failed"]), + "total_execution_time": sum(r.execution_time for r in step_results), + }, + }) # 发送完成事件 yield self._format_sse("done", {"message": "处理完成"}) diff --git a/src/views/AgentChat/index.js b/src/views/AgentChat/index.js index f262d420..52e76a08 100644 --- a/src/views/AgentChat/index.js +++ b/src/views/AgentChat/index.js @@ -307,6 +307,12 @@ const AgentChatV3 = () => { const decoder = new TextDecoder(); let buffer = ''; + // 流式状态变量 + let thinkingMessageId = null; + let thinkingContent = ''; + let summaryMessageId = null; + let summaryContent = ''; + // 读取流式数据 while (true) { const { done, value } = await reader.read(); @@ -316,11 +322,14 @@ const AgentChatV3 = () => { const lines = buffer.split('\n'); buffer = lines.pop(); // 保留不完整的行 + let currentEvent = null; + for (const line of lines) { if (!line.trim() || line.startsWith(':')) continue; if (line.startsWith('event:')) { - // 忽略事件类型行 + // 提取事件类型 + currentEvent = line.substring(6).trim(); continue; } @@ -328,19 +337,37 @@ const AgentChatV3 = () => { try { const data = JSON.parse(line.substring(5).trim()); - // 处理不同类型的事件 - if (data.stage === 'planning') { - // 正在制定计划 - setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING)); - addMessage({ - type: MessageTypes.AGENT_THINKING, - content: '正在制定执行计划...', - timestamp: new Date().toISOString(), - }); - setCurrentProgress(20); - } else if (data.goal) { + // 根据事件类型处理数据 + if (currentEvent === 'thinking') { + // Kimi 流式思考过程 + if (!thinkingMessageId) { + thinkingMessageId = Date.now(); + thinkingContent = ''; + setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING)); + addMessage({ + id: thinkingMessageId, + type: MessageTypes.AGENT_THINKING, + content: '', + timestamp: new Date().toISOString(), + }); + } + thinkingContent += data.content; + // 实时更新思考内容 + setMessages((prev) => + prev.map((m) => + m.id === thinkingMessageId + ? { ...m, content: thinkingContent } + : m + ) + ); + } else if (currentEvent === 'reasoning') { + // Kimi 推理过程(可选显示) + logger.info('Kimi reasoning:', data.content); + } else if (currentEvent === 'plan') { // 收到执行计划 currentPlan = data; + thinkingMessageId = null; + thinkingContent = ''; setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING)); addMessage({ type: MessageTypes.AGENT_PLAN, @@ -349,20 +376,7 @@ const AgentChatV3 = () => { timestamp: new Date().toISOString(), }); setCurrentProgress(30); - } else if (data.stage === 'executing') { - // 开始执行步骤 - const msgId = Date.now(); - executingMessageId = msgId; - addMessage({ - id: msgId, - type: MessageTypes.AGENT_EXECUTING, - content: `正在执行 ${data.message}`, - plan: currentPlan, - stepResults: [], - timestamp: new Date().toISOString(), - }); - setCurrentProgress(40); - } else if (data.step_index !== undefined && data.tool) { + } else if (currentEvent === 'step_complete') { // 收到步骤完成事件 const stepResult = { step_index: data.step_index, @@ -386,29 +400,90 @@ const AgentChatV3 = () => { // 更新进度 const progress = 40 + (stepResults.length / (currentPlan?.steps?.length || 5)) * 40; setCurrentProgress(Math.min(progress, 80)); - } else if (data.stage === 'summarizing') { - // 正在生成总结 - setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_EXECUTING)); - addMessage({ - type: MessageTypes.AGENT_THINKING, - content: '正在生成分析报告...', - timestamp: new Date().toISOString(), - }); - setCurrentProgress(85); - } else if (data.content) { - // 收到最终总结 + } else if (currentEvent === 'summary_chunk') { + // 流式总结内容 + if (!summaryMessageId) { + summaryMessageId = Date.now(); + summaryContent = ''; + setMessages((prev) => + prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING && m.type !== MessageTypes.AGENT_EXECUTING) + ); + addMessage({ + id: summaryMessageId, + type: MessageTypes.AGENT_RESPONSE, + content: '', + plan: currentPlan, + stepResults: stepResults, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(85); + } + summaryContent += data.content; + // 实时更新总结内容 setMessages((prev) => - prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING && m.type !== MessageTypes.AGENT_EXECUTING) + prev.map((m) => + m.id === summaryMessageId + ? { ...m, content: summaryContent } + : m + ) ); - addMessage({ - type: MessageTypes.AGENT_RESPONSE, - content: data.content, - plan: currentPlan, - stepResults: stepResults, - metadata: data.metadata, - timestamp: new Date().toISOString(), - }); + } else if (currentEvent === 'summary') { + // 收到完整总结(包含元数据) + if (summaryMessageId) { + // 更新已有消息的元数据 + setMessages((prev) => + prev.map((m) => + m.id === summaryMessageId + ? { ...m, metadata: data.metadata } + : m + ) + ); + } else { + // 如果没有流式片段,直接显示完整总结 + setMessages((prev) => + prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING && m.type !== MessageTypes.AGENT_EXECUTING) + ); + addMessage({ + type: MessageTypes.AGENT_RESPONSE, + content: data.content, + plan: currentPlan, + stepResults: stepResults, + metadata: data.metadata, + timestamp: new Date().toISOString(), + }); + } setCurrentProgress(100); + } else if (currentEvent === 'status') { + // 状态更新 + if (data.stage === 'planning') { + setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_THINKING)); + addMessage({ + type: MessageTypes.AGENT_THINKING, + content: data.message, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(10); + } else if (data.stage === 'executing') { + const msgId = Date.now(); + executingMessageId = msgId; + addMessage({ + id: msgId, + type: MessageTypes.AGENT_EXECUTING, + content: data.message, + plan: currentPlan, + stepResults: [], + timestamp: new Date().toISOString(), + }); + setCurrentProgress(40); + } else if (data.stage === 'summarizing') { + setMessages((prev) => prev.filter((m) => m.type !== MessageTypes.AGENT_EXECUTING)); + addMessage({ + type: MessageTypes.AGENT_THINKING, + content: data.message, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(80); + } } } catch (e) { logger.error('解析 SSE 数据失败', e);