agent功能开发增加MCP后端

This commit is contained in:
2025-11-07 23:51:18 +08:00
parent 6899b9d0d2
commit 95eb86c06a
2 changed files with 279 additions and 61 deletions

View File

@@ -1439,12 +1439,68 @@ class MCPAgentIntegrated:
# 发送开始事件
yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"})
# 阶段1: Kimi 制定计划
# 阶段1: Kimi 制定计划(流式)
yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."})
plan = await self.create_plan(user_query, tools)
messages = [
{"role": "system", "content": self.get_planning_prompt(tools)},
{"role": "user", "content": user_query},
]
# 发送计划
# 使用流式 API 调用 Kimi
stream = self.kimi_client.chat.completions.create(
model=self.kimi_model,
messages=messages,
temperature=1.0,
max_tokens=16000,
stream=True, # 启用流式输出
)
reasoning_content = ""
plan_content = ""
# 逐块接收 Kimi 的响应
for chunk in stream:
if chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
plan_content += content_chunk
# 发送思考过程片段
yield self._format_sse("thinking", {
"content": content_chunk,
"stage": "planning"
})
# 提取 reasoning_content如果有
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'reasoning_content'):
reasoning_chunk = chunk.choices[0].delta.reasoning_content
if reasoning_chunk:
reasoning_content += reasoning_chunk
# 发送推理过程片段
yield self._format_sse("reasoning", {
"content": reasoning_chunk
})
# 解析完整的计划
plan_json = plan_content.strip()
# 清理可能的代码块标记
if "```json" in plan_json:
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
elif "```" in plan_json:
plan_json = plan_json.split("```")[1].split("```")[0].strip()
plan_data = json.loads(plan_json)
plan = ExecutionPlan(
goal=plan_data["goal"],
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
steps=[ToolCall(**step) for step in plan_data["steps"]],
)
logger.info(f"[Planning] 计划制定完成: {len(plan.steps)}")
# 发送完整计划
yield self._format_sse("plan", {
"goal": plan.goal,
"reasoning": plan.reasoning,
@@ -1524,21 +1580,108 @@ class MCPAgentIntegrated:
"execution_time": execution_time,
})
# 阶段3: Kimi 生成总结
# 阶段3: Kimi 生成总结(流式)
yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."})
final_summary = await self.generate_final_summary(user_query, plan, step_results)
# 收集成功的结果
successful_results = [r for r in step_results if r.status == "success"]
# 发送最终总结
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": len([r for r in step_results if r.status == "success"]),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
if not successful_results:
yield self._format_sse("summary", {
"content": "很抱歉,所有步骤都执行失败,无法生成分析报告。",
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": 0,
"failed_steps": len(step_results),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
else:
# 构建结果文本(精简版)
results_text = "\n\n".join([
f"**步骤 {r.step_index + 1}: {r.tool}**\n"
f"结果: {str(r.result)[:800]}..."
for r in successful_results[:3] # 只取前3个避免超长
])
messages = [
{
"role": "system",
"content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。
## 数据可视化能力
如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。
支持的图表类型:
- 折线图line适合时间序列数据如股价走势、财务指标趋势
- 柱状图bar适合对比数据如不同年份的收入、利润对比
- 饼图pie适合占比数据如业务结构、资产分布
### 图表格式(使用 Markdown 代码块)
在报告中插入图表时,使用以下格式:
```echarts
{
"title": {"text": "图表标题"},
"tooltip": {},
"xAxis": {"type": "category", "data": ["类别1", "类别2"]},
"yAxis": {"type": "value"},
"series": [{"name": "数据系列", "type": "line", "data": [100, 200]}]
}
```
**重要提示**
- ECharts 配置必须是合法的 JSON 格式
- 只在有明确数值数据时才生成图表
- 不要凭空捏造数据"""
},
{
"role": "user",
"content": f"""用户问题:{user_query}
执行计划:{plan.goal}
执行结果:
{results_text}
请生成专业的分析报告500字以内。如果结果中包含数值型数据请使用 ECharts 图表进行可视化展示。"""
},
]
# 使用流式 API 生成总结
summary_stream = self.kimi_client.chat.completions.create(
model="kimi-k2-turbo-preview",
messages=messages,
temperature=0.7,
max_tokens=2000,
stream=True, # 启用流式输出
)
final_summary = ""
# 逐块发送总结内容
for chunk in summary_stream:
if chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
final_summary += content_chunk
# 发送总结片段
yield self._format_sse("summary_chunk", {
"content": content_chunk
})
logger.info("[Summary] 流式总结完成")
# 发送完整的总结和元数据
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": len(successful_results),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
# 发送完成事件
yield self._format_sse("done", {"message": "处理完成"})