233 lines
6.8 KiB
Python
233 lines
6.8 KiB
Python
# mcp_chat_api.py - MCP 聊天 API 端点
|
|
# 添加到 app.py 或作为独立模块导入
|
|
|
|
from flask import Blueprint, request, jsonify, session, Response
|
|
import json
|
|
import time
|
|
from typing import Generator
|
|
|
|
# 创建蓝图
|
|
mcp_chat_bp = Blueprint('mcp_chat', __name__, url_prefix='/api/mcp')
|
|
|
|
# 假设已有 mcp_server.py 中的相关功能
|
|
# from mcp_server import process_message, get_available_tools, execute_code
|
|
|
|
def check_chat_permission():
|
|
"""检查用户是否有权限访问 AI 聊天功能"""
|
|
if not session.get('user_id'):
|
|
return False, '请先登录'
|
|
|
|
# 这里可以添加订阅检查逻辑
|
|
# user = User.query.get(session['user_id'])
|
|
# if user.subscription_tier not in ['premium', 'pro', 'enterprise']:
|
|
# return False, '需要订阅才能使用 AI 助手'
|
|
|
|
return True, None
|
|
|
|
@mcp_chat_bp.route('/chat', methods=['POST'])
|
|
def chat():
|
|
"""处理聊天消息"""
|
|
# 权限检查
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
data = request.json
|
|
message = data.get('message', '')
|
|
session_id = data.get('sessionId', '')
|
|
model = data.get('model', 'claude-3-opus')
|
|
|
|
if not message:
|
|
return jsonify({'error': '消息不能为空'}), 400
|
|
|
|
try:
|
|
# 调用 MCP 处理消息
|
|
# response = process_message(
|
|
# message=message,
|
|
# user_id=session['user_id'],
|
|
# session_id=session_id,
|
|
# model=model
|
|
# )
|
|
|
|
# 临时模拟响应
|
|
response = {
|
|
'reply': f'收到您的消息: "{message}"。这是一个模拟响应,实际部署时会调用 MCP 模型。',
|
|
'sessionId': session_id,
|
|
'timestamp': time.time()
|
|
}
|
|
|
|
return jsonify(response)
|
|
|
|
except Exception as e:
|
|
print(f"Chat error: {e}")
|
|
return jsonify({'error': '处理消息时出错'}), 500
|
|
|
|
@mcp_chat_bp.route('/chat/stream', methods=['POST'])
|
|
def chat_stream():
|
|
"""流式处理聊天消息"""
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
data = request.json
|
|
message = data.get('message', '')
|
|
session_id = data.get('sessionId', '')
|
|
|
|
def generate() -> Generator[str, None, None]:
|
|
"""生成 SSE 流"""
|
|
try:
|
|
# 这里应该调用实际的 MCP 流式 API
|
|
# for chunk in mcp_stream_message(message, session_id):
|
|
# yield f"data: {json.dumps(chunk)}\n\n"
|
|
|
|
# 临时模拟流式响应
|
|
words = f'这是对 "{message}" 的流式响应。每个字会逐个发送。'.split()
|
|
for word in words:
|
|
chunk = {'content': word + ' ', 'type': 'text'}
|
|
yield f"data: {json.dumps(chunk)}\n\n"
|
|
time.sleep(0.1) # 模拟延迟
|
|
|
|
yield "data: [DONE]\n\n"
|
|
|
|
except Exception as e:
|
|
error_chunk = {'error': str(e), 'type': 'error'}
|
|
yield f"data: {json.dumps(error_chunk)}\n\n"
|
|
|
|
return Response(
|
|
generate(),
|
|
mimetype='text/event-stream',
|
|
headers={
|
|
'Cache-Control': 'no-cache',
|
|
'X-Accel-Buffering': 'no', # Nginx 不缓冲
|
|
}
|
|
)
|
|
|
|
@mcp_chat_bp.route('/tools', methods=['GET'])
|
|
def get_tools():
|
|
"""获取可用的 MCP 工具列表"""
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
# 这里应该返回实际的工具列表
|
|
tools = [
|
|
{
|
|
'id': 'code_executor',
|
|
'name': '代码执行器',
|
|
'description': '执行 Python、JavaScript 等代码',
|
|
'icon': 'code'
|
|
},
|
|
{
|
|
'id': 'web_search',
|
|
'name': '网页搜索',
|
|
'description': '搜索网络信息',
|
|
'icon': 'search'
|
|
},
|
|
{
|
|
'id': 'calculator',
|
|
'name': '计算器',
|
|
'description': '执行数学计算',
|
|
'icon': 'calculator'
|
|
}
|
|
]
|
|
|
|
return jsonify(tools)
|
|
|
|
@mcp_chat_bp.route('/execute', methods=['POST'])
|
|
def execute_code():
|
|
"""执行代码"""
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
data = request.json
|
|
code = data.get('code', '')
|
|
language = data.get('language', 'python')
|
|
|
|
if not code:
|
|
return jsonify({'error': '代码不能为空'}), 400
|
|
|
|
try:
|
|
# 实际执行代码
|
|
# result = execute_code_safely(code, language)
|
|
|
|
# 临时模拟
|
|
result = {
|
|
'output': f'执行 {language} 代码成功\n输出: Hello World',
|
|
'error': None,
|
|
'executionTime': 0.023
|
|
}
|
|
|
|
return jsonify(result)
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@mcp_chat_bp.route('/generate-code', methods=['POST'])
|
|
def generate_code():
|
|
"""生成代码"""
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
data = request.json
|
|
prompt = data.get('prompt', '')
|
|
language = data.get('language', 'python')
|
|
|
|
if not prompt:
|
|
return jsonify({'error': '提示不能为空'}), 400
|
|
|
|
try:
|
|
# 调用 MCP 生成代码
|
|
# code = mcp_generate_code(prompt, language)
|
|
|
|
# 临时模拟
|
|
code = f'''# {prompt}
|
|
def hello_world():
|
|
print("Hello, World!")
|
|
|
|
if __name__ == "__main__":
|
|
hello_world()'''
|
|
|
|
return jsonify({
|
|
'code': code,
|
|
'language': language,
|
|
'prompt': prompt
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@mcp_chat_bp.route('/history', methods=['GET'])
|
|
def get_history():
|
|
"""获取聊天历史"""
|
|
has_permission, error_msg = check_chat_permission()
|
|
if not has_permission:
|
|
return jsonify({'error': error_msg}), 401 if error_msg == '请先登录' else 403
|
|
|
|
session_id = request.args.get('sessionId')
|
|
|
|
# 这里应该从数据库获取历史
|
|
# history = ChatHistory.query.filter_by(
|
|
# user_id=session['user_id'],
|
|
# session_id=session_id
|
|
# ).all()
|
|
|
|
# 临时模拟
|
|
history = [
|
|
{
|
|
'role': 'user',
|
|
'content': '你好',
|
|
'timestamp': '2024-01-01T10:00:00Z'
|
|
},
|
|
{
|
|
'role': 'assistant',
|
|
'content': '你好!有什么可以帮助您的吗?',
|
|
'timestamp': '2024-01-01T10:00:01Z'
|
|
}
|
|
]
|
|
|
|
return jsonify(history)
|
|
|
|
# 在 app.py 中注册蓝图
|
|
# app.register_blueprint(mcp_chat_bp) |