diff --git a/__pycache__/mcp_server.cpython-310.pyc b/__pycache__/mcp_server.cpython-310.pyc index cddcd1c7..dd509476 100644 Binary files a/__pycache__/mcp_server.cpython-310.pyc and b/__pycache__/mcp_server.cpython-310.pyc differ diff --git a/mcp_server.py b/mcp_server.py index 15d5d219..a4b40a12 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -2060,6 +2060,11 @@ class MCPAgentIntegrated: tool_handlers: Dict[str, Any], ) -> Dict[str, Any]: """执行单个工具""" + # 详细日志:打印工具名和参数 + logger.info(f"[Tool Call] ========== 工具调用开始 ==========") + logger.info(f"[Tool Call] 工具名: {tool_name}") + logger.info(f"[Tool Call] 参数类型: {type(arguments)}") + logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False, indent=2)}") # 特殊工具:summarize_news(使用 DeepMoney) if tool_name == "summarize_news": @@ -2071,9 +2076,13 @@ class MCPAgentIntegrated: # 调用 MCP 工具 handler = tool_handlers.get(tool_name) if not handler: + logger.error(f"[Tool Call] 工具 '{tool_name}' 未找到!可用工具: {list(tool_handlers.keys())}") raise ValueError(f"Tool '{tool_name}' not found") + logger.info(f"[Tool Call] 调用 handler: {handler.__name__}") result = await handler(arguments) + logger.info(f"[Tool Call] 返回结果类型: {type(result)}") + logger.info(f"[Tool Call] ========== 工具调用结束 ==========") return result async def summarize_news_with_deepmoney(self, data: str, focus: str) -> str: @@ -2412,7 +2421,11 @@ class MCPAgentIntegrated: chat_history: List[dict] = None, # 新增:历史对话记录 is_new_session: bool = False, # 新增:是否是新会话(用于生成标题) ) -> AsyncGenerator[str, None]: - """主流程(流式输出)- 逐步返回执行结果""" + """ + 主流程(流式输出)- 使用原生 OpenAI Tool Calling API + + 支持 vLLM 的 --enable-auto-tool-choice --tool-call-parser qwen3_coder + """ logger.info(f"[Agent Stream] 处理查询: {user_query}") if chat_history: logger.info(f"[Agent Stream] 带有 {len(chat_history)} 条历史消息") @@ -2424,36 +2437,79 @@ class MCPAgentIntegrated: # 如果传入了自定义模型配置,使用自定义配置,否则使用默认 LLM if model_config: - planning_client = OpenAI( + llm_client = OpenAI( api_key=model_config["api_key"], base_url=model_config["base_url"], ) - planning_model = model_config["model"] - logger.info(f"[Agent Stream] 使用自定义模型: {planning_model}") + llm_model = model_config["model"] + llm_max_tokens = model_config.get("max_tokens", 8192) + logger.info(f"[Agent Stream] 使用自定义模型: {llm_model}") else: - planning_client = self.llm_client - planning_model = self.llm_model - logger.info(f"[Agent Stream] 使用默认模型: {planning_model}") + llm_client = self.llm_client + llm_model = self.llm_model + llm_max_tokens = self.llm_max_tokens + logger.info(f"[Agent Stream] 使用默认模型: {llm_model}") + + # 将工具列表转换为 OpenAI tools 格式 + openai_tools = [] + for tool in tools: + openai_tools.append({ + "type": "function", + "function": { + "name": tool["name"], + "description": tool["description"], + "parameters": tool["parameters"] + } + }) + logger.info(f"[Agent Stream] 已加载 {len(openai_tools)} 个工具") + + # 获取当前时间信息 + now = datetime.now() + current_time_info = f"""当前时间: {now.strftime('%Y年%m月%d日 %H:%M:%S')} {['周一', '周二', '周三', '周四', '周五', '周六', '周日'][now.weekday()]} +A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 +时间语义: "今天"={now.strftime('%Y-%m-%d')}, "最近"=最近5-10个交易日""" + + # 构建系统提示词(适用于原生 tool calling) + system_prompt = f"""你是"价小前",北京价值前沿科技公司的AI投研聊天助手。 + +## 你的能力 +- 专业领域: 股票投资研究、市场分析、新闻解读、财务分析 +- 你可以调用各种工具来查询股票数据、新闻、概念板块等信息 +- 根据用户问题,智能选择并调用合适的工具 + +{current_time_info} + +## 重要知识 +- 贵州茅台: 600519 +- 涨停: 涨幅约10% +- 概念板块: 相同题材股票分类 + +## 工具使用原则 +1. 根据用户问题,选择最合适的工具 +2. 可以多次调用工具来完成复杂任务 +3. 获取数据后,给出专业的分析和总结 +4. 如果需要总结新闻类数据,使用 summarize_news 工具 + +## 输出要求 +- 使用 Markdown 格式,结构清晰 +- 重要数据用 **加粗** 标注 +- 如有数值数据,可使用 ECharts 图表展示(使用 ```echarts 代码块)""" try: # 发送开始事件 yield self._format_sse("status", {"stage": "start", "message": "开始处理查询"}) - # 阶段1: 使用选中的模型制定计划(流式,带 DeepMoney 备选) - yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."}) - - # 构建消息列表(包含历史对话上下文) + # 构建消息列表 messages = [ - {"role": "system", "content": self.get_planning_prompt(tools)}, + {"role": "system", "content": system_prompt}, ] - # 添加历史对话(最近 10 轮,避免上下文过长) + # 添加历史对话(最近 10 轮) if chat_history: - recent_history = chat_history[-10:] # 最近 10 条消息 + recent_history = chat_history[-10:] for msg in recent_history: role = "user" if msg.get("message_type") == "user" else "assistant" content = msg.get("message", "") - # 截断过长的历史消息 if len(content) > 500: content = content[:500] + "..." messages.append({"role": role, "content": content}) @@ -2462,183 +2518,183 @@ class MCPAgentIntegrated: # 添加当前用户查询 messages.append({"role": "user", "content": user_query}) - reasoning_content = "" - plan_content = "" - use_fallback = False - - try: - # 尝试使用选中的模型流式 API - # 从模型配置获取 max_tokens,默认 8192 - model_max_tokens = model_config.get("max_tokens", 8192) if model_config else 8192 - stream = planning_client.chat.completions.create( - model=planning_model, - messages=messages, - temperature=1.0, - max_tokens=model_max_tokens, - stream=True, # 启用流式输出 - ) - - # 逐块接收 LLM 的响应 - for chunk in stream: - if chunk.choices[0].delta.content: - content_chunk = chunk.choices[0].delta.content - plan_content += content_chunk - - # 发送思考过程片段 - yield self._format_sse("thinking", { - "content": content_chunk, - "stage": "planning" - }) - - # 提取 reasoning_content(如果有) - if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'reasoning_content'): - reasoning_chunk = chunk.choices[0].delta.reasoning_content - if reasoning_chunk: - reasoning_content += reasoning_chunk - # 发送推理过程片段 - yield self._format_sse("reasoning", { - "content": reasoning_chunk - }) - - except Exception as llm_error: - # 检查是否是内容风控错误(400) - error_str = str(llm_error) - if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str): - logger.warning(f"[Planning] LLM 内容风控拒绝,切换到 DeepMoney: {error_str}") - use_fallback = True - - yield self._format_sse("status", { - "stage": "planning", - "message": "切换到备用模型制定计划..." - }) - - try: - # 使用 DeepMoney 备选方案(非流式) - fallback_response = self.deepmoney_client.chat.completions.create( - model=self.deepmoney_model, - messages=messages, - temperature=0.7, - max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192), - ) - - plan_content = fallback_response.choices[0].message.content - - # 发送完整的计划内容(一次性) - yield self._format_sse("thinking", { - "content": plan_content, - "stage": "planning" - }) - - logger.info(f"[Planning] DeepMoney 备选方案成功") - - except Exception as fallback_error: - logger.error(f"[Planning] DeepMoney 备选方案也失败: {fallback_error}") - raise Exception(f"LLM 和 DeepMoney 都无法生成计划: {llm_error}, {fallback_error}") - else: - # 不是内容风控错误,直接抛出 - logger.error(f"[Planning] LLM 调用失败(非风控原因): {llm_error}") - raise - - # 解析完整的计划 - plan_json = plan_content.strip() - - # 清理可能的代码块标记 - if "```json" in plan_json: - plan_json = plan_json.split("```json")[1].split("```")[0].strip() - elif "```" in plan_json: - plan_json = plan_json.split("```")[1].split("```")[0].strip() - - plan_data = json.loads(plan_json) - - plan = ExecutionPlan( - goal=plan_data["goal"], - reasoning=plan_data.get("reasoning", "") + "\n\n" + (reasoning_content[:500] if reasoning_content else ""), - steps=[ToolCall(**step) for step in plan_data["steps"]], - ) - - logger.info(f"[Planning] 计划制定完成: {len(plan.steps)} 步") - - # 发送完整计划 - yield self._format_sse("plan", { - "goal": plan.goal, - "reasoning": plan.reasoning, - "steps": [ - {"tool": step.tool, "arguments": step.arguments, "reason": step.reason} - for step in plan.steps - ], - }) - - # 阶段2: 执行工具(逐步返回) - yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(plan.steps)} 个步骤"}) - + # 用于收集执行结果 step_results = [] collected_data = {} + plan_steps = [] # 记录执行的步骤,用于前端显示 + step_index = 0 + max_tool_calls = 10 # 最大工具调用次数,防止无限循环 - for i, step in enumerate(plan.steps): - # 发送步骤开始事件 - yield self._format_sse("step_start", { - "step_index": i, - "tool": step.tool, - "arguments": step.arguments, - "reason": step.reason, - }) + yield self._format_sse("status", {"stage": "thinking", "message": "正在分析问题..."}) - start_time = datetime.now() + # 循环处理,直到模型不再调用工具 + while step_index < max_tool_calls: + logger.info(f"[Agent Stream] 第 {step_index + 1} 轮 LLM 调用") + # 使用原生 tool calling(非流式,因为需要获取 tool_calls) try: - # 替换占位符 - arguments = step.arguments.copy() - if step.tool == "summarize_news": - if arguments.get("data") in ["前面的新闻数据", "前面收集的所有数据"]: - arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) - - # 执行工具 - result = await self.execute_tool(step.tool, arguments, tool_handlers) - execution_time = (datetime.now() - start_time).total_seconds() - - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=arguments, - status="success", - result=result, - execution_time=execution_time, + response = llm_client.chat.completions.create( + model=llm_model, + messages=messages, + tools=openai_tools, + tool_choice="auto", + temperature=0.7, + max_tokens=llm_max_tokens, + stream=False, # 工具调用需要非流式 ) - step_results.append(step_result) - collected_data[f"step_{i+1}_{step.tool}"] = result - - # 发送步骤完成事件(包含结果) - yield self._format_sse("step_complete", { - "step_index": i, - "tool": step.tool, - "status": "success", - "result": result, - "execution_time": execution_time, - }) - except Exception as e: - execution_time = (datetime.now() - start_time).total_seconds() + logger.error(f"[Agent Stream] LLM 调用失败: {e}") + raise - step_result = StepResult( - step_index=i, - tool=step.tool, - arguments=step.arguments, - status="failed", - error=str(e), - execution_time=execution_time, - ) - step_results.append(step_result) + assistant_message = response.choices[0].message + logger.info(f"[Agent Stream] LLM 响应: finish_reason={response.choices[0].finish_reason}") - # 发送步骤失败事件 - yield self._format_sse("step_complete", { - "step_index": i, - "tool": step.tool, - "status": "failed", - "error": str(e), - "execution_time": execution_time, - }) + # 检查是否有工具调用 + if assistant_message.tool_calls: + logger.info(f"[Agent Stream] 检测到 {len(assistant_message.tool_calls)} 个工具调用") - # 阶段3: LLM 生成总结(流式) + # 将 assistant 消息添加到历史(包含 tool_calls) + messages.append(assistant_message) + + # 如果是第一次工具调用,发送计划事件 + if step_index == 0: + # 构建计划数据 + plan_data = { + "goal": f"分析用户问题:{user_query[:50]}...", + "reasoning": "使用工具获取相关数据进行分析", + "steps": [] + } + for tc in assistant_message.tool_calls: + try: + args = json.loads(tc.function.arguments) if tc.function.arguments else {} + except: + args = {} + plan_data["steps"].append({ + "tool": tc.function.name, + "arguments": args, + "reason": f"调用 {tc.function.name}" + }) + + yield self._format_sse("plan", plan_data) + yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(assistant_message.tool_calls)} 个工具调用"}) + + # 执行每个工具调用 + for tool_call in assistant_message.tool_calls: + tool_name = tool_call.function.name + tool_call_id = tool_call.id + + try: + arguments = json.loads(tool_call.function.arguments) if tool_call.function.arguments else {} + except json.JSONDecodeError: + arguments = {} + logger.warning(f"[Agent Stream] 工具参数解析失败: {tool_call.function.arguments}") + + logger.info(f"[Tool Call] ========== 工具调用开始 ==========") + logger.info(f"[Tool Call] 工具名: {tool_name}") + logger.info(f"[Tool Call] tool_call_id: {tool_call_id}") + logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False)}") + + # 发送步骤开始事件 + yield self._format_sse("step_start", { + "step_index": step_index, + "tool": tool_name, + "arguments": arguments, + "reason": f"调用 {tool_name}", + }) + + start_time = datetime.now() + + try: + # 特殊处理 summarize_news + if tool_name == "summarize_news": + data_arg = arguments.get("data", "") + if data_arg in ["前面的新闻数据", "前面收集的所有数据", ""]: + arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) + + # 执行工具 + result = await self.execute_tool(tool_name, arguments, tool_handlers) + execution_time = (datetime.now() - start_time).total_seconds() + + # 记录结果 + step_result = StepResult( + step_index=step_index, + tool=tool_name, + arguments=arguments, + status="success", + result=result, + execution_time=execution_time, + ) + step_results.append(step_result) + collected_data[f"step_{step_index+1}_{tool_name}"] = result + plan_steps.append({"tool": tool_name, "arguments": arguments, "reason": f"调用 {tool_name}"}) + + # 发送步骤完成事件 + yield self._format_sse("step_complete", { + "step_index": step_index, + "tool": tool_name, + "status": "success", + "result": result, + "execution_time": execution_time, + }) + + # 将工具结果添加到消息历史 + result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result) + messages.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "content": result_str[:5000] # 截断过长的结果 + }) + + logger.info(f"[Tool Call] 执行成功,耗时 {execution_time:.2f}s") + + except Exception as e: + execution_time = (datetime.now() - start_time).total_seconds() + error_msg = str(e) + + step_result = StepResult( + step_index=step_index, + tool=tool_name, + arguments=arguments, + status="failed", + error=error_msg, + execution_time=execution_time, + ) + step_results.append(step_result) + + # 发送步骤失败事件 + yield self._format_sse("step_complete", { + "step_index": step_index, + "tool": tool_name, + "status": "failed", + "error": error_msg, + "execution_time": execution_time, + }) + + # 将错误信息添加到消息历史 + messages.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "content": f"工具执行失败: {error_msg}" + }) + + logger.error(f"[Tool Call] 执行失败: {error_msg}") + + logger.info(f"[Tool Call] ========== 工具调用结束 ==========") + step_index += 1 + + else: + # 没有工具调用,模型生成了最终回复 + logger.info(f"[Agent Stream] 模型生成最终回复") + break + + # 构建 plan 对象(用于保存到 ES) + plan = ExecutionPlan( + goal=f"分析用户问题:{user_query[:50]}...", + reasoning="使用工具获取相关数据进行分析", + steps=[ToolCall(tool=s["tool"], arguments=s["arguments"], reason=s["reason"]) for s in plan_steps], + ) + + # 阶段3: 生成最终总结 yield self._format_sse("status", {"stage": "summarizing", "message": "正在生成最终总结..."}) # 收集成功的结果 @@ -2647,84 +2703,34 @@ class MCPAgentIntegrated: # 初始化 final_summary final_summary = "" - if not successful_results: + if not successful_results and step_index == 0: + # 如果没有执行任何工具(模型直接回复),使用模型的回复 + if assistant_message and assistant_message.content: + final_summary = assistant_message.content + # 流式发送(虽然已经是完整的,但保持前端兼容) + yield self._format_sse("summary_chunk", {"content": final_summary}) + else: + final_summary = "抱歉,我无法处理您的请求。" + yield self._format_sse("summary_chunk", {"content": final_summary}) + elif not successful_results: + # 所有步骤都失败 final_summary = "很抱歉,所有步骤都执行失败,无法生成分析报告。" - yield self._format_sse("summary", { - "content": final_summary, - "metadata": { - "total_steps": len(plan.steps), - "successful_steps": 0, - "failed_steps": len(step_results), - "total_execution_time": sum(r.execution_time for r in step_results), - }, - }) + yield self._format_sse("summary_chunk", {"content": final_summary}) else: - # 构建结果文本(精简版) - results_text = "\n\n".join([ - f"**步骤 {r.step_index + 1}: {r.tool}**\n" - f"结果: {str(r.result)[:800]}..." - for r in successful_results[:3] # 只取前3个,避免超长 - ]) - - messages = [ - { - "role": "system", - "content": """你是专业的金融研究助手。根据执行结果,生成简洁清晰的报告。 - -## 数据可视化能力 -如果执行结果中包含数值型数据(如财务指标、交易数据、时间序列等),你可以使用 ECharts 生成图表来增强报告的可读性。 - -支持的图表类型: -- 折线图(line):适合时间序列数据(如股价走势、财务指标趋势) -- 柱状图(bar):适合对比数据(如不同年份的收入、利润对比) -- 饼图(pie):适合占比数据(如业务结构、资产分布) - -### 图表格式(使用 Markdown 代码块) -在报告中插入图表时,使用以下格式: - -```echarts -{ - "title": {"text": "图表标题"}, - "tooltip": {}, - "xAxis": {"type": "category", "data": ["类别1", "类别2"]}, - "yAxis": {"type": "value"}, - "series": [{"name": "数据系列", "type": "line", "data": [100, 200]}] -} -``` - -**重要提示**: -- ECharts 配置必须是合法的 JSON 格式 -- 只在有明确数值数据时才生成图表 -- 不要凭空捏造数据""" - }, - { - "role": "user", - "content": f"""用户问题:{user_query} - -执行计划:{plan.goal} - -执行结果: -{results_text} - -请生成专业的分析报告(500字以内)。如果结果中包含数值型数据,请使用 ECharts 图表进行可视化展示。""" - }, - ] - - # 使用流式 API 生成总结(带 DeepMoney 备选) - final_summary = "" - + # 有成功的工具调用,使用流式 API 生成最终回复 try: - summary_stream = self.llm_client.chat.completions.create( - model=self.llm_model, - messages=messages, + # 使用流式 API 生成最终回复(不再传入 tools,让模型生成文本回复) + summary_stream = llm_client.chat.completions.create( + model=llm_model, + messages=messages, # messages 已包含所有工具调用历史 temperature=0.7, - max_tokens=self.llm_max_tokens, + max_tokens=llm_max_tokens, stream=True, # 启用流式输出 ) # 逐块发送总结内容 for chunk in summary_stream: - if chunk.choices[0].delta.content: + if chunk.choices and chunk.choices[0].delta.content: content_chunk = chunk.choices[0].delta.content final_summary += content_chunk @@ -2736,57 +2742,26 @@ class MCPAgentIntegrated: logger.info("[Summary] 流式总结完成") except Exception as llm_error: - # 检查是否是内容风控错误(400) - error_str = str(llm_error) - if "400" in error_str and ("content_filter" in error_str or "high risk" in error_str): - logger.warning(f"[Summary] LLM 内容风控拒绝,切换到 DeepMoney: {error_str}") + logger.error(f"[Summary] 流式总结失败: {llm_error}") + # 降级:使用工具调用结果的简单拼接 + results_text = "\n\n".join([ + f"**{r.tool}**: {str(r.result)[:500]}..." + for r in successful_results[:5] + ]) + final_summary = f"根据查询结果:\n\n{results_text}" + yield self._format_sse("summary_chunk", {"content": final_summary}) + logger.warning("[Summary] 使用降级方案") - yield self._format_sse("status", { - "stage": "summarizing", - "message": "切换到备用模型生成总结..." - }) - - try: - # 使用 DeepMoney 备选方案(非流式) - fallback_response = self.deepmoney_client.chat.completions.create( - model=self.deepmoney_model, - messages=messages, - temperature=0.7, - max_tokens=DEEPMONEY_CONFIG.get("max_tokens", 8192), - ) - - final_summary = fallback_response.choices[0].message.content - - # 发送完整的总结内容(一次性) - yield self._format_sse("summary_chunk", { - "content": final_summary - }) - - logger.info(f"[Summary] DeepMoney 备选方案成功") - - except Exception as fallback_error: - logger.error(f"[Summary] DeepMoney 备选方案也失败: {fallback_error}") - # 使用降级方案:简单拼接执行结果 - final_summary = f"执行了 {len(plan.steps)} 个步骤,其中 {len(successful_results)} 个成功。\n\n执行结果:\n{results_text[:500]}..." - yield self._format_sse("summary_chunk", { - "content": final_summary - }) - logger.warning("[Summary] 使用降级方案(简单拼接)") - else: - # 不是内容风控错误,直接抛出 - logger.error(f"[Summary] LLM 调用失败(非风控原因): {llm_error}") - raise - - # 发送完整的总结和元数据 - yield self._format_sse("summary", { - "content": final_summary, - "metadata": { - "total_steps": len(plan.steps), - "successful_steps": len(successful_results), - "failed_steps": len([r for r in step_results if r.status == "failed"]), - "total_execution_time": sum(r.execution_time for r in step_results), - }, - }) + # 发送完整的总结和元数据 + yield self._format_sse("summary", { + "content": final_summary, + "metadata": { + "total_steps": len(plan.steps) if plan_steps else 0, + "successful_steps": len(successful_results), + "failed_steps": len([r for r in step_results if r.status == "failed"]), + "total_execution_time": sum(r.execution_time for r in step_results) if step_results else 0, + }, + }) # 保存 Agent 回复到 ES(如果提供了 session_id) if session_id and user_id: @@ -2834,8 +2809,8 @@ class MCPAgentIntegrated: except Exception as e: logger.error(f"[ES] 保存 Agent 回复失败: {e}", exc_info=True) - # 发送完成事件 - yield self._format_sse("done", {"message": "处理完成"}) + # 发送完成事件(包含 session_id) + yield self._format_sse("done", {"message": "处理完成", "session_id": session_id}) except Exception as e: logger.error(f"[Agent Stream] 错误: {str(e)}", exc_info=True)