diff --git a/__pycache__/mcp_server.cpython-310.pyc b/__pycache__/mcp_server.cpython-310.pyc index d8e93f25..e06d1636 100644 Binary files a/__pycache__/mcp_server.cpython-310.pyc and b/__pycache__/mcp_server.cpython-310.pyc differ diff --git a/mcp_server.py b/mcp_server.py index 890251be..9889235d 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -2542,9 +2542,22 @@ A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 assistant_message = response.choices[0].message logger.info(f"[Agent Stream] LLM 响应: finish_reason={response.choices[0].finish_reason}") - # 检查是否有工具调用 - if assistant_message.tool_calls: - logger.info(f"[Agent Stream] 检测到 {len(assistant_message.tool_calls)} 个工具调用") + # 获取工具调用(优先使用原生 tool_calls,其次解析文本格式) + native_tool_calls = assistant_message.tool_calls or [] + text_tool_calls = [] + + # 如果没有原生工具调用,尝试从文本内容中解析 + if not native_tool_calls and assistant_message.content: + content = assistant_message.content + # 检查是否包含工具调用标记 + if '' in content or '```tool_call' in content or '"tool":' in content: + logger.info(f"[Agent Stream] 尝试从文本内容解析工具调用") + logger.info(f"[Agent Stream] 内容预览: {content[:500]}") + text_tool_calls = self._parse_text_tool_calls(content) + + # 检查是否有工具调用(原生或文本格式) + if native_tool_calls: + logger.info(f"[Agent Stream] 检测到 {len(native_tool_calls)} 个原生工具调用") # 将 assistant 消息添加到历史(包含 tool_calls) messages.append(assistant_message) @@ -2557,7 +2570,7 @@ A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 "reasoning": "使用工具获取相关数据进行分析", "steps": [] } - for tc in assistant_message.tool_calls: + for tc in native_tool_calls: try: args = json.loads(tc.function.arguments) if tc.function.arguments else {} except: @@ -2569,10 +2582,10 @@ A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 }) yield self._format_sse("plan", plan_data) - yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(assistant_message.tool_calls)} 个工具调用"}) + yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(native_tool_calls)} 个工具调用"}) # 执行每个工具调用 - for tool_call in assistant_message.tool_calls: + for tool_call in native_tool_calls: tool_name = tool_call.function.name tool_call_id = tool_call.id @@ -2675,6 +2688,120 @@ A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 logger.info(f"[Tool Call] ========== 工具调用结束 ==========") step_index += 1 + elif text_tool_calls: + # 处理文本格式的工具调用 + logger.info(f"[Agent Stream] 检测到 {len(text_tool_calls)} 个文本格式工具调用") + + # 将 assistant 消息添加到历史 + messages.append({"role": "assistant", "content": assistant_message.content}) + + # 如果是第一次工具调用,发送计划事件 + if step_index == 0: + plan_data = { + "goal": f"分析用户问题:{user_query[:50]}...", + "reasoning": "使用工具获取相关数据进行分析", + "steps": [ + {"tool": tc["name"], "arguments": tc["arguments"], "reason": f"调用 {tc['name']}"} + for tc in text_tool_calls + ] + } + yield self._format_sse("plan", plan_data) + yield self._format_sse("status", {"stage": "executing", "message": f"开始执行 {len(text_tool_calls)} 个工具调用"}) + + # 执行每个工具调用 + for tc in text_tool_calls: + tool_name = tc["name"] + arguments = tc["arguments"] + tool_call_id = f"text_call_{step_index}_{tool_name}" + + logger.info(f"[Tool Call] ========== 文本工具调用开始 ==========") + logger.info(f"[Tool Call] 工具名: {tool_name}") + logger.info(f"[Tool Call] 参数内容: {json.dumps(arguments, ensure_ascii=False)}") + + # 发送步骤开始事件 + yield self._format_sse("step_start", { + "step_index": step_index, + "tool": tool_name, + "arguments": arguments, + "reason": f"调用 {tool_name}", + }) + + start_time = datetime.now() + + try: + # 特殊处理 summarize_news + if tool_name == "summarize_news": + data_arg = arguments.get("data", "") + if data_arg in ["前面的新闻数据", "前面收集的所有数据", ""]: + arguments["data"] = json.dumps(collected_data, ensure_ascii=False, indent=2) + + # 执行工具 + result = await self.execute_tool(tool_name, arguments, tool_handlers) + execution_time = (datetime.now() - start_time).total_seconds() + + # 记录结果 + step_result = StepResult( + step_index=step_index, + tool=tool_name, + arguments=arguments, + status="success", + result=result, + execution_time=execution_time, + ) + step_results.append(step_result) + collected_data[f"step_{step_index+1}_{tool_name}"] = result + plan_steps.append({"tool": tool_name, "arguments": arguments, "reason": f"调用 {tool_name}"}) + + # 发送步骤完成事件 + yield self._format_sse("step_complete", { + "step_index": step_index, + "tool": tool_name, + "status": "success", + "result": result, + "execution_time": execution_time, + }) + + # 将工具结果添加到消息历史(简化格式,因为模型可能不支持标准 tool 消息) + result_str = json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result) + messages.append({ + "role": "user", + "content": f"[工具调用结果] {tool_name}: {result_str[:3000]}" + }) + + logger.info(f"[Tool Call] 执行成功,耗时 {execution_time:.2f}s") + + except Exception as e: + execution_time = (datetime.now() - start_time).total_seconds() + error_msg = str(e) + + step_result = StepResult( + step_index=step_index, + tool=tool_name, + arguments=arguments, + status="failed", + error=error_msg, + execution_time=execution_time, + ) + step_results.append(step_result) + + yield self._format_sse("step_complete", { + "step_index": step_index, + "tool": tool_name, + "status": "failed", + "error": error_msg, + "execution_time": execution_time, + }) + + messages.append({ + "role": "user", + "content": f"[工具调用失败] {tool_name}: {error_msg}" + }) + + logger.error(f"[Tool Call] 执行失败: {error_msg}") + + logger.info(f"[Tool Call] ========== 文本工具调用结束 ==========") + step_index += 1 + else: # 没有工具调用,模型生成了最终回复 logger.info(f"[Agent Stream] 模型生成最终回复") @@ -2813,6 +2940,75 @@ A股交易时间: 上午 9:30-11:30,下午 13:00-15:00 """格式化 SSE 消息""" return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + def _parse_text_tool_calls(self, content: str) -> List[Dict[str, Any]]: + """ + 解析文本格式的工具调用 + + 支持的格式: + 1. value + 2. ```tool_call\n{"name": "xxx", "arguments": {...}}\n``` + + 返回: [{"name": "tool_name", "arguments": {...}}, ...] + """ + import re + + tool_calls = [] + + # 格式1: 标签格式 + # 例如: 300274 + pattern1 = r'\s*(.*?)\s*' + matches1 = re.findall(pattern1, content, re.DOTALL) + + for func_name, params_str in matches1: + arguments = {} + # 解析参数: value + param_pattern = r'\s*(.*?)\s*' + param_matches = re.findall(param_pattern, params_str, re.DOTALL) + for param_name, param_value in param_matches: + # 尝试解析 JSON 值,否则作为字符串 + param_value = param_value.strip() + try: + arguments[param_name] = json.loads(param_value) + except: + arguments[param_name] = param_value + + tool_calls.append({ + "name": func_name, + "arguments": arguments + }) + + # 格式2: ```tool_call 代码块格式 + pattern2 = r'```tool_call\s*\n?(.*?)\n?```' + matches2 = re.findall(pattern2, content, re.DOTALL) + + for match in matches2: + try: + data = json.loads(match.strip()) + if isinstance(data, dict) and "name" in data: + tool_calls.append({ + "name": data["name"], + "arguments": data.get("arguments", {}) + }) + except: + pass + + # 格式3: 直接 JSON 格式 {"tool": "xxx", "arguments": {...}} + pattern3 = r'\{\s*"tool"\s*:\s*"(\w+)"\s*,\s*"arguments"\s*:\s*(\{[^}]*\})\s*\}' + matches3 = re.findall(pattern3, content) + + for tool_name, args_str in matches3: + try: + arguments = json.loads(args_str) + tool_calls.append({ + "name": tool_name, + "arguments": arguments + }) + except: + pass + + logger.info(f"[Text Tool Call] 解析到 {len(tool_calls)} 个工具调用: {tool_calls}") + return tool_calls + # 创建 Agent 实例(全局) agent = MCPAgentIntegrated()