Files
vf_react/mcp_chat_api.py
2025-11-22 08:57:37 +08:00

233 lines
6.8 KiB
Python

# mcp_chat_api.py - MCP 聊天 API 端点
# 添加到 app.py 或作为独立模块导入
from flask import Blueprint, request, jsonify, session, Response
import json
import time
from typing import Generator
# 创建蓝图
mcp_chat_bp = Blueprint('mcp_chat', __name__, url_prefix='/api/mcp')
# 假设已有 mcp_server.py 中的相关功能
# from mcp_server import process_message, get_available_tools, execute_code
def check_chat_permission():
"""检查用户是否有权限访问 AI 聊天功能"""
if not session.get('user_id'):
return False, '请先登录'
# 这里可以添加订阅检查逻辑
# user = User.query.get(session['user_id'])
# if user.subscription_tier not in ['premium', 'pro', 'enterprise']:
# return False, '需要订阅才能使用 AI 助手'
return True, None
@mcp_chat_bp.route('/chat', methods=['POST'])
def chat():
"""处理聊天消息"""
# 权限检查
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
data = request.json
message = data.get('message', '')
session_id = data.get('sessionId', '')
model = data.get('model', 'claude-3-opus')
if not message:
return jsonify({'error': '消息不能为空'}), 400
try:
# 调用 MCP 处理消息
# response = process_message(
# message=message,
# user_id=session['user_id'],
# session_id=session_id,
# model=model
# )
# 临时模拟响应
response = {
'reply': f'收到您的消息: "{message}"。这是一个模拟响应,实际部署时会调用 MCP 模型。',
'sessionId': session_id,
'timestamp': time.time()
}
return jsonify(response)
except Exception as e:
print(f"Chat error: {e}")
return jsonify({'error': '处理消息时出错'}), 500
@mcp_chat_bp.route('/chat/stream', methods=['POST'])
def chat_stream():
"""流式处理聊天消息"""
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
data = request.json
message = data.get('message', '')
session_id = data.get('sessionId', '')
def generate() -> Generator[str, None, None]:
"""生成 SSE 流"""
try:
# 这里应该调用实际的 MCP 流式 API
# for chunk in mcp_stream_message(message, session_id):
# yield f"data: {json.dumps(chunk)}\n\n"
# 临时模拟流式响应
words = f'这是对 "{message}" 的流式响应。每个字会逐个发送。'.split()
for word in words:
chunk = {'content': word + ' ', 'type': 'text'}
yield f"data: {json.dumps(chunk)}\n\n"
time.sleep(0.1) # 模拟延迟
yield "data: [DONE]\n\n"
except Exception as e:
error_chunk = {'error': str(e), 'type': 'error'}
yield f"data: {json.dumps(error_chunk)}\n\n"
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no', # Nginx 不缓冲
}
)
@mcp_chat_bp.route('/tools', methods=['GET'])
def get_tools():
"""获取可用的 MCP 工具列表"""
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
# 这里应该返回实际的工具列表
tools = [
{
'id': 'code_executor',
'name': '代码执行器',
'description': '执行 Python、JavaScript 等代码',
'icon': 'code'
},
{
'id': 'web_search',
'name': '网页搜索',
'description': '搜索网络信息',
'icon': 'search'
},
{
'id': 'calculator',
'name': '计算器',
'description': '执行数学计算',
'icon': 'calculator'
}
]
return jsonify(tools)
@mcp_chat_bp.route('/execute', methods=['POST'])
def execute_code():
"""执行代码"""
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
data = request.json
code = data.get('code', '')
language = data.get('language', 'python')
if not code:
return jsonify({'error': '代码不能为空'}), 400
try:
# 实际执行代码
# result = execute_code_safely(code, language)
# 临时模拟
result = {
'output': f'执行 {language} 代码成功\n输出: Hello World',
'error': None,
'executionTime': 0.023
}
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@mcp_chat_bp.route('/generate-code', methods=['POST'])
def generate_code():
"""生成代码"""
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
data = request.json
prompt = data.get('prompt', '')
language = data.get('language', 'python')
if not prompt:
return jsonify({'error': '提示不能为空'}), 400
try:
# 调用 MCP 生成代码
# code = mcp_generate_code(prompt, language)
# 临时模拟
code = f'''# {prompt}
def hello_world():
print("Hello, World!")
if __name__ == "__main__":
hello_world()'''
return jsonify({
'code': code,
'language': language,
'prompt': prompt
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@mcp_chat_bp.route('/history', methods=['GET'])
def get_history():
"""获取聊天历史"""
has_permission, error_msg = check_chat_permission()
if not has_permission:
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
session_id = request.args.get('sessionId')
# 这里应该从数据库获取历史
# history = ChatHistory.query.filter_by(
# user_id=session['user_id'],
# session_id=session_id
# ).all()
# 临时模拟
history = [
{
'role': 'user',
'content': '你好',
'timestamp': '2024-01-01T10:00:00Z'
},
{
'role': 'assistant',
'content': '你好!有什么可以帮助您的吗?',
'timestamp': '2024-01-01T10:00:01Z'
}
]
return jsonify(history)
# 在 app.py 中注册蓝图
# app.register_blueprint(mcp_chat_bp)