update pay ui

2025-12-17 13:53:16 +08:00
parent 0310c40323
commit fff13edcdf
2 changed files with 277 additions and 302 deletions


@@ -2060,6 +2060,11 @@ class MCPAgentIntegrated:
tool_handlers: Dict[str, Any],
) -> Dict[str, Any]:
"""Execute a single tool."""
# Verbose logging: print the tool name and arguments
logger.info(f"[Tool Call] ========== tool call start ==========")
logger.info(f"[Tool Call] tool name: {tool_name}")
logger.info(f"[Tool Call] argument type: {type(arguments)}")
logger.info(f"[Tool Call] arguments: {json.dumps(arguments, ensure_ascii=False, indent=2)}")
# Special tool: summarize_news goes through DeepMoney
if tool_name == "summarize_news":
@@ -2071,9 +2076,13 @@ class MCPAgentIntegrated:
# Invoke the MCP tool
handler = tool_handlers.get(tool_name)
if not handler:
logger.error(f"[Tool Call] tool '{tool_name}' not found! Available tools: {list(tool_handlers.keys())}")
raise ValueError(f"Tool '{tool_name}' not found")
logger.info(f"[Tool Call] invoking handler: {handler.__name__}")
result = await handler(arguments)
logger.info(f"[Tool Call] result type: {type(result)}")
logger.info(f"[Tool Call] ========== tool call end ==========")
return result
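For context, `tool_handlers` is built elsewhere in the file; the dispatch above only assumes it maps tool names to async callables that accept the parsed arguments dict. A minimal sketch of that shape (the handler name and payload below are hypothetical):

```python
from typing import Any, Dict

# Hypothetical handler: the real handlers are registered elsewhere in the module.
async def get_stock_news(arguments: Dict[str, Any]) -> Dict[str, Any]:
    code = arguments.get("code", "")
    return {"code": code, "news": []}  # placeholder payload

tool_handlers: Dict[str, Any] = {
    "get_stock_news": get_stock_news,  # name -> awaitable handler, as execute_tool expects
}
```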
async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str:
@@ -2412,7 +2421,11 @@ class MCPAgentIntegrated:
chat_history: List[dict] = None, # new: prior conversation history
is_new_session: bool = False, # new: whether this is a new session (used to generate a title)
) -> AsyncGenerator[str, None]:
"""Main flow (streaming) - returns execution results step by step"""
"""
Main flow (streaming) - uses the native OpenAI Tool Calling API
Supports vLLM's --enable-auto-tool-choice --tool-call-parser qwen3_coder
"""
logger.info(f"[Agent Stream] processing query: {user_query}")
if chat_history:
logger.info(f"[Agent Stream] with {len(chat_history)} history messages")
@@ -2424,36 +2437,79 @@ class MCPAgentIntegrated:
# Use the custom model config if one was passed in; otherwise fall back to the default LLM
if model_config:
planning_client = OpenAI(
llm_client = OpenAI(
api_key=model_config["api_key"],
base_url=model_config["base_url"],
)
planning_model = model_config["model"]
logger.info(f"[Agent Stream] using custom model: {planning_model}")
llm_model = model_config["model"]
llm_max_tokens = model_config.get("max_tokens", 8192)
logger.info(f"[Agent Stream] using custom model: {llm_model}")
else:
planning_client = self.llm_client
planning_model = self.llm_model
logger.info(f"[Agent Stream] using default model: {planning_model}")
llm_client = self.llm_client
llm_model = self.llm_model
llm_max_tokens = self.llm_max_tokens
logger.info(f"[Agent Stream] using default model: {llm_model}")
# Convert the tool list into OpenAI tools format
openai_tools = []
for tool in tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["parameters"]
}
})
logger.info(f"[Agent Stream] loaded {len(openai_tools)} tools")
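Each entry appended above follows the OpenAI function-tool schema; for reference, one converted entry ends up shaped roughly like this (the tool name, description, and parameters are illustrative, not taken from the actual MCP tool list):

```python
# Illustrative example of one entry in openai_tools; the real name,
# description, and JSON Schema come from the MCP tool definitions.
example_tool = {
    "type": "function",
    "function": {
        "name": "get_stock_news",
        "description": "Fetch recent news for a stock",
        "parameters": {
            "type": "object",
            "properties": {"code": {"type": "string", "description": "Stock code, e.g. 600519"}},
            "required": ["code"],
        },
    },
}
```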
# Current time information
now = datetime.now()
current_time_info = f"""Current time: {now.strftime('%Y-%m-%d %H:%M:%S')} ({['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'][now.weekday()]})
A-share trading hours: 9:30-11:30 and 13:00-15:00
Time semantics: "today" = {now.strftime('%Y-%m-%d')}; "recently" = the last 5-10 trading days"""
# Build the system prompt (for native tool calling)
system_prompt = f"""You are "价小前", the AI investment-research chat assistant of 北京价值前沿科技公司.
## Your capabilities
- Domain expertise: equity investment research, market analysis, news interpretation, financial analysis
- You can call a range of tools to look up stock data, news, concept sectors, and other information
- Based on the user's question, intelligently choose and call the appropriate tools
{current_time_info}
## Key facts
- 贵州茅台 (Kweichow Moutai): 600519
- Limit-up (涨停): a daily gain of roughly 10%
- Concept sector (概念板块): a grouping of stocks that share the same theme
## Tool-usage principles
1. Choose the most suitable tool for the user's question
2. You may call tools multiple times to complete complex tasks
3. After retrieving the data, give a professional analysis and summary
4. To summarize news-style data, use the summarize_news tool
## Output requirements
- Use Markdown with a clear structure
- Highlight key figures in **bold**
- If there is numerical data, you may present it with ECharts charts (using an ```echarts code block)"""
try:
# Send the start event
yield self._format_sse("status", {"stage": "start", "message": "Starting to process the query"})
# Phase 1: draft a plan with the selected model (streaming, with DeepMoney fallback)
yield self._format_sse("status", {"stage": "planning", "message": "Drafting an execution plan..."})
# Build the message list (including prior conversation context)
# Build the message list
messages = [
{"role": "system", "content": self.get_planning_prompt(tools)},
{"role": "system", "content": system_prompt},
]
# Append conversation history (last 10 turns, to avoid an overly long context)
# Append conversation history (last 10 turns)
if chat_history:
recent_history = chat_history[-10:] # last 10 messages
recent_history = chat_history[-10:]
for msg in recent_history:
role = "user" if msg.get("message_type") == "user" else "assistant"
content = msg.get("message", "")
# Truncate overly long history messages
if len(content) > 500:
content = content[:500] + "..."
messages.append({"role": role, "content": content})
@@ -2462,183 +2518,183 @@ class MCPAgentIntegrated:
# Append the current user query
messages.append({"role": "user", "content": user_query})
reasoning_content = ""
plan_content = ""
use_fallback = False
try:
# Try the selected model's streaming API
# Get max_tokens from the model config (default 8192)
model_max_tokens = model_config.get("max_tokens", 8192) if model_config else 8192
stream = planning_client.chat.completions.create(
model=planning_model,
messages=messages,
temperature=1.0,
max_tokens=model_max_tokens,
stream=True, # enable streaming output
)
# Receive the LLM response chunk by chunk
for chunk in stream:
if chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
plan_content += content_chunk
# Emit a thinking-process fragment
yield self._format_sse("thinking", {
"content": content_chunk,
"stage": "planning"
})
# Extract reasoning_content (if present)
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'reasoning_content'):
reasoning_chunk = chunk.choices[0].delta.reasoning_content
if reasoning_chunk:
reasoning_content += reasoning_chunk
# Emit a reasoning-process fragment
yield self._format_sse("reasoning", {
"content": reasoning_chunk
})
except Exception as llm_error:
# Check whether this is a content-moderation error (HTTP 400)
error_str = str(llm_error)
if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str):
logger.warning(f"[Planning] LLM request rejected by content moderation, switching to DeepMoney: {error_str}")
use_fallback = True
yield self._format_sse("status", {
"stage": "planning",
"message": "Switching to the fallback model to draft the plan..."
})
try:
# Use the DeepMoney fallback (non-streaming)
fallback_response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192),
)
plan_content = fallback_response.choices[0].message.content
# Send the full plan content in one shot
yield self._format_sse("thinking", {
"content": plan_content,
"stage": "planning"
})
logger.info(f"[Planning] DeepMoney fallback succeeded")
except Exception as fallback_error:
logger.error(f"[Planning] DeepMoney fallback failed as well: {fallback_error}")
raise Exception(f"Neither the LLM nor DeepMoney could produce a plan: {llm_error}, {fallback_error}")
else:
# Not a moderation error; re-raise
logger.error(f"[Planning] LLM call failed (not a moderation issue): {llm_error}")
raise
# Parse the complete plan
plan_json = plan_content.strip()
# Strip any code-fence markers
if "```json" in plan_json:
plan_json = plan_json.split("```json")[1].split("```")[0].strip()
elif "```" in plan_json:
plan_json = plan_json.split("```")[1].split("```")[0].strip()
plan_data = json.loads(plan_json)
plan = ExecutionPlan(
goal=plan_data["goal"],
reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""),
steps=[ToolCall(**step) for step in plan_data["steps"]],
)
logger.info(f"[Planning] plan ready: {len(plan.steps)} steps")
# Send the complete plan
yield self._format_sse("plan", {
"goal": plan.goal,
"reasoning": plan.reasoning,
"steps": [
{"tool": step.tool, "arguments": step.arguments, "reason": step.reason}
for step in plan.steps
],
})
# Phase 2: execute the tools (results returned step by step)
yield self._format_sse("status", {"stage": "executing", "message": f"Executing {len(plan.steps)} steps"})
# For collecting execution results
step_results = []
collected_data = {}
plan_steps = [] # record the executed steps for frontend display
step_index = 0
max_tool_calls = 10 # max number of tool-call rounds, to prevent infinite loops
yield self._format_sse("status", {"stage": "thinking", "message": "Analyzing the question..."})
# Loop until the model stops requesting tool calls
while step_index < max_tool_calls:
logger.info(f"[Agent Stream] LLM call round {step_index + 1}")
# Use native tool calling (non-streaming, because we need the tool_calls)
try:
response = llm_client.chat.completions.create(
model=llm_model,
messages=messages,
tools=openai_tools,
tool_choice="auto",
temperature=0.7,
max_tokens=llm_max_tokens,
stream=False, # tool calling requires a non-streaming response
)
except Exception as e:
logger.error(f"[Agent Stream] LLM call failed: {e}")
raise
assistant_message = response.choices[0].message
logger.info(f"[Agent Stream] LLM response: finish_reason={response.choices[0].finish_reason}")
# Check for tool calls
if assistant_message.tool_calls:
logger.info(f"[Agent Stream] detected {len(assistant_message.tool_calls)} tool calls")
# Append the assistant message (with tool_calls) to the history
messages.append(assistant_message)
# On the first round of tool calls, emit a plan event
if step_index == 0:
# Build the plan payload
plan_data = {
"goal": f"Analyze the user question: {user_query[:50]}...",
"reasoning": "Use tools to gather the relevant data for analysis",
"steps": []
}
for tc in assistant_message.tool_calls:
try:
args = json.loads(tc.function.arguments) if tc.function.arguments else {}
except:
args = {}
plan_data["steps"].append({
"tool": tc.function.name,
"arguments": args,
"reason": f"Call {tc.function.name}"
})
yield self._format_sse("plan", plan_data)
yield self._format_sse("status", {"stage": "executing", "message": f"Executing {len(assistant_message.tool_calls)} tool calls"})
# Execute each tool call
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_call_id = tool_call.id
try:
arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}
except json.JSONDecodeError:
arguments = {}
logger.warning(f"[Agent Stream] failed to parse tool arguments: {tool_call.function.arguments}")
logger.info(f"[Tool Call] ========== tool call start ==========")
logger.info(f"[Tool Call] tool name: {tool_name}")
logger.info(f"[Tool Call] tool_call_id: {tool_call_id}")
logger.info(f"[Tool Call] arguments: {json.dumps(arguments, ensure_ascii=False)}")
for i, step in enumerate(plan.steps):
# Emit the step_start event
yield self._format_sse("step_start", {
"step_index": i,
"tool": step.tool,
"arguments": step.arguments,
"reason": step.reason,
"step_index": step_index,
"tool": tool_name,
"arguments": arguments,
"reason": f"Call {tool_name}",
})
start_time = datetime.now()
try:
# Substitute placeholders
arguments = step.arguments.copy()
if step.tool == "summarize_news":
if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]:
# Special handling for summarize_news
if tool_name == "summarize_news":
data_arg = arguments.get("data", "")
if data_arg in ["前面的新闻数据", "前面收集的所有数据", ""]: # placeholder markers meaning "the news/data gathered earlier"
arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2)
# Execute the tool
result = await self.execute_tool(step.tool, arguments, tool_handlers)
result = await self.execute_tool(tool_name, arguments, tool_handlers)
execution_time = (datetime.now() - start_time).total_seconds()
# Record the result
step_result = StepResult(
step_index=i,
tool=step.tool,
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="success",
result=result,
execution_time=execution_time,
)
step_results.append(step_result)
collected_data[f"step_{i+1}_{step.tool}"] = result
collected_data[f"step_{step_index+1}_{tool_name}"] = result
plan_steps.append({"tool": tool_name, "arguments": arguments, "reason": f"Call {tool_name}"})
# Emit the step-complete event (with the result)
# Emit the step-complete event
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"step_index": step_index,
"tool": tool_name,
"status": "success",
"result": result,
"execution_time": execution_time,
})
# Append the tool result to the message history
result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result)
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_str[:5000] # truncate overly long results
})
logger.info(f"[Tool Call] execution succeeded, took {execution_time:.2f}s")
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
step_result = StepResult(
step_index=i,
tool=step.tool,
arguments=step.arguments,
step_index=step_index,
tool=tool_name,
arguments=arguments,
status="failed",
error=str(e),
error=error_msg,
execution_time=execution_time,
)
step_results.append(step_result)
# Emit the step-failure event
yield self._format_sse("step_complete", {
"step_index": i,
"tool": step.tool,
"step_index": step_index,
"tool": tool_name,
"status": "failed",
"error": str(e),
"error": error_msg,
"execution_time": execution_time,
})
# Phase 3: the LLM generates the summary (streaming)
# Append the error message to the message history
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": f"Tool execution failed: {error_msg}"
})
logger.error(f"[Tool Call] execution failed: {error_msg}")
logger.info(f"[Tool Call] ========== tool call end ==========")
step_index += 1
else:
# No tool calls: the model produced its final reply
logger.info(f"[Agent Stream] model produced the final reply")
break
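For each round that goes through the tool branch above, `messages` grows by one assistant message carrying `tool_calls` plus one `role: "tool"` reply per call, keyed by the matching `tool_call_id`. A sketch of that shape for a single round (ids, tool names, and payloads are illustrative):

```python
# Illustrative message-history shape after one tool-calling round.
# In the code above, the assistant turn is the SDK message object itself;
# it is shown here as a plain dict for readability.
round_messages = [
    {"role": "user", "content": "Recent news about Kweichow Moutai (600519)"},
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_abc123",
            "type": "function",
            "function": {"name": "get_stock_news", "arguments": "{\"code\": \"600519\"}"},
        }],
    },
    {"role": "tool", "tool_call_id": "call_abc123", "content": "{\"news\": []}"},
]
```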
# Build the plan object (for saving to ES)
plan = ExecutionPlan(
goal=f"Analyze the user question: {user_query[:50]}...",
reasoning="Use tools to gather the relevant data for analysis",
steps=[ToolCall(tool=s["tool"], arguments=s["arguments"], reason=s["reason"]) for s in plan_steps],
)
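ExecutionPlan, ToolCall, and StepResult are defined elsewhere in this module; the fields used in this section imply shapes roughly like the following (an assumption for readability, not the actual definitions):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class ToolCall:
    tool: str
    arguments: Dict[str, Any]
    reason: str = ""

@dataclass
class ExecutionPlan:
    goal: str
    reasoning: str
    steps: List[ToolCall] = field(default_factory=list)

@dataclass
class StepResult:
    step_index: int
    tool: str
    arguments: Dict[str, Any]
    status: str  # "success" or "failed"
    result: Any = None
    error: Optional[str] = None
    execution_time: float = 0.0
```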
# Phase 3: generate the final summary
yield self._format_sse("status", {"stage": "summarizing", "message": "Generating the final summary..."})
# Collect the successful results
@@ -2647,84 +2703,34 @@ class MCPAgentIntegrated:
# Initialize final_summary
final_summary = ""
if not successful_results:
final_summary = "Sorry, every step failed, so no analysis report can be generated."
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"successful_steps": 0,
"failed_steps": len(step_results),
"total_execution_time": sum(r.execution_time for r in step_results),
},
})
if not successful_results and step_index == 0:
# If no tools were executed (the model replied directly), use the model's reply
if assistant_message and assistant_message.content:
final_summary = assistant_message.content
# Send it as a single stream chunk (already complete, but keeps the frontend contract)
yield self._format_sse("summary_chunk", {"content": final_summary})
else:
# Build the results text (condensed)
results_text = "\n\n".join([
f"**Step {r.step_index + 1}: {r.tool}**\n"
f"Result: {str(r.result)[:800]}..."
for r in successful_results[:3] # only the first 3, to avoid excessive length
])
messages = [
{
"role": "system",
"content": """You are a professional financial research assistant. Based on the execution results, produce a concise, clear report.
## Data visualization
If the execution results contain numerical data (e.g. financial metrics, trading data, time series), you may use ECharts to generate charts that make the report easier to read.
Supported chart types:
- Line chart (line): for time-series data (e.g. price trends, financial-metric trends)
- Bar chart (bar): for comparisons (e.g. revenue or profit across years)
- Pie chart (pie): for proportions (e.g. business mix, asset allocation)
### Chart format (Markdown code block)
To embed a chart in the report, use the following format:
```echarts
{
"title": {"text": "Chart title"},
"tooltip": {},
"xAxis": {"type": "category", "data": ["Category 1", "Category 2"]},
"yAxis": {"type": "value"},
"series": [{"name": "Series", "type": "line", "data": [100, 200]}]
}
```
**Important**
- The ECharts config must be valid JSON
- Only generate a chart when there is concrete numerical data
- Do not fabricate data"""
},
{
"role": "user",
"content": f"""User question: {user_query}
Execution plan: {plan.goal}
Execution results:
{results_text}
Please produce a professional analysis report (within 500 words). If the results contain numerical data, visualize it with ECharts charts."""
},
]
# Generate the summary with the streaming API (with DeepMoney fallback)
final_summary = ""
final_summary = "Sorry, I am unable to process your request."
yield self._format_sse("summary_chunk", {"content": final_summary})
elif not successful_results:
# Every step failed
final_summary = "Sorry, every step failed, so no analysis report can be generated."
yield self._format_sse("summary_chunk", {"content": final_summary})
else:
# Some tool calls succeeded: use the streaming API to generate the final reply
try:
summary_stream = self.llm_client.chat.completions.create(
model=self.llm_model,
messages=messages,
# Generate the final reply with the streaming API (no tools passed, so the model produces a text reply)
summary_stream = llm_client.chat.completions.create(
model=llm_model,
messages=messages, # messages already contains the full tool-call history
temperature=0.7,
max_tokens=self.llm_max_tokens,
max_tokens=llm_max_tokens,
stream=True, # enable streaming output
)
# Send the summary content chunk by chunk
for chunk in summary_stream:
if chunk.choices[0].delta.content:
if chunk.choices and chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
final_summary += content_chunk
@@ -2736,55 +2742,24 @@ class MCPAgentIntegrated:
logger.info("[Summary] streaming summary complete")
except Exception as llm_error:
# Check whether this is a content-moderation error (HTTP 400)
error_str = str(llm_error)
if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str):
logger.warning(f"[Summary] LLM request rejected by content moderation, switching to DeepMoney: {error_str}")
yield self._format_sse("status", {
"stage": "summarizing",
"message": "Switching to the fallback model to generate the summary..."
})
try:
# Use the DeepMoney fallback (non-streaming)
fallback_response = self.deepmoney_client.chat.completions.create(
model=self.deepmoney_model,
messages=messages,
temperature=0.7,
max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192),
)
final_summary = fallback_response.choices[0].message.content
# Send the full summary content in one shot
yield self._format_sse("summary_chunk", {
"content": final_summary
})
logger.info(f"[Summary] DeepMoney fallback succeeded")
except Exception as fallback_error:
logger.error(f"[Summary] DeepMoney fallback failed as well: {fallback_error}")
# Degraded fallback: simply concatenate the execution results
final_summary = f"Executed {len(plan.steps)} steps, of which {len(successful_results)} succeeded.\n\nResults:\n{results_text[:500]}..."
yield self._format_sse("summary_chunk", {
"content": final_summary
})
logger.warning("[Summary] using degraded fallback (simple concatenation)")
else:
# Not a moderation error; re-raise
logger.error(f"[Summary] LLM call failed (not a moderation issue): {llm_error}")
raise
logger.error(f"[Summary] streaming summary failed: {llm_error}")
# Degrade: simple concatenation of the tool-call results
results_text = "\n\n".join([
f"**{r.tool}**: {str(r.result)[:500]}..."
for r in successful_results[:5]
])
final_summary = f"Based on the query results:\n\n{results_text}"
yield self._format_sse("summary_chunk", {"content": final_summary})
logger.warning("[Summary] using degraded fallback")
# Send the complete summary and metadata
yield self._format_sse("summary", {
"content": final_summary,
"metadata": {
"total_steps": len(plan.steps),
"total_steps": len(plan.steps) if plan_steps else 0,
"successful_steps": len(successful_results),
"failed_steps": len([r for r in step_results if r.status == "failed"]),
"total_execution_time": sum(r.execution_time for r in step_results),
"total_execution_time": sum(r.execution_time for r in step_results) if step_results else 0,
},
})
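`_format_sse` itself is not part of this diff; given that every event above is yielded as a string built from an event name plus a dict payload, a minimal implementation consistent with these call sites might look like this (an assumption, not the method as shipped):

```python
import json

def _format_sse(self, event: str, data: dict) -> str:
    # Assumed shape: one SSE frame per event, with the event name and payload
    # wrapped in a single JSON object on the data line.
    payload = json.dumps({"type": event, "data": data}, ensure_ascii=False)
    return f"data: {payload}\n\n"
```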
@@ -2834,8 +2809,8 @@ class MCPAgentIntegrated:
except Exception as e:
logger.error(f"[ES] failed to save the agent reply: {e}", exc_info=True)
# Send the done event
yield self._format_sse("done", {"message": "Processing complete"})
# Send the done event (with session_id)
yield self._format_sse("done", {"message": "Processing complete", "session_id": session_id})
except Exception as e:
logger.error(f"[Agent Stream] error: {str(e)}", exc_info=True)