# mcp_chat_api.py - MCP 聊天 API 端点 # 添加到 app.py 或作为独立模块导入 from flask import Blueprint, request, jsonify, session, Response import json import time from typing import Generator # 创建蓝图 mcp_chat_bp = Blueprint('mcp_chat', __name__, url_prefix='/api/mcp') # 假设已有 mcp_server.py 中的相关功能 # from mcp_server import process_message, get_available_tools, execute_code def check_chat_permission(): """检查用户是否有权限访问 AI 聊天功能""" if not session.get('user_id'): return False, '请先登录' # 这里可以添加订阅检查逻辑 # user = User.query.get(session['user_id']) # if user.subscription_tier not in ['premium', 'pro', 'enterprise']: # return False, '需要订阅才能使用 AI 助手' return True, None @mcp_chat_bp.route('/chat', methods=['POST']) def chat(): """处理聊天消息""" # 权限检查 has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 data = request.json message = data.get('message', '') session_id = data.get('sessionId', '') model = data.get('model', 'claude-3-opus') if not message: return jsonify({'error': '消息不能为空'}), 400 try: # 调用 MCP 处理消息 # response = process_message( # message=message, # user_id=session['user_id'], # session_id=session_id, # model=model # ) # 临时模拟响应 response = { 'reply': f'收到您的消息: "{message}"。这是一个模拟响应,实际部署时会调用 MCP 模型。', 'sessionId': session_id, 'timestamp': time.time() } return jsonify(response) except Exception as e: print(f"Chat error: {e}") return jsonify({'error': '处理消息时出错'}), 500 @mcp_chat_bp.route('/chat/stream', methods=['POST']) def chat_stream(): """流式处理聊天消息""" has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 data = request.json message = data.get('message', '') session_id = data.get('sessionId', '') def generate() -> Generator[str, None, None]: """生成 SSE 流""" try: # 这里应该调用实际的 MCP 流式 API # for chunk in mcp_stream_message(message, session_id): # yield f"data: {json.dumps(chunk)}\n\n" # 临时模拟流式响应 words = f'这是对 "{message}" 的流式响应。每个字会逐个发送。'.split() for word in words: chunk = {'content': word + ' ', 'type': 'text'} yield f"data: {json.dumps(chunk)}\n\n" time.sleep(0.1) # 模拟延迟 yield "data: [DONE]\n\n" except Exception as e: error_chunk = {'error': str(e), 'type': 'error'} yield f"data: {json.dumps(error_chunk)}\n\n" return Response( generate(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', # Nginx 不缓冲 } ) @mcp_chat_bp.route('/tools', methods=['GET']) def get_tools(): """获取可用的 MCP 工具列表""" has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 # 这里应该返回实际的工具列表 tools = [ { 'id': 'code_executor', 'name': '代码执行器', 'description': '执行 Python、JavaScript 等代码', 'icon': 'code' }, { 'id': 'web_search', 'name': '网页搜索', 'description': '搜索网络信息', 'icon': 'search' }, { 'id': 'calculator', 'name': '计算器', 'description': '执行数学计算', 'icon': 'calculator' } ] return jsonify(tools) @mcp_chat_bp.route('/execute', methods=['POST']) def execute_code(): """执行代码""" has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 data = request.json code = data.get('code', '') language = data.get('language', 'python') if not code: return jsonify({'error': '代码不能为空'}), 400 try: # 实际执行代码 # result = execute_code_safely(code, language) # 临时模拟 result = { 'output': f'执行 {language} 代码成功\n输出: Hello World', 'error': None, 'executionTime': 0.023 } return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @mcp_chat_bp.route('/generate-code', methods=['POST']) def generate_code(): """生成代码""" has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 data = request.json prompt = data.get('prompt', '') language = data.get('language', 'python') if not prompt: return jsonify({'error': '提示不能为空'}), 400 try: # 调用 MCP 生成代码 # code = mcp_generate_code(prompt, language) # 临时模拟 code = f'''# {prompt} def hello_world(): print("Hello, World!") if __name__ == "__main__": hello_world()''' return jsonify({ 'code': code, 'language': language, 'prompt': prompt }) except Exception as e: return jsonify({'error': str(e)}), 500 @mcp_chat_bp.route('/history', methods=['GET']) def get_history(): """获取聊天历史""" has_permission, error_msg = check_chat_permission() if not has_permission: return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403 session_id = request.args.get('sessionId') # 这里应该从数据库获取历史 # history = ChatHistory.query.filter_by( # user_id=session['user_id'], # session_id=session_id # ).all() # 临时模拟 history = [ { 'role': 'user', 'content': '你好', 'timestamp': '2024-01-01T10:00:00Z' }, { 'role': 'assistant', 'content': '你好!有什么可以帮助您的吗?', 'timestamp': '2024-01-01T10:00:01Z' } ] return jsonify(history) # 在 app.py 中注册蓝图 # app.register_blueprint(mcp_chat_bp)