# -*- coding: utf-8 -*- """ 股票社区 API - Discord 风格 包含:频道、消息、帖子、回复、表情反应等接口 """ import uuid from datetime import datetime from functools import wraps from flask import Blueprint, request, jsonify, session, g, current_app from elasticsearch import Elasticsearch from sqlalchemy import text # ============================================================ # Blueprint 定义 # ============================================================ community_bp = Blueprint('community', __name__, url_prefix='/api/community') # ES 客户端 es_client = Elasticsearch( hosts=["http://222.128.1.157:19200"], request_timeout=30, max_retries=3, retry_on_timeout=True ) # 注意:数据库连接从 app.py 的 db 对象获取,不再独立创建 # engine 变量会在 Blueprint 注册后通过 app context 获取 engine = None # 将在第一次请求时初始化 socketio_instance = None # 将在注册时设置 def get_db_engine(): """获取数据库引擎(延迟初始化)""" global engine if engine is None: from flask import current_app # 从 app.py 的 SQLAlchemy 实例获取 engine from app import db engine = db.engine return engine # ============================================================ # 工具函数 # ============================================================ def generate_id(): """生成唯一 ID""" return str(uuid.uuid4()).replace('-', '')[:16] def get_current_user(): """获取当前登录用户""" user_id = session.get('user_id') if not user_id: return None return { 'id': str(user_id), 'username': session.get('username', '匿名用户'), 'avatar': session.get('avatar', ''), } def login_required(f): """登录验证装饰器""" @wraps(f) def decorated_function(*args, **kwargs): user = get_current_user() if not user: return jsonify({'code': 401, 'message': '请先登录'}), 401 g.current_user = user return f(*args, **kwargs) return decorated_function def get_user_admin_info(user_id): """获取用户管理员信息""" try: with get_db_engine().connect() as conn: sql = text(""" SELECT role, permissions FROM community_admins WHERE user_id = :user_id """) print(f"[Community API] 查询管理员信息: user_id={user_id}, type={type(user_id)}") result = conn.execute(sql, {'user_id': int(user_id)}).fetchone() print(f"[Community API] 查询结果: {result}") if result: import json permissions = result.permissions if isinstance(permissions, str): permissions = json.loads(permissions) admin_info = { 'role': result.role, 'permissions': permissions or {}, 'isAdmin': result.role == 'admin', 'isModerator': result.role in ['admin', 'moderator'] } print(f"[Community API] 返回管理员信息: {admin_info}") return admin_info except Exception as e: print(f"[Community API] 获取管理员信息失败: {e}") import traceback traceback.print_exc() return None def check_permission(user_id, permission): """检查用户是否有指定权限""" admin_info = get_user_admin_info(user_id) if not admin_info: return False if admin_info['isAdmin']: return True return admin_info['permissions'].get(permission, False) def admin_required(permission=None): """管理员权限验证装饰器""" def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): user = get_current_user() if not user: return jsonify({'code': 401, 'message': '请先登录'}), 401 g.current_user = user # 检查权限 if permission: if not check_permission(user['id'], permission): return jsonify({'code': 403, 'message': '无权限执行此操作'}), 403 else: admin_info = get_user_admin_info(user['id']) if not admin_info: return jsonify({'code': 403, 'message': '需要管理员权限'}), 403 g.admin_info = get_user_admin_info(user['id']) return f(*args, **kwargs) return decorated_function return decorator def get_channel_admin_info(channel_id, user_id): """获取用户在指定频道的管理员信息""" # 先检查是否是超级管理员 super_admin = get_user_admin_info(user_id) if super_admin and super_admin['isAdmin']: return { 'role': 'super_admin', 'isSuperAdmin': True, 'isOwner': True, 'isAdmin': True, 'isModerator': True, 'permissions': { 'delete_channel': True, 'edit_channel': True, 'manage_admins': True, 'pin_messages': True, 'delete_messages': True, 'slow_mode': True, 'lock_channel': True } } # 检查频道管理员 try: with get_db_engine().connect() as conn: sql = text(""" SELECT role, permissions FROM community_channel_admins WHERE channel_id = :channel_id AND user_id = :user_id """) result = conn.execute(sql, { 'channel_id': channel_id, 'user_id': int(user_id) }).fetchone() if result: import json permissions = result.permissions if isinstance(permissions, str): permissions = json.loads(permissions) role = result.role return { 'role': role, 'isSuperAdmin': False, 'isOwner': role == 'owner', 'isAdmin': role in ['owner', 'admin'], 'isModerator': role in ['owner', 'admin', 'moderator'], 'permissions': permissions or {} } except Exception as e: print(f"[Community API] 获取频道管理员信息失败: {e}") return None def check_channel_permission(channel_id, user_id, permission): """检查用户是否有指定频道的权限""" admin_info = get_channel_admin_info(channel_id, user_id) if not admin_info: return False if admin_info['isSuperAdmin'] or admin_info['isOwner']: return True return admin_info['permissions'].get(permission, False) def add_channel_admin(channel_id, user_id, role='owner'): """添加频道管理员""" try: with get_db_engine().connect() as conn: # 根据角色设置默认权限 permissions = {} if role == 'owner': permissions = { 'delete_channel': True, 'edit_channel': True, 'manage_admins': True, 'pin_messages': True, 'delete_messages': True, 'slow_mode': True, 'lock_channel': True } elif role == 'admin': permissions = { 'edit_channel': True, 'pin_messages': True, 'delete_messages': True, 'slow_mode': True } elif role == 'moderator': permissions = { 'pin_messages': True, 'delete_messages': True } import json sql = text(""" INSERT INTO community_channel_admins (channel_id, user_id, role, permissions) VALUES (:channel_id, :user_id, :role, :permissions) ON DUPLICATE KEY UPDATE role = :role, permissions = :permissions """) conn.execute(sql, { 'channel_id': channel_id, 'user_id': int(user_id), 'role': role, 'permissions': json.dumps(permissions) }) conn.commit() return True except Exception as e: print(f"[Community API] 添加频道管理员失败: {e}") return False def api_response(data=None, message='success', code=200): """统一 API 响应格式""" return jsonify({ 'code': code, 'message': message, 'data': data }) def api_error(message, code=400): """API 错误响应""" return jsonify({ 'code': code, 'message': message }), code if code >= 400 else 400 # ============================================================ # 在线成员 API # ============================================================ @community_bp.route('/members/online', methods=['GET']) def get_online_members_api(): """获取当前在线成员列表""" members = get_online_members() return api_response(members) # ============================================================ # 用户管理员状态 API # ============================================================ @community_bp.route('/me/admin-status', methods=['GET']) @login_required def get_my_admin_status(): """获取当前用户的管理员状态""" user = g.current_user print(f"[Community API] /me/admin-status 请求, user={user}") admin_info = get_user_admin_info(user['id']) print(f"[Community API] /me/admin-status admin_info={admin_info}") if admin_info: return api_response({ 'isAdmin': admin_info['isAdmin'], 'isModerator': admin_info['isModerator'], 'role': admin_info['role'], 'permissions': admin_info['permissions'] }) else: return api_response({ 'isAdmin': False, 'isModerator': False, 'role': None, 'permissions': {} }) @community_bp.route('/channels//admin-status', methods=['GET']) @login_required def get_channel_admin_status(channel_id): """获取当前用户在指定频道的管理员状态""" user = g.current_user admin_info = get_channel_admin_info(channel_id, user['id']) if admin_info: return api_response({ 'role': admin_info['role'], 'isSuperAdmin': admin_info['isSuperAdmin'], 'isOwner': admin_info['isOwner'], 'isAdmin': admin_info['isAdmin'], 'isModerator': admin_info['isModerator'], 'permissions': admin_info['permissions'] }) else: return api_response({ 'role': None, 'isSuperAdmin': False, 'isOwner': False, 'isAdmin': False, 'isModerator': False, 'permissions': {} }) @community_bp.route('/channels//admins', methods=['GET']) @login_required def get_channel_admins(channel_id): """获取频道管理员列表(包含超级管理员)""" try: import json admins = [] with get_db_engine().connect() as conn: # 1. 先获取全局超级管理员 # 注意:users 表可能没有 avatar 字段,使用 NULL 作为默认值 super_admin_sql = text(""" SELECT ca.user_id, u.username FROM community_admins ca LEFT JOIN user u ON ca.user_id = u.id WHERE ca.role = 'admin' """) super_admins = conn.execute(super_admin_sql).fetchall() for row in super_admins: admins.append({ 'userId': str(row.user_id), 'username': row.username or f'用户{row.user_id}', 'avatar': None, 'role': 'super_admin', # 特殊标记 'isSuperAdmin': True, 'permissions': { 'delete_channel': True, 'edit_channel': True, 'manage_admins': True, 'pin_messages': True, 'delete_messages': True, 'slow_mode': True, 'lock_channel': True }, 'createdAt': None }) # 2. 获取频道管理员(排除已在超级管理员列表中的用户) super_admin_ids = [str(row.user_id) for row in super_admins] # 检查 community_channel_admins 表是否存在 try: sql = text(""" SELECT cca.user_id, cca.role, cca.permissions, cca.created_at, u.username FROM community_channel_admins cca LEFT JOIN user u ON cca.user_id = u.id WHERE cca.channel_id = :channel_id ORDER BY CASE cca.role WHEN 'owner' THEN 1 WHEN 'admin' THEN 2 WHEN 'moderator' THEN 3 END """) result = conn.execute(sql, {'channel_id': channel_id}).fetchall() for row in result: # 跳过已在超级管理员列表中的用户 if str(row.user_id) in super_admin_ids: continue permissions = row.permissions if isinstance(permissions, str): permissions = json.loads(permissions) admins.append({ 'userId': str(row.user_id), 'username': row.username or f'用户{row.user_id}', 'avatar': None, 'role': row.role, 'isSuperAdmin': False, 'permissions': permissions or {}, 'createdAt': row.created_at.isoformat() if row.created_at else None }) except Exception as table_err: # 表不存在时忽略,只返回超级管理员 print(f"[Community API] 频道管理员表查询失败(可能表不存在): {table_err}") return api_response(admins) except Exception as e: print(f"[Community API] 获取频道管理员列表失败: {e}") import traceback traceback.print_exc() return api_error(f'获取管理员列表失败: {str(e)}', 500) @community_bp.route('/channels//admins', methods=['POST']) @login_required def add_channel_admin_api(channel_id): """添加频道管理员(需要 owner 或超级管理员权限)""" user = g.current_user # 检查权限 if not check_channel_permission(channel_id, user['id'], 'manage_admins'): return api_error('无权限管理频道管理员', 403) data = request.get_json() target_user_id = data.get('userId') role = data.get('role', 'moderator') if not target_user_id: return api_error('请指定用户ID') if role not in ['admin', 'moderator']: return api_error('无效的角色,只能设置 admin 或 moderator') # 不能设置为 owner(owner 只能是创建者) if add_channel_admin(channel_id, target_user_id, role): return api_response({'success': True, 'message': f'已添加 {role}'}) else: return api_error('添加管理员失败', 500) @community_bp.route('/channels//admins/', methods=['DELETE']) @login_required def remove_channel_admin_api(channel_id, user_id): """移除频道管理员(需要 owner 或超级管理员权限)""" current_user = g.current_user # 检查权限 if not check_channel_permission(channel_id, current_user['id'], 'manage_admins'): return api_error('无权限管理频道管理员', 403) # 不能移除 owner target_admin = get_channel_admin_info(channel_id, user_id) if target_admin and target_admin['isOwner'] and not target_admin['isSuperAdmin']: return api_error('不能移除频道创建者', 400) try: with get_db_engine().connect() as conn: sql = text(""" DELETE FROM community_channel_admins WHERE channel_id = :channel_id AND user_id = :user_id """) conn.execute(sql, { 'channel_id': channel_id, 'user_id': int(user_id) }) conn.commit() return api_response({'success': True, 'message': '已移除管理员'}) except Exception as e: print(f"[Community API] 移除频道管理员失败: {e}") return api_error(f'移除管理员失败: {str(e)}', 500) # ============================================================ # 频道相关 API # ============================================================ @community_bp.route('/channels', methods=['GET']) def get_channels(): """ 获取频道列表(按分类组织) 返回格式:[{ id, name, icon, channels: [...] }, ...] """ try: with get_db_engine().connect() as conn: # 查询分类 categories_sql = text(""" SELECT id, name, icon, position, is_collapsible, is_system FROM community_categories ORDER BY position """) categories_result = conn.execute(categories_sql).fetchall() # 查询频道 channels_sql = text(""" SELECT c.id, c.category_id, c.name, c.type, c.topic, c.position, c.concept_code, c.slow_mode, c.is_readonly, c.is_system, c.subscriber_count, c.message_count, c.last_message_at, cc.concept_name, cc.stock_count, cc.is_hot FROM community_channels c LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code WHERE c.is_visible = 1 ORDER BY c.position """) channels_result = conn.execute(channels_sql).fetchall() # 组装数据 channels_by_category = {} for ch in channels_result: cat_id = ch.category_id if cat_id not in channels_by_category: channels_by_category[cat_id] = [] channels_by_category[cat_id].append({ 'id': ch.id, 'categoryId': ch.category_id, 'name': ch.name, 'type': ch.type, 'topic': ch.topic, 'position': ch.position, 'conceptCode': ch.concept_code, 'slowMode': ch.slow_mode or 0, 'isReadonly': bool(ch.is_readonly), 'isSystem': bool(ch.is_system), 'subscriberCount': ch.subscriber_count or 0, 'messageCount': ch.message_count or 0, 'lastMessageAt': ch.last_message_at.isoformat() if ch.last_message_at else None, 'conceptName': ch.concept_name, 'stockCount': ch.stock_count, 'isHot': bool(ch.is_hot) if ch.is_hot is not None else False, }) result = [] for cat in categories_result: result.append({ 'id': cat.id, 'name': cat.name, 'icon': cat.icon or '', 'position': cat.position, 'isCollapsible': bool(cat.is_collapsible), 'isSystem': bool(cat.is_system), 'channels': channels_by_category.get(cat.id, []) }) return api_response(result) except Exception as e: print(f"[Community API] 获取频道列表失败: {e}") return api_error(f'获取频道列表失败: {str(e)}', 500) @community_bp.route('/channels/', methods=['GET']) def get_channel(channel_id): """获取单个频道详情""" try: with get_db_engine().connect() as conn: sql = text(""" SELECT c.*, cc.concept_name, cc.stock_count, cc.is_hot FROM community_channels c LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code WHERE c.id = :channel_id """) result = conn.execute(sql, {'channel_id': channel_id}).fetchone() if not result: return api_error('频道不存在', 404) return api_response({ 'id': result.id, 'categoryId': result.category_id, 'name': result.name, 'type': result.type, 'topic': result.topic, 'conceptCode': result.concept_code, 'slowMode': result.slow_mode or 0, 'isReadonly': bool(result.is_readonly), 'subscriberCount': result.subscriber_count or 0, 'messageCount': result.message_count or 0, 'conceptName': result.concept_name, 'stockCount': result.stock_count, 'isHot': bool(result.is_hot) if result.is_hot is not None else False, }) except Exception as e: return api_error(f'获取频道失败: {str(e)}', 500) @community_bp.route('/channels', methods=['POST']) @login_required def create_channel(): """ 创建新频道 请求体: { name, type, topic?, categoryId } """ try: user = g.current_user data = request.get_json() name = data.get('name', '').strip() channel_type = data.get('type', 'text') topic = data.get('topic', '').strip() category_id = data.get('categoryId') if not name: return api_error('频道名称不能为空') if len(name) > 50: return api_error('频道名称不能超过50个字符') if channel_type not in ['text', 'forum', 'voice', 'announcement', 'prediction']: return api_error('无效的频道类型') channel_id = f"ch_{generate_id()}" now = datetime.utcnow() with get_db_engine().connect() as conn: # 如果没有指定分类,使用默认分类(自由讨论) if not category_id: default_cat_sql = text(""" SELECT id FROM community_categories WHERE name = '自由讨论' OR name LIKE '%讨论%' ORDER BY position LIMIT 1 """) default_cat = conn.execute(default_cat_sql).fetchone() if default_cat: category_id = default_cat.id else: # 如果没有找到,使用第一个分类 first_cat_sql = text("SELECT id FROM community_categories ORDER BY position LIMIT 1") first_cat = conn.execute(first_cat_sql).fetchone() if first_cat: category_id = first_cat.id else: return api_error('没有可用的分类,请先创建分类') # 获取当前分类下最大的 position max_pos_sql = text(""" SELECT COALESCE(MAX(position), 0) as max_pos FROM community_channels WHERE category_id = :category_id """) max_pos_result = conn.execute(max_pos_sql, {'category_id': category_id}).fetchone() next_position = (max_pos_result.max_pos or 0) + 1 # 插入新频道 insert_sql = text(""" INSERT INTO community_channels (id, category_id, name, type, topic, position, slow_mode, is_readonly, is_visible, is_system, subscriber_count, message_count, created_at, updated_at) VALUES (:id, :category_id, :name, :type, :topic, :position, 0, 0, 1, 0, 0, 0, :now, :now) """) conn.execute(insert_sql, { 'id': channel_id, 'category_id': category_id, 'name': name, 'type': channel_type, 'topic': topic, 'position': next_position, 'now': now, }) conn.commit() # 将创建者设为频道管理员(owner) add_channel_admin(channel_id, user['id'], role='owner') # 返回创建的频道信息 response_data = { 'id': channel_id, 'categoryId': category_id, 'name': name, 'type': channel_type, 'topic': topic, 'position': next_position, 'slowMode': 0, 'isReadonly': False, 'isSystem': False, 'subscriberCount': 0, 'messageCount': 0, 'createdAt': now.isoformat(), 'isOwner': True, # 创建者自动是 owner } print(f"[Community API] 创建频道成功: {channel_id} - {name}, 创建者: {user['id']}") return api_response(response_data) except Exception as e: print(f"[Community API] 创建频道失败: {e}") import traceback traceback.print_exc() return api_error(f'创建频道失败: {str(e)}', 500) @community_bp.route('/channels//subscribe', methods=['POST']) @login_required def subscribe_channel(channel_id): """订阅频道""" try: user = g.current_user subscription_id = generate_id() with get_db_engine().connect() as conn: # 检查是否已订阅 check_sql = text(""" SELECT id FROM community_subscriptions WHERE user_id = :user_id AND channel_id = :channel_id """) existing = conn.execute(check_sql, { 'user_id': user['id'], 'channel_id': channel_id }).fetchone() if existing: return api_response(message='已订阅') # 创建订阅 insert_sql = text(""" INSERT INTO community_subscriptions (id, user_id, channel_id, notification_level) VALUES (:id, :user_id, :channel_id, 'all') """) conn.execute(insert_sql, { 'id': subscription_id, 'user_id': user['id'], 'channel_id': channel_id }) # 更新订阅数 update_sql = text(""" UPDATE community_channels SET subscriber_count = subscriber_count + 1 WHERE id = :channel_id """) conn.execute(update_sql, {'channel_id': channel_id}) conn.commit() return api_response(message='订阅成功') except Exception as e: return api_error(f'订阅失败: {str(e)}', 500) @community_bp.route('/channels//unsubscribe', methods=['POST']) @login_required def unsubscribe_channel(channel_id): """取消订阅频道""" try: user = g.current_user with get_db_engine().connect() as conn: # 删除订阅 delete_sql = text(""" DELETE FROM community_subscriptions WHERE user_id = :user_id AND channel_id = :channel_id """) result = conn.execute(delete_sql, { 'user_id': user['id'], 'channel_id': channel_id }) if result.rowcount > 0: # 更新订阅数 update_sql = text(""" UPDATE community_channels SET subscriber_count = GREATEST(subscriber_count - 1, 0) WHERE id = :channel_id """) conn.execute(update_sql, {'channel_id': channel_id}) conn.commit() return api_response(message='取消订阅成功') except Exception as e: return api_error(f'取消订阅失败: {str(e)}', 500) # ============================================================ # 消息相关 API(即时聊天) # ============================================================ @community_bp.route('/channels//messages', methods=['POST']) @login_required def send_message(channel_id): """发送消息""" try: user = g.current_user data = request.get_json() # 检查频道类型和权限 with get_db_engine().connect() as conn: channel_sql = text(""" SELECT type, is_readonly FROM community_channels WHERE id = :channel_id """) channel = conn.execute(channel_sql, {'channel_id': channel_id}).fetchone() if not channel: return api_error('频道不存在', 404) # 公告频道只有管理员可以发消息 if channel.type == 'announcement': admin_info = get_channel_admin_info(channel_id, user['id']) if not admin_info or not admin_info.get('isAdmin'): return api_error('只有管理员可以在公告频道发送消息', 403) # 只读频道不能发消息 if channel.is_readonly: admin_info = get_channel_admin_info(channel_id, user['id']) if not admin_info or not admin_info.get('isAdmin'): return api_error('该频道为只读频道', 403) content = data.get('content', '').strip() if not content: return api_error('消息内容不能为空') message_id = generate_id() now = datetime.utcnow() # 将 Markdown 图片语法转换为 HTML(支持 base64 图片) content_html = parse_markdown_images(content) # 构建消息文档 message_doc = { 'id': message_id, 'channel_id': channel_id, 'thread_id': data.get('threadId'), 'author_id': user['id'], 'author_name': user['username'], 'author_avatar': user.get('avatar', ''), 'content': content, 'content_html': content_html, # Markdown 转换后的 HTML 'type': 'text', 'mentioned_users': data.get('mentionedUsers', []), 'mentioned_stocks': data.get('mentionedStocks', []), 'mentioned_everyone': data.get('mentionedEveryone', False), 'reply_to': data.get('replyTo'), 'reactions': {}, 'reaction_count': 0, 'is_pinned': False, 'is_edited': False, 'is_deleted': False, 'created_at': now.isoformat(), } # 写入 ES es_client.index( index='community_messages', id=message_id, document=message_doc, refresh=True ) # 更新频道最后消息时间和消息数 with get_db_engine().connect() as conn: update_sql = text(""" UPDATE community_channels SET message_count = message_count + 1, last_message_id = :message_id, last_message_at = :now WHERE id = :channel_id """) conn.execute(update_sql, { 'message_id': message_id, 'now': now, 'channel_id': channel_id }) conn.commit() # 转换字段名为 camelCase response_data = { 'id': message_doc['id'], 'channelId': message_doc['channel_id'], 'threadId': message_doc['thread_id'], 'authorId': message_doc['author_id'], 'authorName': message_doc['author_name'], 'authorAvatar': message_doc['author_avatar'], 'content': message_doc['content'], 'type': message_doc['type'], 'mentionedUsers': message_doc['mentioned_users'], 'mentionedStocks': message_doc['mentioned_stocks'], 'replyTo': message_doc['reply_to'], 'reactions': message_doc['reactions'], 'reactionCount': message_doc['reaction_count'], 'isPinned': message_doc['is_pinned'], 'isEdited': message_doc['is_edited'], 'createdAt': message_doc['created_at'], } # 通过 WebSocket 广播新消息给频道内的其他用户 if socketio_instance: socketio_instance.emit( 'MESSAGE_CREATE', response_data, room=channel_id, namespace='/community' ) return api_response(response_data) except Exception as e: print(f"[Community API] 发送消息失败: {e}") return api_error(f'发送消息失败: {str(e)}', 500) @community_bp.route('/messages//reactions', methods=['POST']) @login_required def add_reaction(message_id): """添加表情反应""" try: user = g.current_user data = request.get_json() emoji = data.get('emoji') if not emoji: return api_error('请选择表情') # 获取当前消息 result = es_client.get(index='community_messages', id=message_id) message = result['_source'] # 更新 reactions reactions = message.get('reactions', {}) if emoji not in reactions: reactions[emoji] = [] if user['id'] not in reactions[emoji]: reactions[emoji].append(user['id']) # 计算总数 reaction_count = sum(len(users) for users in reactions.values()) # 更新 ES es_client.update( index='community_messages', id=message_id, doc={'reactions': reactions, 'reaction_count': reaction_count}, refresh=True ) return api_response(message='添加成功') except Exception as e: return api_error(f'添加表情失败: {str(e)}', 500) @community_bp.route('/messages//reactions/', methods=['DELETE']) @login_required def remove_reaction(message_id, emoji): """移除表情反应""" try: user = g.current_user # 获取当前消息 result = es_client.get(index='community_messages', id=message_id) message = result['_source'] # 更新 reactions reactions = message.get('reactions', {}) if emoji in reactions and user['id'] in reactions[emoji]: reactions[emoji].remove(user['id']) if not reactions[emoji]: del reactions[emoji] # 计算总数 reaction_count = sum(len(users) for users in reactions.values()) # 更新 ES es_client.update( index='community_messages', id=message_id, doc={'reactions': reactions, 'reaction_count': reaction_count}, refresh=True ) return api_response(message='移除成功') except Exception as e: return api_error(f'移除表情失败: {str(e)}', 500) # ============================================================ # Forum 帖子相关 API # ============================================================ def parse_markdown_images(content): """ 将 Markdown 图片语法转换为 HTML img 标签 支持 ![alt](url) 格式,包括 base64 data URL """ import re # 匹配 ![alt](url) 格式 pattern = r'!\[([^\]]*)\]\(([^)]+)\)' def replace_image(match): alt = match.group(1) or '图片' url = match.group(2) return f'{alt}' html = re.sub(pattern, replace_image, content) # 将换行转换为
html = html.replace('\n', '
') return html @community_bp.route('/channels//posts', methods=['POST']) @login_required def create_post(channel_id): """创建帖子""" try: user = g.current_user data = request.get_json() title = data.get('title', '').strip() content = data.get('content', '').strip() if not title: return api_error('标题不能为空') if not content: return api_error('内容不能为空') post_id = generate_id() now = datetime.utcnow() # 将 Markdown 图片语法转换为 HTML content_html = parse_markdown_images(content) # 构建帖子文档 post_doc = { 'id': post_id, 'channel_id': channel_id, 'author_id': user['id'], 'author_name': user['username'], 'author_avatar': user.get('avatar', ''), 'title': title, 'content': content, 'content_html': content_html, # Markdown 转换后的 HTML 'tags': data.get('tags', []), 'stock_symbols': data.get('stockSymbols', []), 'is_pinned': False, 'is_locked': False, 'is_deleted': False, 'reply_count': 0, 'view_count': 0, 'like_count': 0, 'last_reply_at': None, 'last_reply_by': None, 'created_at': now.isoformat(), 'updated_at': now.isoformat(), } # 写入 ES es_client.index( index='community_forum_posts', id=post_id, document=post_doc, refresh=True ) # 更新频道消息数 with get_db_engine().connect() as conn: update_sql = text(""" UPDATE community_channels SET message_count = message_count + 1, last_message_at = :now WHERE id = :channel_id """) conn.execute(update_sql, {'now': now, 'channel_id': channel_id}) conn.commit() # 转换字段名 response_data = { 'id': post_doc['id'], 'channelId': post_doc['channel_id'], 'authorId': post_doc['author_id'], 'authorName': post_doc['author_name'], 'authorAvatar': post_doc['author_avatar'], 'title': post_doc['title'], 'content': post_doc['content'], 'tags': post_doc['tags'], 'stockSymbols': post_doc['stock_symbols'], 'isPinned': post_doc['is_pinned'], 'isLocked': post_doc['is_locked'], 'replyCount': post_doc['reply_count'], 'viewCount': post_doc['view_count'], 'likeCount': post_doc['like_count'], 'createdAt': post_doc['created_at'], } return api_response(response_data) except Exception as e: print(f"[Community API] 创建帖子失败: {e}") return api_error(f'创建帖子失败: {str(e)}', 500) @community_bp.route('/posts//like', methods=['POST']) @login_required def like_post(post_id): """点赞帖子""" try: # 更新 ES(简单实现,生产环境应该用单独的点赞表防止重复) es_client.update( index='community_forum_posts', id=post_id, script={ 'source': 'ctx._source.like_count += 1', 'lang': 'painless' }, refresh=True ) return api_response(message='点赞成功') except Exception as e: return api_error(f'点赞失败: {str(e)}', 500) @community_bp.route('/posts//view', methods=['POST']) def increment_view(post_id): """增加帖子浏览量""" try: es_client.update( index='community_forum_posts', id=post_id, script={ 'source': 'ctx._source.view_count += 1', 'lang': 'painless' } ) return api_response(message='success') except Exception as e: # 浏览量统计失败不影响主流程 return api_response(message='success') @community_bp.route('/posts//replies', methods=['POST']) @login_required def create_reply(post_id): """创建帖子回复""" try: user = g.current_user data = request.get_json() content = data.get('content', '').strip() if not content: return api_error('回复内容不能为空') reply_id = generate_id() now = datetime.utcnow() # 获取帖子信息 post_result = es_client.get(index='community_forum_posts', id=post_id) post = post_result['_source'] # 将 Markdown 图片语法转换为 HTML content_html = parse_markdown_images(content) # 构建回复文档 reply_doc = { 'id': reply_id, 'post_id': post_id, 'channel_id': post['channel_id'], 'author_id': user['id'], 'author_name': user['username'], 'author_avatar': user.get('avatar', ''), 'content': content, 'content_html': content_html, # Markdown 转换后的 HTML 'reply_to': data.get('replyTo'), 'reactions': {}, 'like_count': 0, 'is_solution': False, 'is_deleted': False, 'created_at': now.isoformat(), } # 写入 ES es_client.index( index='community_forum_replies', id=reply_id, document=reply_doc, refresh=True ) # 更新帖子的回复数和最后回复时间 es_client.update( index='community_forum_posts', id=post_id, script={ 'source': ''' ctx._source.reply_count += 1; ctx._source.last_reply_at = params.now; ctx._source.last_reply_by = params.author_name; ''', 'lang': 'painless', 'params': { 'now': now.isoformat(), 'author_name': user['username'] } }, refresh=True ) # 转换字段名 response_data = { 'id': reply_doc['id'], 'postId': reply_doc['post_id'], 'channelId': reply_doc['channel_id'], 'authorId': reply_doc['author_id'], 'authorName': reply_doc['author_name'], 'authorAvatar': reply_doc['author_avatar'], 'content': reply_doc['content'], 'replyTo': reply_doc['reply_to'], 'likeCount': reply_doc['like_count'], 'createdAt': reply_doc['created_at'], } return api_response(response_data) except Exception as e: print(f"[Community API] 创建回复失败: {e}") return api_error(f'创建回复失败: {str(e)}', 500) # ============================================================ # 文件上传接口 # ============================================================ import os from werkzeug.utils import secure_filename # 允许的图片扩展名 ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} # 上传目录(相对于 Flask 应用根目录) UPLOAD_FOLDER = 'public/uploads/community' # 最大文件大小 10MB MAX_FILE_SIZE = 10 * 1024 * 1024 def allowed_file(filename): """检查文件扩展名是否允许""" return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @community_bp.route('/upload/image', methods=['POST']) @login_required def upload_image(): """ 上传图片 返回 base64 编码的图片数据,可直接嵌入帖子内容中存储到 ES """ import base64 try: if 'file' not in request.files: return api_error('没有选择文件', 400) file = request.files['file'] if file.filename == '': return api_error('没有选择文件', 400) if not allowed_file(file.filename): return api_error('不支持的文件格式,仅支持 PNG、JPG、GIF、WebP', 400) # 读取文件内容 file_content = file.read() size = len(file_content) if size > MAX_FILE_SIZE: return api_error('文件大小超过限制(最大 10MB)', 400) # 获取文件扩展名和 MIME 类型 ext = file.filename.rsplit('.', 1)[1].lower() mime_types = { 'png': 'image/png', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'gif': 'image/gif', 'webp': 'image/webp' } mime_type = mime_types.get(ext, 'image/jpeg') # 转换为 base64 base64_data = base64.b64encode(file_content).decode('utf-8') data_url = f"data:{mime_type};base64,{base64_data}" # 生成唯一标识 filename = f"{generate_id()}.{ext}" return api_response({ 'url': data_url, # base64 data URL,可直接用于 img src 'filename': filename, 'size': size, 'type': mime_type }) except Exception as e: print(f"[Community API] 上传图片失败: {e}") import traceback traceback.print_exc() return api_error(f'上传失败: {str(e)}', 500) # ============================================================ # ES 代理接口(前端直接查询 ES) # ============================================================ @community_bp.route('/es//_search', methods=['POST']) def es_search_proxy(index): """ ES 搜索代理 允许的索引:community_messages, community_forum_posts, community_forum_replies """ allowed_indices = [ 'community_messages', 'community_forum_posts', 'community_forum_replies', 'community_notifications' ] if index not in allowed_indices: return api_error('不允许访问该索引', 403) try: body = request.get_json() print(f"[Community API] ES 搜索请求: index={index}, body={body}") # 尝试使用不同的 API 调用方式(兼容不同版本的 elasticsearch-py) try: # 新版本 API result = es_client.search(index=index, body=body) except TypeError: # 旧版本 API,参数可能不同 result = es_client.search(index=index, **body) # 处理结果格式 if hasattr(result, 'body'): result = result.body elif hasattr(result, '__getitem__'): pass # 已经是 dict else: result = dict(result) print(f"[Community API] ES 搜索成功: 返回 {result.get('hits', {}).get('total', {})} 条结果") return jsonify(result) except Exception as e: print(f"[Community API] ES 搜索失败: {e}") import traceback traceback.print_exc() # 返回更详细的错误信息 return jsonify({ 'error': str(e), 'type': type(e).__name__, 'index': index, 'body': body if 'body' in dir() else None }), 500 # ============================================================ # WebSocket 事件处理(需要在 app.py 中注册) # ============================================================ # 在线用户管理 # ============================================================ # 在线用户字典: {user_id: {sid: socket_id, username: str, avatar: str, connected_at: datetime}} online_users = {} # Socket ID 到用户 ID 的映射: {socket_id: user_id} sid_to_user = {} def get_online_members(): """获取当前在线用户列表""" return [ { 'userId': str(user_id), 'username': info['username'], 'avatar': info.get('avatar', ''), 'isOnline': True, 'connectedAt': info['connected_at'].isoformat() if info.get('connected_at') else None, } for user_id, info in online_users.items() ] def register_community_socketio(socketio): """ 注册社区 WebSocket 事件 在 app.py 中调用:register_community_socketio(socketio) """ global socketio_instance socketio_instance = socketio from flask_socketio import join_room, leave_room, emit from flask import request @socketio.on('connect', namespace='/community') def handle_connect(): user_id = session.get('user_id') username = session.get('username', '匿名用户') avatar = session.get('avatar', '') sid = request.sid if user_id: # 记录在线用户 online_users[user_id] = { 'sid': sid, 'username': username, 'avatar': avatar, 'connected_at': datetime.utcnow() } sid_to_user[sid] = user_id # 广播用户上线事件 emit('MEMBER_ONLINE', { 'userId': str(user_id), 'username': username, 'avatar': avatar, }, broadcast=True, include_self=False) print(f'[Community Socket] User {username}({user_id}) connected, online: {len(online_users)}') else: print('[Community Socket] Anonymous client connected') @socketio.on('disconnect', namespace='/community') def handle_disconnect(): sid = request.sid user_id = sid_to_user.pop(sid, None) if user_id and user_id in online_users: username = online_users[user_id].get('username', '匿名') del online_users[user_id] # 广播用户下线事件 emit('MEMBER_OFFLINE', { 'userId': str(user_id), }, broadcast=True) print(f'[Community Socket] User {username}({user_id}) disconnected, online: {len(online_users)}') @socketio.on('SUBSCRIBE_CHANNEL', namespace='/community') def handle_subscribe(data): channel_id = data.get('channelId') if channel_id: join_room(channel_id) print(f'[Community Socket] Joined room: {channel_id}') @socketio.on('UNSUBSCRIBE_CHANNEL', namespace='/community') def handle_unsubscribe(data): channel_id = data.get('channelId') if channel_id: leave_room(channel_id) print(f'[Community Socket] Left room: {channel_id}') @socketio.on('SEND_MESSAGE', namespace='/community') def handle_send_message(data): """通过 WebSocket 发送消息(实时广播)""" channel_id = data.get('channelId') # 广播给频道内所有用户 emit('MESSAGE_CREATE', data, room=channel_id, include_self=False) @socketio.on('START_TYPING', namespace='/community') def handle_start_typing(data): channel_id = data.get('channelId') user_id = session.get('user_id') user_name = session.get('username', '匿名') emit('TYPING_START', { 'channelId': channel_id, 'userId': user_id, 'userName': user_name }, room=channel_id, include_self=False) @socketio.on('STOP_TYPING', namespace='/community') def handle_stop_typing(data): channel_id = data.get('channelId') user_id = session.get('user_id') emit('TYPING_STOP', { 'channelId': channel_id, 'userId': user_id }, room=channel_id, include_self=False) print('✅ Community WebSocket 事件已注册')