diff --git a/mcp_server.py b/mcp_server.py index fc18565d..636112aa 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -1431,6 +1431,10 @@ class MCPAgentIntegrated: user_query: str, tools: List[dict], tool_handlers: Dict[str, Any], + session_id: str = None, + user_id: str = None, + user_nickname: str = None, + user_avatar: str = None, ) -> AsyncGenerator[str, None]: """主流程(流式输出)- 逐步返回执行结果""" logger.info(f"[Agent Stream] 处理查询: {user_query}") @@ -1683,6 +1687,36 @@ class MCPAgentIntegrated: }, }) + # 保存 Agent 回复到 ES(如果提供了 session_id) + if session_id and user_id: + try: + # 将执行步骤转换为 JSON 字符串 + steps_json = json.dumps( + [{"tool": step.tool, "status": step.status, "result": step.result} for step in step_results], + ensure_ascii=False + ) + + # 将 plan 转换为 JSON 字符串(ES 中 plan 字段是 text 类型) + plan_json = json.dumps({ + "goal": plan.goal, + "reasoning": plan.reasoning, + "steps": [{"tool": step.tool, "description": step.description, "parameters": step.parameters} for step in plan.steps] + }, ensure_ascii=False) + + es_client.save_chat_message( + session_id=session_id, + user_id=user_id, + user_nickname=user_nickname or "匿名用户", + user_avatar=user_avatar or "", + message_type="assistant", + message=final_summary, + plan=plan_json, + steps=steps_json, + ) + logger.info(f"[ES] Agent 回复已保存到会话 {session_id}") + except Exception as e: + logger.error(f"[ES] 保存 Agent 回复失败: {e}", exc_info=True) + # 发送完成事件 yield self._format_sse("done", {"message": "处理完成"}) @@ -1880,6 +1914,23 @@ async def agent_chat_stream(request: AgentChatRequest): f"subscription_type: {user_subscription}" ) + # 如果没有提供 session_id,创建新会话 + session_id = request.session_id or str(uuid.uuid4()) + + # 保存用户消息到 ES + try: + es_client.save_chat_message( + session_id=session_id, + user_id=request.user_id or "anonymous", + user_nickname=request.user_nickname or "匿名用户", + user_avatar=request.user_avatar or "", + message_type="user", + message=request.message, + ) + logger.info(f"[ES] 用户消息已保存到会话 {session_id}") + except Exception as e: + logger.error(f"[ES] 保存用户消息失败: {e}") + # 获取工具列表 tools = [tool.dict() for tool in TOOLS] @@ -1909,6 +1960,10 @@ async def agent_chat_stream(request: AgentChatRequest): user_query=request.message, tools=tools, tool_handlers=TOOL_HANDLERS, + session_id=session_id, + user_id=request.user_id, + user_nickname=request.user_nickname, + user_avatar=request.user_avatar, ), media_type="text/event-stream", headers={ diff --git a/src/views/AgentChat/index.js b/src/views/AgentChat/index.js index 52e76a08..cc7b05d5 100644 --- a/src/views/AgentChat/index.js +++ b/src/views/AgentChat/index.js @@ -414,6 +414,7 @@ const AgentChatV3 = () => { content: '', plan: currentPlan, stepResults: stepResults, + isStreaming: true, // 标记为流式输出中 timestamp: new Date().toISOString(), }); setCurrentProgress(85); @@ -430,11 +431,11 @@ const AgentChatV3 = () => { } else if (currentEvent === 'summary') { // 收到完整总结(包含元数据) if (summaryMessageId) { - // 更新已有消息的元数据 + // 更新已有消息的元数据,并标记流式输出完成 setMessages((prev) => prev.map((m) => m.id === summaryMessageId - ? { ...m, metadata: data.metadata } + ? { ...m, metadata: data.metadata, isStreaming: false } : m ) ); @@ -449,6 +450,7 @@ const AgentChatV3 = () => { plan: currentPlan, stepResults: stepResults, metadata: data.metadata, + isStreaming: false, // 非流式,直接标记完成 timestamp: new Date().toISOString(), }); } @@ -964,7 +966,14 @@ const MessageRenderer = ({ message, userAvatar }) => { borderColor={borderColor} boxShadow="md" > - + {/* 流式输出中显示纯文本,完成后才渲染 Markdown + 图表 */} + {message.isStreaming ? ( + + {message.content} + + ) : ( + + )} {/* 元数据 */} {message.metadata && (