# -*- coding: utf-8 -*-
"""
Stock community API - Discord style.

Covers the channel, message, forum post, reply and emoji-reaction endpoints.
"""

import os
import uuid
from datetime import datetime
from functools import wraps

from flask import Blueprint, request, jsonify, session, g, current_app
from elasticsearch import Elasticsearch
from sqlalchemy import text

# ============================================================
# Blueprint definition
# ============================================================

community_bp = Blueprint('community', __name__, url_prefix='/api/community')

# Elasticsearch client.
# The host list can be overridden with the optional COMMUNITY_ES_HOSTS
# environment variable (comma-separated URLs); when it is unset the original
# hard-coded host is used, so existing deployments behave exactly as before.
_DEFAULT_ES_HOST = "http://222.128.1.157:19200"
_es_hosts = [
    host.strip()
    for host in os.environ.get('COMMUNITY_ES_HOSTS', _DEFAULT_ES_HOST).split(',')
    if host.strip()
]
es_client = Elasticsearch(
    hosts=_es_hosts or [_DEFAULT_ES_HOST],
    request_timeout=30,
    max_retries=3,
    retry_on_timeout=True
)

# NOTE: the database connection is taken from the `db` object in app.py; this
# module does not create its own engine.
engine = None  # resolved on first call to get_db_engine()


def get_db_engine():
    """Return the shared SQLAlchemy engine, resolving it on first use.

    The engine is read from ``app.db.engine`` and cached in the module-level
    ``engine`` global so later calls skip the lookup.
    """
    global engine
    if engine is None:
        # Deferred import: presumably avoids a circular import with app.py,
        # which registers this blueprint -- TODO confirm.
        from app import db
        engine = db.engine
    return engine


# ============================================================
# Utility functions
# ============================================================

def generate_id():
    """Return a 16-character lowercase hex id derived from a random UUID4."""
    return uuid.uuid4().hex[:16]


def get_current_user():
    """Return the logged-in user as a dict, or ``None`` when not logged in.

    The user is read from the Flask session: ``user_id`` must be truthy;
    ``username`` and ``avatar`` fall back to defaults when absent.
    """
    user_id = session.get('user_id')
    if not user_id:
        return None
    return {
        'id': str(user_id),
        'username': session.get('username', '匿名用户'),
        'avatar': session.get('avatar', ''),
    }


def login_required(f):
    """Decorator that rejects the request with HTTP 401 unless a user is logged in.

    On success the user dict is stored on ``g.current_user`` before the
    wrapped view is called.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        user = get_current_user()
        if not user:
            return jsonify({'code': 401, 'message': '请先登录'}), 401
        g.current_user = user
        return f(*args, **kwargs)
    return decorated_function


def api_response(data=None, message='success', code=200):
    """Build the uniform success envelope ``{code, message, data}`` as JSON."""
    return jsonify({
        'code': code,
        'message': message,
        'data': data
    })


def api_error(message, code=400):
    """Build an error response as a ``(json, http_status)`` tuple.

    The JSON body always carries ``code`` as given; the HTTP status is
    ``code`` when it is >= 400 and falls back to 400 otherwise.
    """
    http_status = code if code >= 400 else 400
    return jsonify({
        'code': code,
        'message': message
    }), http_status


# ============================================================
# Channel API
# ============================================================


def _json_body():
    """Return the request's JSON object, or an empty dict if absent or invalid.

    ``silent=True`` keeps a missing or malformed body from raising, so the
    views can answer with their own 400 validation errors instead of a 500.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _clean_str(value):
    """Return ``value`` stripped when it is a string, otherwise ``''``."""
    return value.strip() if isinstance(value, str) else ''


@community_bp.route('/channels', methods=['GET'])
def get_channels():
    """
    List visible channels grouped by category.

    Response data: [{ id, name, icon, position, isCollapsible, isSystem,
    channels: [...] }, ...], categories and channels both ordered by position.
    """
    try:
        with get_db_engine().connect() as conn:
            # Categories
            categories_sql = text("""
                SELECT id, name, icon, position, is_collapsible, is_system
                FROM community_categories
                ORDER BY position
            """)
            categories_result = conn.execute(categories_sql).fetchall()

            # Channels, joined with optional concept metadata
            channels_sql = text("""
                SELECT c.id, c.category_id, c.name, c.type, c.topic, c.position,
                       c.concept_code, c.slow_mode, c.is_readonly, c.is_system,
                       c.subscriber_count, c.message_count, c.last_message_at,
                       cc.concept_name, cc.stock_count, cc.is_hot
                FROM community_channels c
                LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
                WHERE c.is_visible = 1
                ORDER BY c.position
            """)
            channels_result = conn.execute(channels_sql).fetchall()

        # Group channels under their category id
        channels_by_category = {}
        for ch in channels_result:
            channels_by_category.setdefault(ch.category_id, []).append({
                'id': ch.id,
                'categoryId': ch.category_id,
                'name': ch.name,
                'type': ch.type,
                'topic': ch.topic,
                'position': ch.position,
                'conceptCode': ch.concept_code,
                'slowMode': ch.slow_mode or 0,
                'isReadonly': bool(ch.is_readonly),
                'isSystem': bool(ch.is_system),
                'subscriberCount': ch.subscriber_count or 0,
                'messageCount': ch.message_count or 0,
                'lastMessageAt': ch.last_message_at.isoformat() if ch.last_message_at else None,
                'conceptName': ch.concept_name,
                'stockCount': ch.stock_count,
                # NULL (no concept row matched the LEFT JOIN) maps to False
                'isHot': bool(ch.is_hot),
            })

        result = [
            {
                'id': cat.id,
                'name': cat.name,
                'icon': cat.icon or '',
                'position': cat.position,
                'isCollapsible': bool(cat.is_collapsible),
                'isSystem': bool(cat.is_system),
                'channels': channels_by_category.get(cat.id, []),
            }
            for cat in categories_result
        ]
        return api_response(result)

    except Exception as e:
        current_app.logger.exception('[Community API] 获取频道列表失败')
        return api_error(f'获取频道列表失败: {str(e)}', 500)


@community_bp.route('/channels/<channel_id>', methods=['GET'])
def get_channel(channel_id):
    """Return the details of one channel, or 404 when the id is unknown."""
    try:
        with get_db_engine().connect() as conn:
            sql = text("""
                SELECT c.*, cc.concept_name, cc.stock_count, cc.is_hot
                FROM community_channels c
                LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
                WHERE c.id = :channel_id
            """)
            result = conn.execute(sql, {'channel_id': channel_id}).fetchone()

        if not result:
            return api_error('频道不存在', 404)

        return api_response({
            'id': result.id,
            'categoryId': result.category_id,
            'name': result.name,
            'type': result.type,
            'topic': result.topic,
            'conceptCode': result.concept_code,
            'slowMode': result.slow_mode or 0,
            'isReadonly': bool(result.is_readonly),
            'subscriberCount': result.subscriber_count or 0,
            'messageCount': result.message_count or 0,
            'conceptName': result.concept_name,
            'stockCount': result.stock_count,
            'isHot': bool(result.is_hot),
        })

    except Exception as e:
        current_app.logger.exception('[Community API] 获取频道失败')
        return api_error(f'获取频道失败: {str(e)}', 500)


@community_bp.route('/channels', methods=['POST'])
@login_required
def create_channel():
    """
    Create a new channel.

    Request body: { name, type, topic?, categoryId? }

    When ``categoryId`` is omitted the channel goes into the default
    discussion category, or failing that the first category by position.
    The new channel is placed after the last channel of its category.
    """
    try:
        data = _json_body()

        name = _clean_str(data.get('name'))
        channel_type = data.get('type', 'text')
        topic = _clean_str(data.get('topic'))
        category_id = data.get('categoryId')

        if not name:
            return api_error('频道名称不能为空')

        if len(name) > 50:
            return api_error('频道名称不能超过50个字符')

        if channel_type not in ['text', 'forum', 'voice', 'announcement', 'prediction']:
            return api_error('无效的频道类型')

        channel_id = f"ch_{generate_id()}"
        now = datetime.utcnow()

        with get_db_engine().connect() as conn:
            # No category given: fall back to the default discussion category
            if not category_id:
                default_cat_sql = text("""
                    SELECT id FROM community_categories
                    WHERE name = '自由讨论' OR name LIKE '%讨论%'
                    ORDER BY position
                    LIMIT 1
                """)
                default_cat = conn.execute(default_cat_sql).fetchone()
                if default_cat:
                    category_id = default_cat.id
                else:
                    # Nothing matched: use the first category by position
                    first_cat_sql = text("SELECT id FROM community_categories ORDER BY position LIMIT 1")
                    first_cat = conn.execute(first_cat_sql).fetchone()
                    if first_cat:
                        category_id = first_cat.id
                    else:
                        return api_error('没有可用的分类,请先创建分类')

            # Next position = current maximum within the category + 1
            max_pos_sql = text("""
                SELECT COALESCE(MAX(position), 0) as max_pos
                FROM community_channels
                WHERE category_id = :category_id
            """)
            max_pos_result = conn.execute(max_pos_sql, {'category_id': category_id}).fetchone()
            next_position = (max_pos_result.max_pos or 0) + 1

            # Insert the new channel
            insert_sql = text("""
                INSERT INTO community_channels
                (id, category_id, name, type, topic, position, slow_mode,
                 is_readonly, is_visible, is_system, subscriber_count, message_count,
                 created_at, updated_at)
                VALUES
                (:id, :category_id, :name, :type, :topic, :position, 0,
                 0, 1, 0, 0, 0, :now, :now)
            """)
            conn.execute(insert_sql, {
                'id': channel_id,
                'category_id': category_id,
                'name': name,
                'type': channel_type,
                'topic': topic,
                'position': next_position,
                'now': now,
            })
            conn.commit()

        # Echo the created channel back to the client
        response_data = {
            'id': channel_id,
            'categoryId': category_id,
            'name': name,
            'type': channel_type,
            'topic': topic,
            'position': next_position,
            'slowMode': 0,
            'isReadonly': False,
            'isSystem': False,
            'subscriberCount': 0,
            'messageCount': 0,
            'createdAt': now.isoformat(),
        }

        current_app.logger.info('[Community API] 创建频道成功: %s - %s', channel_id, name)
        return api_response(response_data)

    except Exception as e:
        current_app.logger.exception('[Community API] 创建频道失败')
        return api_error(f'创建频道失败: {str(e)}', 500)


@community_bp.route('/channels/<channel_id>/subscribe', methods=['POST'])
@login_required
def subscribe_channel(channel_id):
    """Subscribe the current user to a channel (no-op if already subscribed)."""
    try:
        user = g.current_user
        subscription_id = generate_id()

        with get_db_engine().connect() as conn:
            # Already subscribed?
            check_sql = text("""
                SELECT id FROM community_subscriptions
                WHERE user_id = :user_id AND channel_id = :channel_id
            """)
            existing = conn.execute(check_sql, {
                'user_id': user['id'],
                'channel_id': channel_id
            }).fetchone()

            if existing:
                return api_response(message='已订阅')

            # Create the subscription
            insert_sql = text("""
                INSERT INTO community_subscriptions (id, user_id, channel_id, notification_level)
                VALUES (:id, :user_id, :channel_id, 'all')
            """)
            conn.execute(insert_sql, {
                'id': subscription_id,
                'user_id': user['id'],
                'channel_id': channel_id
            })

            # Bump the channel's subscriber counter
            update_sql = text("""
                UPDATE community_channels
                SET subscriber_count = subscriber_count + 1
                WHERE id = :channel_id
            """)
            conn.execute(update_sql, {'channel_id': channel_id})
            conn.commit()

        return api_response(message='订阅成功')

    except Exception as e:
        current_app.logger.exception('[Community API] 订阅失败')
        return api_error(f'订阅失败: {str(e)}', 500)


@community_bp.route('/channels/<channel_id>/unsubscribe', methods=['POST'])
@login_required
def unsubscribe_channel(channel_id):
    """Remove the current user's subscription to a channel.

    The subscriber counter is only decremented when a row was actually
    deleted, and is clamped at zero.
    """
    try:
        user = g.current_user

        with get_db_engine().connect() as conn:
            # Delete the subscription
            delete_sql = text("""
                DELETE FROM community_subscriptions
                WHERE user_id = :user_id AND channel_id = :channel_id
            """)
            result = conn.execute(delete_sql, {
                'user_id': user['id'],
                'channel_id': channel_id
            })

            if result.rowcount > 0:
                # Decrement the channel's subscriber counter
                update_sql = text("""
                    UPDATE community_channels
                    SET subscriber_count = GREATEST(subscriber_count - 1, 0)
                    WHERE id = :channel_id
                """)
                conn.execute(update_sql, {'channel_id': channel_id})

            conn.commit()

        return api_response(message='取消订阅成功')

    except Exception as e:
        current_app.logger.exception('[Community API] 取消订阅失败')
        return api_error(f'取消订阅失败: {str(e)}', 500)


# ============================================================
# Message API (instant chat)
# ============================================================

@community_bp.route('/channels/<channel_id>/messages', methods=['POST'])
@login_required
def send_message(channel_id):
    """Post a chat message to a channel.

    The message document is indexed into ``community_messages`` and the
    channel's message counter / last-message fields are updated in the
    database. Returns the stored message with camelCase keys.
    """
    try:
        user = g.current_user
        data = _json_body()

        content = _clean_str(data.get('content'))
        if not content:
            return api_error('消息内容不能为空')

        message_id = generate_id()
        now = datetime.utcnow()

        # Message document
        message_doc = {
            'id': message_id,
            'channel_id': channel_id,
            'thread_id': data.get('threadId'),
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'content': content,
            'type': 'text',
            'mentioned_users': data.get('mentionedUsers', []),
            'mentioned_stocks': data.get('mentionedStocks', []),
            'mentioned_everyone': data.get('mentionedEveryone', False),
            'reply_to': data.get('replyTo'),
            'reactions': {},
            'reaction_count': 0,
            'is_pinned': False,
            'is_edited': False,
            'is_deleted': False,
            'created_at': now.isoformat(),
        }

        # Write to ES; refresh=True makes it searchable immediately
        es_client.index(
            index='community_messages',
            id=message_id,
            document=message_doc,
            refresh=True
        )

        # Update the channel's last-message time and message count
        with get_db_engine().connect() as conn:
            update_sql = text("""
                UPDATE community_channels
                SET message_count = message_count + 1,
                    last_message_id = :message_id,
                    last_message_at = :now
                WHERE id = :channel_id
            """)
            conn.execute(update_sql, {
                'message_id': message_id,
                'now': now,
                'channel_id': channel_id
            })
            conn.commit()

        # Convert field names to camelCase
        response_data = {
            'id': message_doc['id'],
            'channelId': message_doc['channel_id'],
            'threadId': message_doc['thread_id'],
            'authorId': message_doc['author_id'],
            'authorName': message_doc['author_name'],
            'authorAvatar': message_doc['author_avatar'],
            'content': message_doc['content'],
            'type': message_doc['type'],
            'mentionedUsers': message_doc['mentioned_users'],
            'mentionedStocks': message_doc['mentioned_stocks'],
            'replyTo': message_doc['reply_to'],
            'reactions': message_doc['reactions'],
            'reactionCount': message_doc['reaction_count'],
            'isPinned': message_doc['is_pinned'],
            'isEdited': message_doc['is_edited'],
            'createdAt': message_doc['created_at'],
        }

        return api_response(response_data)

    except Exception as e:
        current_app.logger.exception('[Community API] 发送消息失败')
        return api_error(f'发送消息失败: {str(e)}', 500)


@community_bp.route('/messages/<message_id>/reactions', methods=['POST'])
@login_required
def add_reaction(message_id):
    """Add the current user's emoji reaction to a message.

    Request body: { emoji }. Returns 404 when the message does not exist.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        user = g.current_user
        data = _json_body()
        emoji = data.get('emoji')

        # Must be a non-empty string: it is used as a dict key below
        if not emoji or not isinstance(emoji, str):
            return api_error('请选择表情')

        # Fetch the current message
        result = es_client.get(index='community_messages', id=message_id)
        message = result['_source']

        # Update reactions: emoji -> list of user ids
        # NOTE(review): read-modify-write without versioning; concurrent
        # reactions on the same message can overwrite each other.
        reactions = message.get('reactions', {})
        reacted_users = reactions.setdefault(emoji, [])
        if user['id'] not in reacted_users:
            reacted_users.append(user['id'])

        # Total across all emoji
        reaction_count = sum(len(users) for users in reactions.values())

        # Write back to ES
        es_client.update(
            index='community_messages',
            id=message_id,
            doc={'reactions': reactions, 'reaction_count': reaction_count},
            refresh=True
        )

        return api_response(message='添加成功')

    except NotFoundError:
        return api_error('消息不存在', 404)
    except Exception as e:
        current_app.logger.exception('[Community API] 添加表情失败')
        return api_error(f'添加表情失败: {str(e)}', 500)


@community_bp.route('/messages/<message_id>/reactions/<emoji>', methods=['DELETE'])
@login_required
def remove_reaction(message_id, emoji):
    """Remove the current user's emoji reaction from a message.

    An emoji whose user list becomes empty is dropped from the reactions map.
    Returns 404 when the message does not exist.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        user = g.current_user

        # Fetch the current message
        result = es_client.get(index='community_messages', id=message_id)
        message = result['_source']

        # Update reactions
        reactions = message.get('reactions', {})
        if emoji in reactions and user['id'] in reactions[emoji]:
            reactions[emoji].remove(user['id'])
            if not reactions[emoji]:
                del reactions[emoji]

        # Total across all emoji
        reaction_count = sum(len(users) for users in reactions.values())

        # Write back to ES
        es_client.update(
            index='community_messages',
            id=message_id,
            doc={'reactions': reactions, 'reaction_count': reaction_count},
            refresh=True
        )

        return api_response(message='移除成功')

    except NotFoundError:
        return api_error('消息不存在', 404)
    except Exception as e:
        current_app.logger.exception('[Community API] 移除表情失败')
        return api_error(f'移除表情失败: {str(e)}', 500)


# ============================================================
# Forum post API
# ============================================================

@community_bp.route('/channels/<channel_id>/posts', methods=['POST'])
@login_required
def create_post(channel_id):
    """Create a forum post in a channel.

    Request body: { title, content, tags?, stockSymbols? }. The post is
    indexed into ``community_forum_posts`` and the channel's message counter
    is incremented. Returns the stored post with camelCase keys.
    """
    try:
        user = g.current_user
        data = _json_body()

        title = _clean_str(data.get('title'))
        content = _clean_str(data.get('content'))

        if not title:
            return api_error('标题不能为空')
        if not content:
            return api_error('内容不能为空')

        post_id = generate_id()
        now = datetime.utcnow()

        # Post document
        post_doc = {
            'id': post_id,
            'channel_id': channel_id,
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'title': title,
            'content': content,
            'content_html': content,  # Markdown rendering can be added later
            'tags': data.get('tags', []),
            'stock_symbols': data.get('stockSymbols', []),
            'is_pinned': False,
            'is_locked': False,
            'is_deleted': False,
            'reply_count': 0,
            'view_count': 0,
            'like_count': 0,
            'last_reply_at': None,
            'last_reply_by': None,
            'created_at': now.isoformat(),
            'updated_at': now.isoformat(),
        }

        # Write to ES
        es_client.index(
            index='community_forum_posts',
            id=post_id,
            document=post_doc,
            refresh=True
        )

        # Update the channel's message count
        with get_db_engine().connect() as conn:
            update_sql = text("""
                UPDATE community_channels
                SET message_count = message_count + 1,
                    last_message_at = :now
                WHERE id = :channel_id
            """)
            conn.execute(update_sql, {'now': now, 'channel_id': channel_id})
            conn.commit()

        # Convert field names to camelCase
        response_data = {
            'id': post_doc['id'],
            'channelId': post_doc['channel_id'],
            'authorId': post_doc['author_id'],
            'authorName': post_doc['author_name'],
            'authorAvatar': post_doc['author_avatar'],
            'title': post_doc['title'],
            'content': post_doc['content'],
            'tags': post_doc['tags'],
            'stockSymbols': post_doc['stock_symbols'],
            'isPinned': post_doc['is_pinned'],
            'isLocked': post_doc['is_locked'],
            'replyCount': post_doc['reply_count'],
            'viewCount': post_doc['view_count'],
            'likeCount': post_doc['like_count'],
            'createdAt': post_doc['created_at'],
        }

        return api_response(response_data)

    except Exception as e:
        current_app.logger.exception('[Community API] 创建帖子失败')
        return api_error(f'创建帖子失败: {str(e)}', 500)


@community_bp.route('/posts/<post_id>/like', methods=['POST'])
@login_required
def like_post(post_id):
    """Increment a post's like counter. Returns 404 for an unknown post.

    Simple implementation: nothing prevents the same user from liking twice;
    production use should track likes in a dedicated table.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script={
                'source': 'ctx._source.like_count += 1',
                'lang': 'painless'
            },
            refresh=True
        )

        return api_response(message='点赞成功')

    except NotFoundError:
        return api_error('帖子不存在', 404)
    except Exception as e:
        current_app.logger.exception('[Community API] 点赞失败')
        return api_error(f'点赞失败: {str(e)}', 500)


@community_bp.route('/posts/<post_id>/view', methods=['POST'])
def increment_view(post_id):
    """Increment a post's view counter (best effort, always reports success)."""
    try:
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script={
                'source': 'ctx._source.view_count += 1',
                'lang': 'painless'
            }
        )
    except Exception:
        # Deliberately swallowed: a failed view count must not affect the
        # main flow. Logged at debug level so it is still traceable.
        current_app.logger.debug('[Community API] view count update failed', exc_info=True)
    return api_response(message='success')


@community_bp.route('/posts/<post_id>/replies', methods=['POST'])
@login_required
def create_reply(post_id):
    """Create a reply to a forum post.

    Request body: { content, replyTo? }. The reply is indexed into
    ``community_forum_replies`` and the parent post's reply count and
    last-reply fields are updated. Returns 404 when the post does not exist.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        user = g.current_user
        data = _json_body()

        content = _clean_str(data.get('content'))
        if not content:
            return api_error('回复内容不能为空')

        reply_id = generate_id()
        now = datetime.utcnow()

        # Fetch the parent post (needed for its channel id)
        try:
            post_result = es_client.get(index='community_forum_posts', id=post_id)
        except NotFoundError:
            return api_error('帖子不存在', 404)
        post = post_result['_source']

        # Reply document
        reply_doc = {
            'id': reply_id,
            'post_id': post_id,
            'channel_id': post['channel_id'],
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'content': content,
            'content_html': content,
            'reply_to': data.get('replyTo'),
            'reactions': {},
            'like_count': 0,
            'is_solution': False,
            'is_deleted': False,
            'created_at': now.isoformat(),
        }

        # Write to ES
        es_client.index(
            index='community_forum_replies',
            id=reply_id,
            document=reply_doc,
            refresh=True
        )

        # Update the post's reply count and last-reply info
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script={
                'source': '''
                    ctx._source.reply_count += 1;
                    ctx._source.last_reply_at = params.now;
                    ctx._source.last_reply_by = params.author_name;
                ''',
                'lang': 'painless',
                'params': {
                    'now': now.isoformat(),
                    'author_name': user['username']
                }
            },
            refresh=True
        )

        # Convert field names to camelCase
        response_data = {
            'id': reply_doc['id'],
            'postId': reply_doc['post_id'],
            'channelId': reply_doc['channel_id'],
            'authorId': reply_doc['author_id'],
            'authorName': reply_doc['author_name'],
            'authorAvatar': reply_doc['author_avatar'],
            'content': reply_doc['content'],
            'replyTo': reply_doc['reply_to'],
            'likeCount': reply_doc['like_count'],
            'createdAt': reply_doc['created_at'],
        }

        return api_response(response_data)

    except Exception as e:
        current_app.logger.exception('[Community API] 创建回复失败')
        return api_error(f'创建回复失败: {str(e)}', 500)


# ============================================================
# ES proxy endpoint (the frontend queries ES directly)
# ============================================================


@community_bp.route('/es/<index>/_search', methods=['POST'])
def es_search_proxy(index):
    """
    Elasticsearch search proxy.

    Forwards the JSON request body as the search body to the given index and
    returns the raw ES response. Allowed indices: community_messages,
    community_forum_posts, community_forum_replies, community_notifications;
    any other index is rejected with 403.
    """
    # NOTE(review): this endpoint has no login check and forwards arbitrary
    # query DSL; only the index name is restricted. Confirm that is intended.
    allowed_indices = (
        'community_messages',
        'community_forum_posts',
        'community_forum_replies',
        'community_notifications',
    )

    if index not in allowed_indices:
        return api_error('不允许访问该索引', 403)

    # silent=True: a missing/non-JSON body becomes an empty query instead of
    # an exception; anything that is JSON but not an object is a client error.
    body = request.get_json(silent=True)
    if body is None:
        body = {}
    elif not isinstance(body, dict):
        return api_error('查询请求体必须是 JSON 对象')

    try:
        result = es_client.search(index=index, body=body)
        # Newer elasticsearch-py clients return a response wrapper that
        # exposes the payload as `.body`; a plain dict passes through as-is.
        return jsonify(getattr(result, 'body', result))
    except Exception as e:
        current_app.logger.exception('[Community API] ES 搜索失败')
        return api_error(f'搜索失败: {str(e)}', 500)


# ============================================================
# WebSocket event handlers (must be registered from app.py)
# ============================================================

def register_community_socketio(socketio):
    """
    Register the community WebSocket events on the '/community' namespace.

    Call from app.py: register_community_socketio(socketio)

    Handlers registered: connect, disconnect, SUBSCRIBE_CHANNEL,
    UNSUBSCRIBE_CHANNEL, SEND_MESSAGE, START_TYPING, STOP_TYPING. Events that
    carry no usable ``channelId`` are ignored.
    """
    from flask_socketio import join_room, leave_room, emit

    def _channel_id_of(data):
        """Return the payload's channelId, or None for a malformed payload."""
        return data.get('channelId') if isinstance(data, dict) else None

    @socketio.on('connect', namespace='/community')
    def handle_connect():
        print('[Community Socket] Client connected')

    @socketio.on('disconnect', namespace='/community')
    def handle_disconnect():
        print('[Community Socket] Client disconnected')

    @socketio.on('SUBSCRIBE_CHANNEL', namespace='/community')
    def handle_subscribe(data):
        """Join the room named after the channel id."""
        channel_id = _channel_id_of(data)
        if channel_id:
            join_room(channel_id)
            print(f'[Community Socket] Joined room: {channel_id}')

    @socketio.on('UNSUBSCRIBE_CHANNEL', namespace='/community')
    def handle_unsubscribe(data):
        """Leave the room named after the channel id."""
        channel_id = _channel_id_of(data)
        if channel_id:
            leave_room(channel_id)
            print(f'[Community Socket] Left room: {channel_id}')

    @socketio.on('SEND_MESSAGE', namespace='/community')
    def handle_send_message(data):
        """Relay a message over WebSocket to everyone else in the channel room."""
        channel_id = _channel_id_of(data)
        if not channel_id:
            return
        # The client payload is relayed verbatim; nothing is stored here.
        emit('MESSAGE_CREATE', data, room=channel_id, include_self=False)

    @socketio.on('START_TYPING', namespace='/community')
    def handle_start_typing(data):
        """Tell the other members of the channel room that this user is typing."""
        channel_id = _channel_id_of(data)
        if not channel_id:
            return
        user_id = session.get('user_id')
        user_name = session.get('username', '匿名')
        emit('TYPING_START', {
            'channelId': channel_id,
            'userId': user_id,
            'userName': user_name
        }, room=channel_id, include_self=False)

    @socketio.on('STOP_TYPING', namespace='/community')
    def handle_stop_typing(data):
        """Tell the other members of the channel room that this user stopped typing."""
        channel_id = _channel_id_of(data)
        if not channel_id:
            return
        user_id = session.get('user_id')
        emit('TYPING_STOP', {
            'channelId': channel_id,
            'userId': user_id
        }, room=channel_id, include_self=False)

    print('✅ Community WebSocket 事件已注册')