diff --git a/__pycache__/mcp_elasticsearch.cpython-310.pyc b/__pycache__/mcp_elasticsearch.cpython-310.pyc new file mode 100644 index 00000000..9e9dea52 Binary files /dev/null and b/__pycache__/mcp_elasticsearch.cpython-310.pyc differ diff --git a/__pycache__/mcp_server.cpython-310.pyc b/__pycache__/mcp_server.cpython-310.pyc new file mode 100644 index 00000000..fcea44c9 Binary files /dev/null and b/__pycache__/mcp_server.cpython-310.pyc differ diff --git a/mcp_elasticsearch.py b/mcp_elasticsearch.py index 53407f5f..53d70e3c 100644 --- a/mcp_elasticsearch.py +++ b/mcp_elasticsearch.py @@ -69,6 +69,8 @@ class ESClient: }, "plan": {"type": "text"}, # 执行计划(仅 assistant) "steps": {"type": "text"}, # 执行步骤(仅 assistant) + "session_title": {"type": "text"}, # 会话标题/概述(新增) + "is_first_message": {"type": "boolean"}, # 是否是会话首条消息(新增) "timestamp": {"type": "date"}, # 时间戳 "created_at": {"type": "date"}, # 创建时间 } @@ -105,6 +107,8 @@ class ESClient: message: str, plan: Optional[str] = None, steps: Optional[str] = None, + session_title: Optional[str] = None, + is_first_message: bool = False, ) -> str: """ 保存聊天消息 @@ -118,6 +122,8 @@ class ESClient: message: 消息内容 plan: 执行计划(可选) steps: 执行步骤(可选) + session_title: 会话标题(可选,通常在首条消息时设置) + is_first_message: 是否是会话首条消息 Returns: 文档ID @@ -136,6 +142,8 @@ class ESClient: "message_embedding": embedding if embedding else None, "plan": plan, "steps": steps, + "session_title": session_title, + "is_first_message": is_first_message, "timestamp": datetime.now(), "created_at": datetime.now(), } @@ -157,10 +165,10 @@ class ESClient: limit: 返回数量 Returns: - 会话列表,每个会话包含:session_id, last_message, last_timestamp + 会话列表,每个会话包含:session_id, title, last_message, last_timestamp """ try: - # 聚合查询:按 session_id 分组,获取每个会话的最后一条消息 + # 聚合查询:按 session_id 分组,获取每个会话的最后一条消息和标题 query = { "query": { "term": {"user_id": user_id} @@ -180,7 +188,15 @@ class ESClient: "top_hits": { "size": 1, "sort": [{"timestamp": {"order": "desc"}}], - "_source": ["message", "timestamp", "message_type"] + "_source": ["message", "timestamp", "message_type", "session_title"] + } + }, + # 获取首条消息(包含标题) + "first_message": { + "top_hits": { + "size": 1, + "sort": [{"timestamp": {"order": "asc"}}], + "_source": ["session_title", "message"] } } } @@ -193,11 +209,21 @@ class ESClient: sessions = [] for bucket in result["aggregations"]["sessions"]["buckets"]: - session_data = bucket["last_message_content"]["hits"]["hits"][0]["_source"] + last_msg = bucket["last_message_content"]["hits"]["hits"][0]["_source"] + first_msg = bucket["first_message"]["hits"]["hits"][0]["_source"] + + # 优先使用 session_title,否则使用首条消息的前30字符 + title = ( + last_msg.get("session_title") or + first_msg.get("session_title") or + first_msg.get("message", "")[:30] + ) + sessions.append({ "session_id": bucket["key"], - "last_message": session_data["message"], - "last_timestamp": session_data["timestamp"], + "title": title, + "last_message": last_msg["message"], + "last_timestamp": last_msg["timestamp"], "message_count": bucket["doc_count"], }) diff --git a/mcp_server.py b/mcp_server.py index 6a3c043f..7e4e2494 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -1845,6 +1845,24 @@ class MCPAgentIntegrated: for tool in tools ]) + # 获取当前时间信息 + from datetime import datetime + now = datetime.now() + current_time_info = f"""## 当前时间 +- **日期**: {now.strftime('%Y年%m月%d日')} +- **时间**: {now.strftime('%H:%M:%S')} +- **星期**: {['周一', '周二', '周三', '周四', '周五', '周六', '周日'][now.weekday()]} +- **A股交易时间**: 上午 9:30-11:30,下午 13:00-15:00 +- **当前是否交易时段**: {'是' if (now.weekday() < 5 and ((now.hour == 9 and now.minute >= 30) or (10 <= now.hour < 11) or (now.hour == 11 and now.minute <= 30) or (13 <= now.hour < 15))) else '否'} + +**时间语义理解**: +- "今天/当天" = {now.strftime('%Y-%m-%d')} +- "最近/近期" = 最近 5-10 个交易日 +- "短线" = 5-20 个交易日 +- "中线" = 1-3 个月 +- "长线" = 6 个月以上 +""" + return f"""你是"价小前",北京价值前沿科技公司的AI投研聊天助手。 ## 你的人格特征 @@ -1854,6 +1872,8 @@ class MCPAgentIntegrated: - **性格**: 专业、严谨、友好,擅长用简洁的语言解释复杂的金融概念 - **服务宗旨**: 帮助投资者做出更明智的投资决策,提供数据驱动的研究支持 +{current_time_info} + ## 可用工具 {tools_desc} @@ -1950,15 +1970,30 @@ class MCPAgentIntegrated: 只返回JSON,不要其他内容。""" - async def create_plan(self, user_query: str, tools: List[dict]) -> ExecutionPlan: - """阶段1: 使用 Kimi 创建执行计划(带思考过程)""" + async def create_plan(self, user_query: str, tools: List[dict], chat_history: List[dict] = None) -> ExecutionPlan: + """阶段1: 使用 Kimi 创建执行计划(带思考过程和历史上下文)""" logger.info(f"[Planning] Kimi开始制定计划: {user_query}") messages = [ {"role": "system", "content": self.get_planning_prompt(tools)}, - {"role": "user", "content": user_query}, ] + # 添加会话历史(多轮对话上下文) + if chat_history: + # 限制历史消息数量,避免 context 过长 + recent_history = chat_history[-10:] # 最近10条消息 + for msg in recent_history: + role = "user" if msg.get("message_type") == "user" else "assistant" + content = msg.get("message", "") + # 截断过长的历史消息 + if len(content) > 500: + content = content[:500] + "..." + messages.append({"role": role, "content": content}) + logger.info(f"[Planning] 添加了 {len(recent_history)} 条历史消息到上下文") + + # 添加当前用户问题 + messages.append({"role": "user", "content": user_query}) + # 使用 Kimi 思考模型 response = self.kimi_client.chat.completions.create( model=self.kimi_model, @@ -2229,13 +2264,16 @@ class MCPAgentIntegrated: user_query: str, tools: List[dict], tool_handlers: Dict[str, Any], + chat_history: List[dict] = None, ) -> AgentResponse: """主流程(非流式)""" logger.info(f"[Agent] 处理查询: {user_query}") + if chat_history: + logger.info(f"[Agent] 带有 {len(chat_history)} 条历史消息") try: - # 阶段1: Kimi 制定计划 - plan = await self.create_plan(user_query, tools) + # 阶段1: Kimi 制定计划(带历史上下文) + plan = await self.create_plan(user_query, tools, chat_history) # 阶段2: 执行工具 step_results = await self.execute_plan(plan, tool_handlers) @@ -2271,6 +2309,46 @@ class MCPAgentIntegrated: message=f"处理失败: {str(e)}", ) + async def generate_session_title(self, user_message: str, assistant_response: str) -> str: + """生成会话标题(简短概述),使用 DeepMoney 模型""" + try: + messages = [ + { + "role": "system", + "content": "你是一个标题生成器。根据用户问题和AI回复,生成一个简短的会话标题(10-20个字)。只返回标题文本,不要任何其他内容。" + }, + { + "role": "user", + "content": f"用户问题:{user_message[:200]}\n\nAI回复:{assistant_response[:500]}\n\n请生成一个简短的会话标题:" + } + ] + + # 使用 DeepMoney 模型(更轻量,适合简单任务) + response = self.deepmoney_client.chat.completions.create( + model=self.deepmoney_model, + messages=messages, + temperature=0.3, + max_tokens=100, + ) + + title = response.choices[0].message.content.strip() + + # 处理 DeepMoney 的 ... 标签,只保留 之后的内容 + if "" in title: + title = title.split("")[-1].strip() + + # 清理可能的引号 + title = title.strip('"\'') + # 限制长度 + if len(title) > 30: + title = title[:27] + "..." + return title + + except Exception as e: + logger.error(f"[Title] 生成标题失败: {e}") + # 降级:使用用户消息的前20个字符 + return user_message[:20] + "..." if len(user_message) > 20 else user_message + async def process_query_stream( self, user_query: str, @@ -2282,9 +2360,15 @@ class MCPAgentIntegrated: user_avatar: str = None, cookies: dict = None, model_config: dict = None, # 新增:动态模型配置 + chat_history: List[dict] = None, # 新增:历史对话记录 + is_new_session: bool = False, # 新增:是否是新会话(用于生成标题) ) -> AsyncGenerator[str, None]: """主流程(流式输出)- 逐步返回执行结果""" logger.info(f"[Agent Stream] 处理查询: {user_query}") + if chat_history: + logger.info(f"[Agent Stream] 带有 {len(chat_history)} 条历史消息") + if is_new_session: + logger.info(f"[Agent Stream] 这是新会话,将在完成后生成标题") # 将 cookies 存储为实例属性,供工具调用时使用 self.cookies = cookies or {} @@ -2309,11 +2393,26 @@ class MCPAgentIntegrated: # 阶段1: 使用选中的模型制定计划(流式,带 DeepMoney 备选) yield self._format_sse("status", {"stage": "planning", "message": "正在制定执行计划..."}) + # 构建消息列表(包含历史对话上下文) messages = [ {"role": "system", "content": self.get_planning_prompt(tools)}, - {"role": "user", "content": user_query}, ] + # 添加历史对话(最近 10 轮,避免上下文过长) + if chat_history: + recent_history = chat_history[-10:] # 最近 10 条消息 + for msg in recent_history: + role = "user" if msg.get("message_type") == "user" else "assistant" + content = msg.get("message", "") + # 截断过长的历史消息 + if len(content) > 500: + content = content[:500] + "..." + messages.append({"role": role, "content": content}) + logger.info(f"[Agent Stream] 已添加 {len(recent_history)} 条历史消息到上下文") + + # 添加当前用户查询 + messages.append({"role": "user", "content": user_query}) + reasoning_content = "" plan_content = "" use_fallback = False @@ -2650,6 +2749,17 @@ class MCPAgentIntegrated: "steps": [{"tool": step.tool, "arguments": step.arguments, "reason": step.reason} for step in plan.steps] }, ensure_ascii=False) + # 如果是新会话,生成会话标题 + session_title = None + if is_new_session: + try: + session_title = await self.generate_session_title(user_query, final_summary) + logger.info(f"[Title] 新会话标题: {session_title}") + except Exception as title_error: + logger.error(f"[Title] 生成标题失败: {title_error}") + # 降级:使用用户消息的前 20 个字符 + session_title = user_query[:20] + "..." if len(user_query) > 20 else user_query + es_client.save_chat_message( session_id=session_id, user_id=user_id, @@ -2659,8 +2769,13 @@ class MCPAgentIntegrated: message=final_summary, plan=plan_json, steps=steps_json, + session_title=session_title, # 新会话时保存标题 ) logger.info(f"[ES] Agent 回复已保存到会话 {session_id}") + + # 如果生成了标题,通过 SSE 发送给前端 + if session_title: + yield self._format_sse("session_title", {"title": session_title}) except Exception as e: logger.error(f"[ES] 保存 Agent 回复失败: {e}", exc_info=True) @@ -2751,6 +2866,17 @@ async def agent_chat(request: AgentChatRequest): # ==================== 会话管理 ==================== # 如果没有提供 session_id,创建新会话 session_id = request.session_id or str(uuid.uuid4()) + is_new_session = not request.session_id + + # 获取会话历史(用于多轮对话) + chat_history = [] + if not is_new_session: + try: + history = es_client.get_chat_history(session_id, limit=20) # 最近20条 + chat_history = history + logger.info(f"加载会话历史: {len(chat_history)} 条消息") + except Exception as e: + logger.error(f"获取会话历史失败: {e}") # 保存用户消息到 ES try: @@ -2761,6 +2887,7 @@ async def agent_chat(request: AgentChatRequest): user_avatar=request.user_avatar or "", message_type="user", message=request.message, + is_first_message=is_new_session, ) except Exception as e: logger.error(f"保存用户消息失败: {e}") @@ -2796,14 +2923,16 @@ async def agent_chat(request: AgentChatRequest): } }) - # 处理查询 + # 处理查询(传入会话历史实现多轮对话) response = await agent.process_query( user_query=request.message, tools=tools, tool_handlers=TOOL_HANDLERS, + chat_history=chat_history, ) # 保存 Agent 回复到 ES + session_title = None try: # 将执行步骤转换为JSON字符串 steps_json = json.dumps( @@ -2814,6 +2943,11 @@ async def agent_chat(request: AgentChatRequest): # 将 plan 转换为 JSON 字符串(ES 中 plan 字段是 text 类型) plan_json = json.dumps(response.plan.dict(), ensure_ascii=False) if response.plan else None + # 如果是新会话,生成会话标题 + if is_new_session: + session_title = await agent.generate_session_title(request.message, response.final_summary) + logger.info(f"生成会话标题: {session_title}") + es_client.save_chat_message( session_id=session_id, user_id=request.user_id or "anonymous", @@ -2823,13 +2957,15 @@ async def agent_chat(request: AgentChatRequest): message=response.final_summary, # 使用 final_summary 而不是 final_answer plan=plan_json, # 传递 JSON 字符串而不是字典 steps=steps_json, + session_title=session_title, ) except Exception as e: logger.error(f"保存 Agent 回复失败: {e}", exc_info=True) - # 在响应中返回 session_id + # 在响应中返回 session_id 和 title response_dict = response.dict() response_dict["session_id"] = session_id + response_dict["session_title"] = session_title return response_dict @app.post("/agent/chat/stream") @@ -2872,9 +3008,21 @@ async def agent_chat_stream(chat_request: AgentChatRequest, request: Request): f"subscription_type: {user_subscription}" ) - # 如果没有提供 session_id,创建新会话 + # 判断是否是新会话 + is_new_session = not chat_request.session_id session_id = chat_request.session_id or str(uuid.uuid4()) + # ==================== 加载历史对话(多轮对话记忆)==================== + chat_history = [] + if not is_new_session: + try: + # 加载该会话的历史消息(最近 20 条) + history = es_client.get_chat_history(session_id, limit=20) + chat_history = history + logger.info(f"[Stream] 已加载 {len(chat_history)} 条历史消息") + except Exception as e: + logger.error(f"[Stream] 加载历史消息失败: {e}") + # 保存用户消息到 ES try: es_client.save_chat_message( @@ -2884,6 +3032,7 @@ async def agent_chat_stream(chat_request: AgentChatRequest, request: Request): user_avatar=chat_request.user_avatar or "", message_type="user", message=chat_request.message, + is_first_message=is_new_session, # 标记是否为首条消息 ) logger.info(f"[ES] 用户消息已保存到会话 {session_id}") except Exception as e: @@ -2940,6 +3089,8 @@ async def agent_chat_stream(chat_request: AgentChatRequest, request: Request): user_avatar=chat_request.user_avatar, cookies=cookies, # 传递 cookies 用于认证 API 调用 model_config=model_config, # 传递选中的模型配置 + chat_history=chat_history, # 传递历史对话(多轮对话记忆) + is_new_session=is_new_session, # 传递是否是新会话(用于生成标题) ), media_type="text/event-stream", headers={ diff --git a/src/views/AgentChat/components/LeftSidebar/SessionCard.js b/src/views/AgentChat/components/LeftSidebar/SessionCard.js index 63a4ef23..4e3f4d6c 100644 --- a/src/views/AgentChat/components/LeftSidebar/SessionCard.js +++ b/src/views/AgentChat/components/LeftSidebar/SessionCard.js @@ -40,16 +40,22 @@ const SessionCard = ({ session, isActive, onPress }) => { - - {session.title || '新对话'} + + {session.title || session.last_message?.substring(0, 30) || '新对话'} - {new Date(session.created_at || session.timestamp).toLocaleString('zh-CN', { - month: 'numeric', - day: 'numeric', - hour: '2-digit', - minute: '2-digit', - })} + {(() => { + const dateStr = session.created_at || session.last_timestamp || session.timestamp; + if (!dateStr) return '刚刚'; + const date = new Date(dateStr); + if (isNaN(date.getTime())) return '刚刚'; + return date.toLocaleString('zh-CN', { + month: 'numeric', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + }); + })()} {session.message_count && (