agent功能开发增加MCP后端

This commit is contained in:
2025-11-07 20:23:54 +08:00
parent f01eff6eb7
commit 322b1dd845
3 changed files with 332 additions and 58 deletions

View File

@@ -6,9 +6,9 @@ MCP Server for Financial Data Search
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Literal
from typing import List, Dict, Any, Optional, Literal, AsyncGenerator
from datetime import datetime, date
import logging
import httpx
@@ -16,6 +16,7 @@ from enum import Enum
import mcp_database as db
from openai import OpenAI
import json
import asyncio
# 配置日志
logging.basicConfig(level=logging.INFO)
@@ -1327,7 +1328,7 @@ class MCPAgentIntegrated:
tools: List[dict],
tool_handlers: Dict[str, Any],
) -> AgentResponse:
"""主流程"""
"""主流程(非流式)"""
logger.info(f"[Agent] 处理查询: {user_query}")
try:
@@ -1368,6 +1369,131 @@ class MCPAgentIntegrated:
message=f"处理失败: {str(e)}",
)
async def process_query_stream(
self,
user_query: str,
tools: List[dict],
tool_handlers: Dict[str, Any],
) -> AsyncGenerator[str, None]:
"""主流程(流式输出)- 逐步返回执行结果"""
logger.info(f"[Agent Stream] 处理查询: {user_query}")
try:
# 发送开始事件
yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"})
# 阶段1: Kimi 制定计划
yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."})
plan = await self.create_plan(user_query, tools)
# 发送计划
yield self._format_sse("plan", {
"goal": plan.goal,
"reasoning": plan.reasoning,
"steps": [
{"tool": step.tool, "arguments": step.arguments, "reason": step.reason}
for step in plan.steps
],
})
# 阶段2: 执行工具(逐步返回)
yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(plan.steps)} 个步骤"})
step_results = []
collected_data = {}
for i, step in enumerate(plan.steps):
# 发送步骤开始事件
yield self._format_sse("step_start", {
"step_index": i,
"tool": step.tool,
"arguments": step.arguments,
"reason": step.reason,
})
start_time = datetime.now()
try:
# 替换占位符
arguments = step.arguments.copy()
if step.tool == "summarize_news":
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# 执行工具
result = await self.execute_tool(step.tool, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
step_results.append(step_result)
collected_data[f"step_{i+1}_{step.tool}"] = result
# 发送步骤完成事件(包含结果)
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"status": "success",
"result": result,
"execution_time": execution_time,
})
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=step.arguments,
status="failed",
error=str(e),
execution_time=execution_time,
)
step_results.append(step_result)
# 发送步骤失败事件
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"status": "failed",
"error": str(e),
"execution_time": execution_time,
})
# 阶段3: Kimi 生成总结
yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."})
final_summary = await self.generate_final_summary(user_query, plan, step_results)
# 发送最终总结
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": len([r for r in step_results if r.status == "success"]),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
# 发送完成事件
yield self._format_sse("done", {"message": "处理完成"})
except Exception as e:
logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True)
yield self._format_sse("error", {"message": f"处理失败: {str(e)}"})
def _format_sse(self, event: str, data: dict) -> str:
"""格式化 SSE 消息"""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
# 创建 Agent 实例(全局)
agent = MCPAgentIntegrated()
@@ -1406,7 +1532,7 @@ async def chat(request: ChatRequest):
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(request: AgentChatRequest):
"""智能代理对话端点"""
"""智能代理对话端点(非流式)"""
logger.info(f"Agent chat: {request.message}")
# 获取工具列表
@@ -1441,6 +1567,49 @@ async def agent_chat(request: AgentChatRequest):
return response
@app.post("/agent/chat/stream")
async def agent_chat_stream(request: AgentChatRequest):
"""智能代理对话端点(流式 SSE"""
logger.info(f"Agent chat stream: {request.message}")
# 获取工具列表
tools = [tool.dict() for tool in TOOLS]
# 添加特殊工具summarize_news
tools.append({
"name": "summarize_news",
"description": "使用 DeepMoney 模型总结新闻数据,提取关键信息",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "要总结的新闻数据JSON格式"
},
"focus": {
"type": "string",
"description": "关注点,例如:'市场影响''投资机会'"
}
},
"required": ["data"]
}
})
# 返回流式响应
return StreamingResponse(
agent.process_query_stream(
user_query=request.message,
tools=tools,
tool_handlers=TOOL_HANDLERS,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
# ==================== 健康检查 ====================
@app.get("/health")

View File

@@ -95,7 +95,7 @@ export const ChatInterfaceV2 = () => {
});
};
// 发送消息Agent模式
// 发送消息Agent模式 - 流式
const handleSendMessage = async () => {
if (!inputValue.trim() || isProcessing) return;
@@ -106,10 +106,16 @@ export const ChatInterfaceV2 = () => {
};
addMessage(userMessage);
const userInput = inputValue; // 保存输入值
setInputValue('');
setIsProcessing(true);
setCurrentProgress(0);
// 用于存储步骤结果
let currentPlan = null;
let stepResults = [];
let executingMessageId = null;
try {
// 1. 显示思考状态
addMessage({
@@ -120,18 +126,40 @@ export const ChatInterfaceV2 = () => {
setCurrentProgress(10);
// Agent API
const response = await fetch(`${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat`, {
// 使EventSource 接收流式数据
const eventSource = new EventSource(
`${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat/stream`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: userInput,
conversation_history: messages
.filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE)
.map(m => ({
isUser: m.type === MessageTypes.USER,
content: m.content,
})),
}),
}
);
// 由于 EventSource 不支持 POST我们使用 fetch + ReadableStream
const response = await fetch(`${mcpService.baseURL.replace('/mcp', '')}/mcp/agent/chat/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: inputValue,
conversation_history: messages.filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE).map(m => ({
isUser: m.type === MessageTypes.USER,
content: m.content,
})),
message: userInput,
conversation_history: messages
.filter(m => m.type === MessageTypes.USER || m.type === MessageTypes.AGENT_RESPONSE)
.map(m => ({
isUser: m.type === MessageTypes.USER,
content: m.content,
})),
}),
});
@@ -139,62 +167,139 @@ export const ChatInterfaceV2 = () => {
throw new Error('Agent请求失败');
}
const agentResponse = await response.json();
logger.info('Agent response', agentResponse);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// 移除思考消息
setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING));
// 读取流式数据
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (!agentResponse.success) {
throw new Error(agentResponse.message || '处理失败');
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop(); // 保留不完整的行
setCurrentProgress(30);
for (const line of lines) {
if (!line.trim()) continue;
// 2. 显示执行计划
if (agentResponse.plan) {
addMessage({
type: MessageTypes.AGENT_PLAN,
content: '已制定执行计划',
plan: agentResponse.plan,
timestamp: new Date().toISOString(),
});
}
// 解析 SSE 消息
const eventMatch = line.match(/^event: (.+)$/m);
const dataMatch = line.match(/^data: (.+)$/m);
setCurrentProgress(40);
if (!eventMatch || !dataMatch) continue;
// 3. 显示执行过程
if (agentResponse.step_results && agentResponse.step_results.length > 0) {
addMessage({
type: MessageTypes.AGENT_EXECUTING,
content: '正在执行步骤...',
plan: agentResponse.plan,
stepResults: agentResponse.step_results,
timestamp: new Date().toISOString(),
});
const event = eventMatch[1];
const data = JSON.parse(dataMatch[1]);
// 模拟进度更新
for (let i = 0; i < agentResponse.step_results.length; i++) {
setCurrentProgress(40 + (i + 1) / agentResponse.step_results.length * 50);
await new Promise(resolve => setTimeout(resolve, 100));
logger.info(`SSE Event: ${event}`, data);
// 处理不同类型的事件
switch (event) {
case 'status':
if (data.stage === 'planning') {
// 移除思考消息,显示规划中
setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING));
addMessage({
type: MessageTypes.AGENT_THINKING,
content: data.message,
timestamp: new Date().toISOString(),
});
setCurrentProgress(20);
} else if (data.stage === 'executing') {
setCurrentProgress(30);
} else if (data.stage === 'summarizing') {
setCurrentProgress(90);
}
break;
case 'plan':
// 移除思考消息
setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_THINKING));
// 显示执行计划
currentPlan = data;
addMessage({
type: MessageTypes.AGENT_PLAN,
content: '已制定执行计划',
plan: data,
timestamp: new Date().toISOString(),
});
setCurrentProgress(30);
break;
case 'step_start':
// 如果还没有执行中消息,创建一个
if (!executingMessageId) {
const executingMsg = {
type: MessageTypes.AGENT_EXECUTING,
content: '正在执行步骤...',
plan: currentPlan,
stepResults: [],
timestamp: new Date().toISOString(),
};
addMessage(executingMsg);
executingMessageId = Date.now();
}
break;
case 'step_complete':
// 添加步骤结果
stepResults.push({
step_index: data.step_index,
tool: data.tool,
status: data.status,
result: data.result,
error: data.error,
execution_time: data.execution_time,
arguments: data.arguments,
});
// 更新执行中消息
setMessages(prev =>
prev.map(msg =>
msg.type === MessageTypes.AGENT_EXECUTING
? { ...msg, stepResults: [...stepResults] }
: msg
)
);
// 更新进度
if (currentPlan) {
const progress = 30 + ((data.step_index + 1) / currentPlan.steps.length) * 60;
setCurrentProgress(progress);
}
break;
case 'summary':
// 移除执行中消息
setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_EXECUTING));
// 显示最终结果
addMessage({
type: MessageTypes.AGENT_RESPONSE,
content: data.content,
plan: currentPlan,
stepResults: stepResults,
metadata: data.metadata,
timestamp: new Date().toISOString(),
});
setCurrentProgress(100);
break;
case 'error':
throw new Error(data.message);
case 'done':
logger.info('Stream完成');
break;
default:
logger.warn('未知事件类型:', event);
}
}
}
setCurrentProgress(100);
// 移除执行中消息
setMessages(prev => prev.filter(m => m.type !== MessageTypes.AGENT_EXECUTING));
// 4. 显示最终结果
addMessage({
type: MessageTypes.AGENT_RESPONSE,
content: agentResponse.message || agentResponse.final_summary,
plan: agentResponse.plan,
stepResults: agentResponse.step_results,
metadata: agentResponse.metadata,
timestamp: new Date().toISOString(),
});
} catch (error) {
logger.error('Agent chat error', error);