diff --git a/__pycache__/mcp_server.cpython-310.pyc b/__pycache__/mcp_server.cpython-310.pyc index 0b667eb0..ef714330 100644 Binary files a/__pycache__/mcp_server.cpython-310.pyc and b/__pycache__/mcp_server.cpython-310.pyc differ diff --git a/mcp_server.py b/mcp_server.py index 534816b5..61ea6a53 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -6,9 +6,9 @@ MCP Server for Financial Data Search from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field -from typing import List, Dict, Any, Optional, Literal +from typing import List, Dict, Any, Optional, Literal, AsyncGenerator from datetime import datetime, date import logging import httpx @@ -16,6 +16,7 @@ from enum import Enum import mcp_database as db from openai import OpenAI import json +import asyncio # 配置日志 logging.basicConfig(level=logging.INFO) @@ -1327,7 +1328,7 @@ class MCPAgentIntegrated: tools: List[dict], tool_handlers: Dict[str, Any], ) -> AgentResponse: - """主流程""" + """主流程(非流式)""" logger.info(f"[Agent] 处理查询: {user_query}") try: @@ -1368,6 +1369,131 @@ class MCPAgentIntegrated: message=f"处理失败: {str(e)}", ) + async def process_query_stream( + self, + user_query: str, + tools: List[dict], + tool_handlers: Dict[str, Any], + ) -> AsyncGenerator[str, None]: + """主流程(流式输出)- 逐步返回执行结果""" + logger.info(f"[Agent Stream] 处理查询: {user_query}") + + try: + # 发送开始事件 + yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"}) + + # 阶段1: Kimi 制定计划 + yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."}) + + plan = await self.create_plan(user_query, tools) + + # 发送计划 + yield self._format_sse("plan", { + "goal": plan.goal, + "reasoning": plan.reasoning, + "steps": [ + {"tool": step.tool, "arguments": step.arguments, "reason": step.reason} + for step in plan.steps + ], + }) + + # 阶段2: 执行工具(逐步返回) + yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(plan.steps)} 个步骤"}) + + step_results = [] + collected_data = {} + + for i, step in enumerate(plan.steps): + # 发送步骤开始事件 + yield self._format_sse("step_start", { + "step_index": i, + "tool": step.tool, + "arguments": step.arguments, + "reason": step.reason, + }) + + start_time = datetime.now() + + try: + # 替换占位符 + arguments = step.arguments.copy() + if step.tool == "summarize_news": + if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]: + arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) + + # 执行工具 + result = await self.execute_tool(step.tool, arguments, tool_handlers) + execution_time = (datetime.now() - start_time).total_seconds() + + step_result = StepResult( + step_index=i, + tool=step.tool, + arguments=arguments, + status="success", + result=result, + execution_time=execution_time, + ) + step_results.append(step_result) + collected_data[f"step_{i+1}_{step.tool}"] = result + + # 发送步骤完成事件(包含结果) + yield self._format_sse("step_complete", { + "step_index": i, + "tool": step.tool, + "status": "success", + "result": result, + "execution_time": execution_time, + }) + + except Exception as e: + execution_time = (datetime.now() - start_time).total_seconds() + + step_result = StepResult( + step_index=i, + tool=step.tool, + arguments=step.arguments, + status="failed", + error=str(e), + execution_time=execution_time, + ) + step_results.append(step_result) + + # 发送步骤失败事件 + yield self._format_sse("step_complete", { + "step_index": i, + "tool": step.tool, + "status": "failed", + "error": str(e), + "execution_time": execution_time, + }) + + # 阶段3: Kimi 生成总结 + yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."}) + + final_summary = await self.generate_final_summary(user_query, plan, step_results) + + # 发送最终总结 + yield self._format_sse("summary", { + "content": final_summary, + "metadata": { + "total_steps": len(plan.steps), + "successful_steps": len([r for r in step_results if r.status == "success"]), + "failed_steps": len([r for r in step_results if r.status == "failed"]), + "total_execution_time": sum(r.execution_time for r in step_results), + }, + }) + + # 发送完成事件 + yield self._format_sse("done", {"message": "处理完成"}) + + except Exception as e: + logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True) + yield self._format_sse("error", {"message": f"处理失败: {str(e)}"}) + + def _format_sse(self, event: str, data: dict) -> str: + """格式化 SSE 消息""" + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + # 创建 Agent 实例(全局) agent = MCPAgentIntegrated() @@ -1406,7 +1532,7 @@ async def chat(request: ChatRequest): @app.post("/agent/chat", response_model=AgentResponse) async def agent_chat(request: AgentChatRequest): - """智能代理对话端点""" + """智能代理对话端点(非流式)""" logger.info(f"Agent chat: {request.message}") # 获取工具列表 @@ -1441,6 +1567,49 @@ async def agent_chat(request: AgentChatRequest): return response +@app.post("/agent/chat/stream") +async def agent_chat_stream(request: AgentChatRequest): + """智能代理对话端点(流式 SSE)""" + logger.info(f"Agent chat stream: {request.message}") + + # 获取工具列表 + tools = [tool.dict() for tool in TOOLS] + + # 添加特殊工具:summarize_news + tools.append({ + "name": "summarize_news", + "description": "使用 DeepMoney 模型总结新闻数据,提取关键信息", + "parameters": { + "type": "object", + "properties": { + "data": { + "type": "string", + "description": "要总结的新闻数据(JSON格式)" + }, + "focus": { + "type": "string", + "description": "关注点,例如:'市场影响'、'投资机会'等" + } + }, + "required": ["data"] + } + }) + + # 返回流式响应 + return StreamingResponse( + agent.process_query_stream( + user_query=request.message, + tools=tools, + tool_handlers=TOOL_HANDLERS, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # 禁用 Nginx 缓冲 + }, + ) + # ==================== 健康检查 ==================== @app.get("/health") diff --git a/src/components/ChatBot/ChatInterfaceV2.js b/src/components/ChatBot/ChatInterfaceV2.js index d617d729..b9785e34 100644 --- a/src/components/ChatBot/ChatInterfaceV2.js +++ b/src/components/ChatBot/ChatInterfaceV2.js @@ -95,7 +95,7 @@ export const ChatInterfaceV2 = () => { }); }; - // 发送消息(Agent模式) + // 发送消息(Agent模式 - 流式) const handleSendMessage = async () => { if (!inputValue.trim() || isProcessing) return; @@ -106,10 +106,16 @@ export const ChatInterfaceV2 = () => { }; addMessage(userMessage); + const userInput = inputValue; // 保存输入值 setInputValue(''); setIsProcessing(true); setCurrentProgress(0); + // 用于存储步骤结果 + let currentPlan = null; + let stepResults = []; + let executingMessageId = null; + try { // 1. 显示思考状态 addMessage({ @@ -120,18 +126,40 @@ export const ChatInterfaceV2 = () => { setCurrentProgress(10); - // 调用 Agent API - const response = await fetch(`${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat`, { + // 使用 EventSource 接收流式数据 + const eventSource = new EventSource( + `${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat/stream`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + message: userInput, + conversation_history: messages + .filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE) + .map(m => ({ + isUser: m.type === MessageTypes.USER, + content: m.content, + })), + }), + } + ); + + // 由于 EventSource 不支持 POST,我们使用 fetch + ReadableStream + const response = await fetch(`${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat/stream`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ - message: inputValue, - conversation_history: messages.filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE).map(m => ({ - isUser: m.type === MessageTypes.USER, - content: m.content, - })), + message: userInput, + conversation_history: messages + .filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE) + .map(m => ({ + isUser: m.type === MessageTypes.USER, + content: m.content, + })), }), }); @@ -139,62 +167,139 @@ export const ChatInterfaceV2 = () => { throw new Error('Agent请求失败'); } - const agentResponse = await response.json(); - logger.info('Agent response', agentResponse); + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; - // 移除思考消息 - setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING)); + // 读取流式数据 + while (true) { + const { done, value } = await reader.read(); + if (done) break; - if (!agentResponse.success) { - throw new Error(agentResponse.message || '处理失败'); - } + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n\n'); + buffer = lines.pop(); // 保留不完整的行 - setCurrentProgress(30); + for (const line of lines) { + if (!line.trim()) continue; - // 2. 显示执行计划 - if (agentResponse.plan) { - addMessage({ - type: MessageTypes.AGENT_PLAN, - content: '已制定执行计划', - plan: agentResponse.plan, - timestamp: new Date().toISOString(), - }); - } + // 解析 SSE 消息 + const eventMatch = line.match(/^event: (.+)$/m); + const dataMatch = line.match(/^data: (.+)$/m); - setCurrentProgress(40); + if (!eventMatch || !dataMatch) continue; - // 3. 显示执行过程 - if (agentResponse.step_results && agentResponse.step_results.length > 0) { - addMessage({ - type: MessageTypes.AGENT_EXECUTING, - content: '正在执行步骤...', - plan: agentResponse.plan, - stepResults: agentResponse.step_results, - timestamp: new Date().toISOString(), - }); + const event = eventMatch[1]; + const data = JSON.parse(dataMatch[1]); - // 模拟进度更新 - for (let i = 0; i < agentResponse.step_results.length; i++) { - setCurrentProgress(40 + (i + 1) / agentResponse.step_results.length * 50); - await new Promise(resolve => setTimeout(resolve, 100)); + logger.info(`SSE Event: ${event}`, data); + + // 处理不同类型的事件 + switch (event) { + case 'status': + if (data.stage === 'planning') { + // 移除思考消息,显示规划中 + setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING)); + addMessage({ + type: MessageTypes.AGENT_THINKING, + content: data.message, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(20); + } else if (data.stage === 'executing') { + setCurrentProgress(30); + } else if (data.stage === 'summarizing') { + setCurrentProgress(90); + } + break; + + case 'plan': + // 移除思考消息 + setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING)); + + // 显示执行计划 + currentPlan = data; + addMessage({ + type: MessageTypes.AGENT_PLAN, + content: '已制定执行计划', + plan: data, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(30); + break; + + case 'step_start': + // 如果还没有执行中消息,创建一个 + if (!executingMessageId) { + const executingMsg = { + type: MessageTypes.AGENT_EXECUTING, + content: '正在执行步骤...', + plan: currentPlan, + stepResults: [], + timestamp: new Date().toISOString(), + }; + addMessage(executingMsg); + executingMessageId = Date.now(); + } + break; + + case 'step_complete': + // 添加步骤结果 + stepResults.push({ + step_index: data.step_index, + tool: data.tool, + status: data.status, + result: data.result, + error: data.error, + execution_time: data.execution_time, + arguments: data.arguments, + }); + + // 更新执行中消息 + setMessages(prev => + prev.map(msg => + msg.type === MessageTypes.AGENT_EXECUTING + ? { ...msg, stepResults: [...stepResults] } + : msg + ) + ); + + // 更新进度 + if (currentPlan) { + const progress = 30 + ((data.step_index + 1) / currentPlan.steps.length) * 60; + setCurrentProgress(progress); + } + break; + + case 'summary': + // 移除执行中消息 + setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_EXECUTING)); + + // 显示最终结果 + addMessage({ + type: MessageTypes.AGENT_RESPONSE, + content: data.content, + plan: currentPlan, + stepResults: stepResults, + metadata: data.metadata, + timestamp: new Date().toISOString(), + }); + setCurrentProgress(100); + break; + + case 'error': + throw new Error(data.message); + + case 'done': + logger.info('Stream完成'); + break; + + default: + logger.warn('未知事件类型:', event); + } } } - setCurrentProgress(100); - - // 移除执行中消息 - setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_EXECUTING)); - - // 4. 显示最终结果 - addMessage({ - type: MessageTypes.AGENT_RESPONSE, - content: agentResponse.message || agentResponse.final_summary, - plan: agentResponse.plan, - stepResults: agentResponse.step_results, - metadata: agentResponse.metadata, - timestamp: new Date().toISOString(), - }); - } catch (error) { logger.error('Agent chat error', error);