Files
vf_react/community_api.py
2026-01-06 16:25:06 +08:00

1505 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
股票社区 API - Discord 风格
包含:频道、消息、帖子、回复、表情反应等接口
"""
import uuid
from datetime import datetime
from functools import wraps
from flask import Blueprint, request, jsonify, session, g, current_app
from elasticsearch import Elasticsearch
from sqlalchemy import text
# ============================================================
# Blueprint 定义
# ============================================================
community_bp = Blueprint('community', __name__, url_prefix='/api/community')
# ES 客户端
es_client = Elasticsearch(
hosts=["http://222.128.1.157:19200"],
request_timeout=30,
max_retries=3,
retry_on_timeout=True
)
# 注意:数据库连接从 app.py 的 db 对象获取,不再独立创建
# engine 变量会在 Blueprint 注册后通过 app context 获取
engine = None # 将在第一次请求时初始化
socketio_instance = None # 将在注册时设置
def get_db_engine():
"""获取数据库引擎(延迟初始化)"""
global engine
if engine is None:
from flask import current_app
# 从 app.py 的 SQLAlchemy 实例获取 engine
from app import db
engine = db.engine
return engine
# ============================================================
# 工具函数
# ============================================================
def generate_id():
"""生成唯一 ID"""
return str(uuid.uuid4()).replace('-', '')[:16]
def get_current_user():
"""获取当前登录用户"""
user_id = session.get('user_id')
if not user_id:
return None
return {
'id': str(user_id),
'username': session.get('username', '匿名用户'),
'avatar': session.get('avatar', ''),
}
def login_required(f):
"""登录验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({'code': 401, 'message': '请先登录'}), 401
g.current_user = user
return f(*args, **kwargs)
return decorated_function
def get_user_admin_info(user_id):
"""获取用户管理员信息"""
try:
with get_db_engine().connect() as conn:
sql = text("""
SELECT role, permissions
FROM community_admins
WHERE user_id = :user_id
""")
print(f"[Community API] 查询管理员信息: user_id={user_id}, type={type(user_id)}")
result = conn.execute(sql, {'user_id': int(user_id)}).fetchone()
print(f"[Community API] 查询结果: {result}")
if result:
import json
permissions = result.permissions
if isinstance(permissions, str):
permissions = json.loads(permissions)
admin_info = {
'role': result.role,
'permissions': permissions or {},
'isAdmin': result.role == 'admin',
'isModerator': result.role in ['admin', 'moderator']
}
print(f"[Community API] 返回管理员信息: {admin_info}")
return admin_info
except Exception as e:
print(f"[Community API] 获取管理员信息失败: {e}")
import traceback
traceback.print_exc()
return None
def check_permission(user_id, permission):
"""检查用户是否有指定权限"""
admin_info = get_user_admin_info(user_id)
if not admin_info:
return False
if admin_info['isAdmin']:
return True
return admin_info['permissions'].get(permission, False)
def admin_required(permission=None):
"""管理员权限验证装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({'code': 401, 'message': '请先登录'}), 401
g.current_user = user
# 检查权限
if permission:
if not check_permission(user['id'], permission):
return jsonify({'code': 403, 'message': '无权限执行此操作'}), 403
else:
admin_info = get_user_admin_info(user['id'])
if not admin_info:
return jsonify({'code': 403, 'message': '需要管理员权限'}), 403
g.admin_info = get_user_admin_info(user['id'])
return f(*args, **kwargs)
return decorated_function
return decorator
def get_channel_admin_info(channel_id, user_id):
"""获取用户在指定频道的管理员信息"""
# 先检查是否是超级管理员
super_admin = get_user_admin_info(user_id)
if super_admin and super_admin['isAdmin']:
return {
'role': 'super_admin',
'isSuperAdmin': True,
'isOwner': True,
'isAdmin': True,
'isModerator': True,
'permissions': {
'delete_channel': True,
'edit_channel': True,
'manage_admins': True,
'pin_messages': True,
'delete_messages': True,
'slow_mode': True,
'lock_channel': True
}
}
# 检查频道管理员
try:
with get_db_engine().connect() as conn:
sql = text("""
SELECT role, permissions
FROM community_channel_admins
WHERE channel_id = :channel_id AND user_id = :user_id
""")
result = conn.execute(sql, {
'channel_id': channel_id,
'user_id': int(user_id)
}).fetchone()
if result:
import json
permissions = result.permissions
if isinstance(permissions, str):
permissions = json.loads(permissions)
role = result.role
return {
'role': role,
'isSuperAdmin': False,
'isOwner': role == 'owner',
'isAdmin': role in ['owner', 'admin'],
'isModerator': role in ['owner', 'admin', 'moderator'],
'permissions': permissions or {}
}
except Exception as e:
print(f"[Community API] 获取频道管理员信息失败: {e}")
return None
def check_channel_permission(channel_id, user_id, permission):
"""检查用户是否有指定频道的权限"""
admin_info = get_channel_admin_info(channel_id, user_id)
if not admin_info:
return False
if admin_info['isSuperAdmin'] or admin_info['isOwner']:
return True
return admin_info['permissions'].get(permission, False)
def add_channel_admin(channel_id, user_id, role='owner'):
"""添加频道管理员"""
try:
with get_db_engine().connect() as conn:
# 根据角色设置默认权限
permissions = {}
if role == 'owner':
permissions = {
'delete_channel': True,
'edit_channel': True,
'manage_admins': True,
'pin_messages': True,
'delete_messages': True,
'slow_mode': True,
'lock_channel': True
}
elif role == 'admin':
permissions = {
'edit_channel': True,
'pin_messages': True,
'delete_messages': True,
'slow_mode': True
}
elif role == 'moderator':
permissions = {
'pin_messages': True,
'delete_messages': True
}
import json
sql = text("""
INSERT INTO community_channel_admins (channel_id, user_id, role, permissions)
VALUES (:channel_id, :user_id, :role, :permissions)
ON DUPLICATE KEY UPDATE role = :role, permissions = :permissions
""")
conn.execute(sql, {
'channel_id': channel_id,
'user_id': int(user_id),
'role': role,
'permissions': json.dumps(permissions)
})
conn.commit()
return True
except Exception as e:
print(f"[Community API] 添加频道管理员失败: {e}")
return False
def api_response(data=None, message='success', code=200):
"""统一 API 响应格式"""
return jsonify({
'code': code,
'message': message,
'data': data
})
def api_error(message, code=400):
"""API 错误响应"""
return jsonify({
'code': code,
'message': message
}), code if code >= 400 else 400
# ============================================================
# 在线成员 API
# ============================================================
@community_bp.route('/members/online', methods=['GET'])
def get_online_members_api():
"""获取当前在线成员列表"""
members = get_online_members()
return api_response(members)
# ============================================================
# 用户管理员状态 API
# ============================================================
@community_bp.route('/me/admin-status', methods=['GET'])
@login_required
def get_my_admin_status():
"""获取当前用户的管理员状态"""
user = g.current_user
print(f"[Community API] /me/admin-status 请求, user={user}")
admin_info = get_user_admin_info(user['id'])
print(f"[Community API] /me/admin-status admin_info={admin_info}")
if admin_info:
return api_response({
'isAdmin': admin_info['isAdmin'],
'isModerator': admin_info['isModerator'],
'role': admin_info['role'],
'permissions': admin_info['permissions']
})
else:
return api_response({
'isAdmin': False,
'isModerator': False,
'role': None,
'permissions': {}
})
@community_bp.route('/channels/<channel_id>/admin-status', methods=['GET'])
@login_required
def get_channel_admin_status(channel_id):
"""获取当前用户在指定频道的管理员状态"""
user = g.current_user
admin_info = get_channel_admin_info(channel_id, user['id'])
if admin_info:
return api_response({
'role': admin_info['role'],
'isSuperAdmin': admin_info['isSuperAdmin'],
'isOwner': admin_info['isOwner'],
'isAdmin': admin_info['isAdmin'],
'isModerator': admin_info['isModerator'],
'permissions': admin_info['permissions']
})
else:
return api_response({
'role': None,
'isSuperAdmin': False,
'isOwner': False,
'isAdmin': False,
'isModerator': False,
'permissions': {}
})
@community_bp.route('/channels/<channel_id>/admins', methods=['GET'])
@login_required
def get_channel_admins(channel_id):
"""获取频道管理员列表(包含超级管理员)"""
try:
import json
admins = []
with get_db_engine().connect() as conn:
# 1. 先获取全局超级管理员
# 注意users 表可能没有 avatar 字段,使用 NULL 作为默认值
super_admin_sql = text("""
SELECT ca.user_id, u.username
FROM community_admins ca
LEFT JOIN user u ON ca.user_id = u.id
WHERE ca.role = 'admin'
""")
super_admins = conn.execute(super_admin_sql).fetchall()
for row in super_admins:
admins.append({
'userId': str(row.user_id),
'username': row.username or f'用户{row.user_id}',
'avatar': None,
'role': 'super_admin', # 特殊标记
'isSuperAdmin': True,
'permissions': {
'delete_channel': True,
'edit_channel': True,
'manage_admins': True,
'pin_messages': True,
'delete_messages': True,
'slow_mode': True,
'lock_channel': True
},
'createdAt': None
})
# 2. 获取频道管理员(排除已在超级管理员列表中的用户)
super_admin_ids = [str(row.user_id) for row in super_admins]
# 检查 community_channel_admins 表是否存在
try:
sql = text("""
SELECT cca.user_id, cca.role, cca.permissions, cca.created_at,
u.username
FROM community_channel_admins cca
LEFT JOIN user u ON cca.user_id = u.id
WHERE cca.channel_id = :channel_id
ORDER BY
CASE cca.role
WHEN 'owner' THEN 1
WHEN 'admin' THEN 2
WHEN 'moderator' THEN 3
END
""")
result = conn.execute(sql, {'channel_id': channel_id}).fetchall()
for row in result:
# 跳过已在超级管理员列表中的用户
if str(row.user_id) in super_admin_ids:
continue
permissions = row.permissions
if isinstance(permissions, str):
permissions = json.loads(permissions)
admins.append({
'userId': str(row.user_id),
'username': row.username or f'用户{row.user_id}',
'avatar': None,
'role': row.role,
'isSuperAdmin': False,
'permissions': permissions or {},
'createdAt': row.created_at.isoformat() if row.created_at else None
})
except Exception as table_err:
# 表不存在时忽略,只返回超级管理员
print(f"[Community API] 频道管理员表查询失败(可能表不存在): {table_err}")
return api_response(admins)
except Exception as e:
print(f"[Community API] 获取频道管理员列表失败: {e}")
import traceback
traceback.print_exc()
return api_error(f'获取管理员列表失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/admins', methods=['POST'])
@login_required
def add_channel_admin_api(channel_id):
"""添加频道管理员(需要 owner 或超级管理员权限)"""
user = g.current_user
# 检查权限
if not check_channel_permission(channel_id, user['id'], 'manage_admins'):
return api_error('无权限管理频道管理员', 403)
data = request.get_json()
target_user_id = data.get('userId')
role = data.get('role', 'moderator')
if not target_user_id:
return api_error('请指定用户ID')
if role not in ['admin', 'moderator']:
return api_error('无效的角色,只能设置 admin 或 moderator')
# 不能设置为 ownerowner 只能是创建者)
if add_channel_admin(channel_id, target_user_id, role):
return api_response({'success': True, 'message': f'已添加 {role}'})
else:
return api_error('添加管理员失败', 500)
@community_bp.route('/channels/<channel_id>/admins/<user_id>', methods=['DELETE'])
@login_required
def remove_channel_admin_api(channel_id, user_id):
"""移除频道管理员(需要 owner 或超级管理员权限)"""
current_user = g.current_user
# 检查权限
if not check_channel_permission(channel_id, current_user['id'], 'manage_admins'):
return api_error('无权限管理频道管理员', 403)
# 不能移除 owner
target_admin = get_channel_admin_info(channel_id, user_id)
if target_admin and target_admin['isOwner'] and not target_admin['isSuperAdmin']:
return api_error('不能移除频道创建者', 400)
try:
with get_db_engine().connect() as conn:
sql = text("""
DELETE FROM community_channel_admins
WHERE channel_id = :channel_id AND user_id = :user_id
""")
conn.execute(sql, {
'channel_id': channel_id,
'user_id': int(user_id)
})
conn.commit()
return api_response({'success': True, 'message': '已移除管理员'})
except Exception as e:
print(f"[Community API] 移除频道管理员失败: {e}")
return api_error(f'移除管理员失败: {str(e)}', 500)
# ============================================================
# 频道相关 API
# ============================================================
@community_bp.route('/channels', methods=['GET'])
def get_channels():
"""
获取频道列表(按分类组织)
返回格式:[{ id, name, icon, channels: [...] }, ...]
"""
try:
with get_db_engine().connect() as conn:
# 查询分类
categories_sql = text("""
SELECT id, name, icon, position, is_collapsible, is_system
FROM community_categories
ORDER BY position
""")
categories_result = conn.execute(categories_sql).fetchall()
# 查询频道
channels_sql = text("""
SELECT
c.id, c.category_id, c.name, c.type, c.topic, c.position,
c.concept_code, c.slow_mode, c.is_readonly, c.is_system,
c.subscriber_count, c.message_count, c.last_message_at,
cc.concept_name, cc.stock_count, cc.is_hot
FROM community_channels c
LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
WHERE c.is_visible = 1
ORDER BY c.position
""")
channels_result = conn.execute(channels_sql).fetchall()
# 组装数据
channels_by_category = {}
for ch in channels_result:
cat_id = ch.category_id
if cat_id not in channels_by_category:
channels_by_category[cat_id] = []
channels_by_category[cat_id].append({
'id': ch.id,
'categoryId': ch.category_id,
'name': ch.name,
'type': ch.type,
'topic': ch.topic,
'position': ch.position,
'conceptCode': ch.concept_code,
'slowMode': ch.slow_mode or 0,
'isReadonly': bool(ch.is_readonly),
'isSystem': bool(ch.is_system),
'subscriberCount': ch.subscriber_count or 0,
'messageCount': ch.message_count or 0,
'lastMessageAt': ch.last_message_at.isoformat() if ch.last_message_at else None,
'conceptName': ch.concept_name,
'stockCount': ch.stock_count,
'isHot': bool(ch.is_hot) if ch.is_hot is not None else False,
})
result = []
for cat in categories_result:
result.append({
'id': cat.id,
'name': cat.name,
'icon': cat.icon or '',
'position': cat.position,
'isCollapsible': bool(cat.is_collapsible),
'isSystem': bool(cat.is_system),
'channels': channels_by_category.get(cat.id, [])
})
return api_response(result)
except Exception as e:
print(f"[Community API] 获取频道列表失败: {e}")
return api_error(f'获取频道列表失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>', methods=['GET'])
def get_channel(channel_id):
"""获取单个频道详情"""
try:
with get_db_engine().connect() as conn:
sql = text("""
SELECT
c.*, cc.concept_name, cc.stock_count, cc.is_hot
FROM community_channels c
LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
WHERE c.id = :channel_id
""")
result = conn.execute(sql, {'channel_id': channel_id}).fetchone()
if not result:
return api_error('频道不存在', 404)
return api_response({
'id': result.id,
'categoryId': result.category_id,
'name': result.name,
'type': result.type,
'topic': result.topic,
'conceptCode': result.concept_code,
'slowMode': result.slow_mode or 0,
'isReadonly': bool(result.is_readonly),
'subscriberCount': result.subscriber_count or 0,
'messageCount': result.message_count or 0,
'conceptName': result.concept_name,
'stockCount': result.stock_count,
'isHot': bool(result.is_hot) if result.is_hot is not None else False,
})
except Exception as e:
return api_error(f'获取频道失败: {str(e)}', 500)
@community_bp.route('/channels', methods=['POST'])
@login_required
def create_channel():
"""
创建新频道
请求体: { name, type, topic?, categoryId }
"""
try:
user = g.current_user
data = request.get_json()
name = data.get('name', '').strip()
channel_type = data.get('type', 'text')
topic = data.get('topic', '').strip()
category_id = data.get('categoryId')
if not name:
return api_error('频道名称不能为空')
if len(name) > 50:
return api_error('频道名称不能超过50个字符')
if channel_type not in ['text', 'forum', 'voice', 'announcement', 'prediction']:
return api_error('无效的频道类型')
channel_id = f"ch_{generate_id()}"
now = datetime.utcnow()
with get_db_engine().connect() as conn:
# 如果没有指定分类,使用默认分类(自由讨论)
if not category_id:
default_cat_sql = text("""
SELECT id FROM community_categories
WHERE name = '自由讨论' OR name LIKE '%讨论%'
ORDER BY position
LIMIT 1
""")
default_cat = conn.execute(default_cat_sql).fetchone()
if default_cat:
category_id = default_cat.id
else:
# 如果没有找到,使用第一个分类
first_cat_sql = text("SELECT id FROM community_categories ORDER BY position LIMIT 1")
first_cat = conn.execute(first_cat_sql).fetchone()
if first_cat:
category_id = first_cat.id
else:
return api_error('没有可用的分类,请先创建分类')
# 获取当前分类下最大的 position
max_pos_sql = text("""
SELECT COALESCE(MAX(position), 0) as max_pos
FROM community_channels
WHERE category_id = :category_id
""")
max_pos_result = conn.execute(max_pos_sql, {'category_id': category_id}).fetchone()
next_position = (max_pos_result.max_pos or 0) + 1
# 插入新频道
insert_sql = text("""
INSERT INTO community_channels
(id, category_id, name, type, topic, position, slow_mode, is_readonly,
is_visible, is_system, subscriber_count, message_count, created_at, updated_at)
VALUES
(:id, :category_id, :name, :type, :topic, :position, 0, 0,
1, 0, 0, 0, :now, :now)
""")
conn.execute(insert_sql, {
'id': channel_id,
'category_id': category_id,
'name': name,
'type': channel_type,
'topic': topic,
'position': next_position,
'now': now,
})
conn.commit()
# 将创建者设为频道管理员owner
add_channel_admin(channel_id, user['id'], role='owner')
# 返回创建的频道信息
response_data = {
'id': channel_id,
'categoryId': category_id,
'name': name,
'type': channel_type,
'topic': topic,
'position': next_position,
'slowMode': 0,
'isReadonly': False,
'isSystem': False,
'subscriberCount': 0,
'messageCount': 0,
'createdAt': now.isoformat(),
'isOwner': True, # 创建者自动是 owner
}
print(f"[Community API] 创建频道成功: {channel_id} - {name}, 创建者: {user['id']}")
return api_response(response_data)
except Exception as e:
print(f"[Community API] 创建频道失败: {e}")
import traceback
traceback.print_exc()
return api_error(f'创建频道失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/subscribe', methods=['POST'])
@login_required
def subscribe_channel(channel_id):
"""订阅频道"""
try:
user = g.current_user
subscription_id = generate_id()
with get_db_engine().connect() as conn:
# 检查是否已订阅
check_sql = text("""
SELECT id FROM community_subscriptions
WHERE user_id = :user_id AND channel_id = :channel_id
""")
existing = conn.execute(check_sql, {
'user_id': user['id'],
'channel_id': channel_id
}).fetchone()
if existing:
return api_response(message='已订阅')
# 创建订阅
insert_sql = text("""
INSERT INTO community_subscriptions (id, user_id, channel_id, notification_level)
VALUES (:id, :user_id, :channel_id, 'all')
""")
conn.execute(insert_sql, {
'id': subscription_id,
'user_id': user['id'],
'channel_id': channel_id
})
# 更新订阅数
update_sql = text("""
UPDATE community_channels
SET subscriber_count = subscriber_count + 1
WHERE id = :channel_id
""")
conn.execute(update_sql, {'channel_id': channel_id})
conn.commit()
return api_response(message='订阅成功')
except Exception as e:
return api_error(f'订阅失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/unsubscribe', methods=['POST'])
@login_required
def unsubscribe_channel(channel_id):
"""取消订阅频道"""
try:
user = g.current_user
with get_db_engine().connect() as conn:
# 删除订阅
delete_sql = text("""
DELETE FROM community_subscriptions
WHERE user_id = :user_id AND channel_id = :channel_id
""")
result = conn.execute(delete_sql, {
'user_id': user['id'],
'channel_id': channel_id
})
if result.rowcount > 0:
# 更新订阅数
update_sql = text("""
UPDATE community_channels
SET subscriber_count = GREATEST(subscriber_count - 1, 0)
WHERE id = :channel_id
""")
conn.execute(update_sql, {'channel_id': channel_id})
conn.commit()
return api_response(message='取消订阅成功')
except Exception as e:
return api_error(f'取消订阅失败: {str(e)}', 500)
# ============================================================
# 消息相关 API即时聊天
# ============================================================
@community_bp.route('/channels/<channel_id>/messages', methods=['POST'])
@login_required
def send_message(channel_id):
"""发送消息"""
try:
user = g.current_user
data = request.get_json()
# 检查频道类型和权限
with get_db_engine().connect() as conn:
channel_sql = text("""
SELECT type, is_readonly FROM community_channels WHERE id = :channel_id
""")
channel = conn.execute(channel_sql, {'channel_id': channel_id}).fetchone()
if not channel:
return api_error('频道不存在', 404)
# 公告频道只有管理员可以发消息
if channel.type == 'announcement':
admin_info = get_channel_admin_info(channel_id, user['id'])
if not admin_info or not admin_info.get('isAdmin'):
return api_error('只有管理员可以在公告频道发送消息', 403)
# 只读频道不能发消息
if channel.is_readonly:
admin_info = get_channel_admin_info(channel_id, user['id'])
if not admin_info or not admin_info.get('isAdmin'):
return api_error('该频道为只读频道', 403)
content = data.get('content', '').strip()
if not content:
return api_error('消息内容不能为空')
message_id = generate_id()
now = datetime.utcnow()
# 将 Markdown 图片语法转换为 HTML支持 base64 图片)
content_html = parse_markdown_images(content)
# 构建消息文档
message_doc = {
'id': message_id,
'channel_id': channel_id,
'thread_id': data.get('threadId'),
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'content': content,
'content_html': content_html, # Markdown 转换后的 HTML
'type': 'text',
'mentioned_users': data.get('mentionedUsers', []),
'mentioned_stocks': data.get('mentionedStocks', []),
'mentioned_everyone': data.get('mentionedEveryone', False),
'reply_to': data.get('replyTo'),
'reactions': {},
'reaction_count': 0,
'is_pinned': False,
'is_edited': False,
'is_deleted': False,
'created_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_messages',
id=message_id,
document=message_doc,
refresh=True
)
# 更新频道最后消息时间和消息数
with get_db_engine().connect() as conn:
update_sql = text("""
UPDATE community_channels
SET message_count = message_count + 1,
last_message_id = :message_id,
last_message_at = :now
WHERE id = :channel_id
""")
conn.execute(update_sql, {
'message_id': message_id,
'now': now,
'channel_id': channel_id
})
conn.commit()
# 转换字段名为 camelCase
response_data = {
'id': message_doc['id'],
'channelId': message_doc['channel_id'],
'threadId': message_doc['thread_id'],
'authorId': message_doc['author_id'],
'authorName': message_doc['author_name'],
'authorAvatar': message_doc['author_avatar'],
'content': message_doc['content'],
'type': message_doc['type'],
'mentionedUsers': message_doc['mentioned_users'],
'mentionedStocks': message_doc['mentioned_stocks'],
'replyTo': message_doc['reply_to'],
'reactions': message_doc['reactions'],
'reactionCount': message_doc['reaction_count'],
'isPinned': message_doc['is_pinned'],
'isEdited': message_doc['is_edited'],
'createdAt': message_doc['created_at'],
}
# 通过 WebSocket 广播新消息给频道内的其他用户
if socketio_instance:
socketio_instance.emit(
'MESSAGE_CREATE',
response_data,
room=channel_id,
namespace='/community'
)
return api_response(response_data)
except Exception as e:
print(f"[Community API] 发送消息失败: {e}")
return api_error(f'发送消息失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions', methods=['POST'])
@login_required
def add_reaction(message_id):
"""添加表情反应"""
try:
user = g.current_user
data = request.get_json()
emoji = data.get('emoji')
if not emoji:
return api_error('请选择表情')
# 获取当前消息
result = es_client.get(index='community_messages', id=message_id)
message = result['_source']
# 更新 reactions
reactions = message.get('reactions', {})
if emoji not in reactions:
reactions[emoji] = []
if user['id'] not in reactions[emoji]:
reactions[emoji].append(user['id'])
# 计算总数
reaction_count = sum(len(users) for users in reactions.values())
# 更新 ES
es_client.update(
index='community_messages',
id=message_id,
doc={'reactions': reactions, 'reaction_count': reaction_count},
refresh=True
)
return api_response(message='添加成功')
except Exception as e:
return api_error(f'添加表情失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions/<emoji>', methods=['DELETE'])
@login_required
def remove_reaction(message_id, emoji):
"""移除表情反应"""
try:
user = g.current_user
# 获取当前消息
result = es_client.get(index='community_messages', id=message_id)
message = result['_source']
# 更新 reactions
reactions = message.get('reactions', {})
if emoji in reactions and user['id'] in reactions[emoji]:
reactions[emoji].remove(user['id'])
if not reactions[emoji]:
del reactions[emoji]
# 计算总数
reaction_count = sum(len(users) for users in reactions.values())
# 更新 ES
es_client.update(
index='community_messages',
id=message_id,
doc={'reactions': reactions, 'reaction_count': reaction_count},
refresh=True
)
return api_response(message='移除成功')
except Exception as e:
return api_error(f'移除表情失败: {str(e)}', 500)
# ============================================================
# Forum 帖子相关 API
# ============================================================
def parse_markdown_images(content):
"""
将 Markdown 图片语法转换为 HTML img 标签
支持 ![alt](url) 格式,包括 base64 data URL
"""
import re
# 匹配 ![alt](url) 格式
pattern = r'!\[([^\]]*)\]\(([^)]+)\)'
def replace_image(match):
alt = match.group(1) or '图片'
url = match.group(2)
return f'<img src="{url}" alt="{alt}" style="max-width: 100%; border-radius: 8px; margin: 16px 0;" />'
html = re.sub(pattern, replace_image, content)
# 将换行转换为 <br>
html = html.replace('\n', '<br />')
return html
@community_bp.route('/channels/<channel_id>/posts', methods=['POST'])
@login_required
def create_post(channel_id):
"""创建帖子"""
try:
user = g.current_user
data = request.get_json()
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return api_error('标题不能为空')
if not content:
return api_error('内容不能为空')
post_id = generate_id()
now = datetime.utcnow()
# 将 Markdown 图片语法转换为 HTML
content_html = parse_markdown_images(content)
# 构建帖子文档
post_doc = {
'id': post_id,
'channel_id': channel_id,
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'title': title,
'content': content,
'content_html': content_html, # Markdown 转换后的 HTML
'tags': data.get('tags', []),
'stock_symbols': data.get('stockSymbols', []),
'is_pinned': False,
'is_locked': False,
'is_deleted': False,
'reply_count': 0,
'view_count': 0,
'like_count': 0,
'last_reply_at': None,
'last_reply_by': None,
'created_at': now.isoformat(),
'updated_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_forum_posts',
id=post_id,
document=post_doc,
refresh=True
)
# 更新频道消息数
with get_db_engine().connect() as conn:
update_sql = text("""
UPDATE community_channels
SET message_count = message_count + 1,
last_message_at = :now
WHERE id = :channel_id
""")
conn.execute(update_sql, {'now': now, 'channel_id': channel_id})
conn.commit()
# 转换字段名
response_data = {
'id': post_doc['id'],
'channelId': post_doc['channel_id'],
'authorId': post_doc['author_id'],
'authorName': post_doc['author_name'],
'authorAvatar': post_doc['author_avatar'],
'title': post_doc['title'],
'content': post_doc['content'],
'tags': post_doc['tags'],
'stockSymbols': post_doc['stock_symbols'],
'isPinned': post_doc['is_pinned'],
'isLocked': post_doc['is_locked'],
'replyCount': post_doc['reply_count'],
'viewCount': post_doc['view_count'],
'likeCount': post_doc['like_count'],
'createdAt': post_doc['created_at'],
}
return api_response(response_data)
except Exception as e:
print(f"[Community API] 创建帖子失败: {e}")
return api_error(f'创建帖子失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/like', methods=['POST'])
@login_required
def like_post(post_id):
"""点赞帖子"""
try:
# 更新 ES简单实现生产环境应该用单独的点赞表防止重复
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': 'ctx._source.like_count += 1',
'lang': 'painless'
},
refresh=True
)
return api_response(message='点赞成功')
except Exception as e:
return api_error(f'点赞失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/view', methods=['POST'])
def increment_view(post_id):
"""增加帖子浏览量"""
try:
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': 'ctx._source.view_count += 1',
'lang': 'painless'
}
)
return api_response(message='success')
except Exception as e:
# 浏览量统计失败不影响主流程
return api_response(message='success')
@community_bp.route('/posts/<post_id>/replies', methods=['POST'])
@login_required
def create_reply(post_id):
"""创建帖子回复"""
try:
user = g.current_user
data = request.get_json()
content = data.get('content', '').strip()
if not content:
return api_error('回复内容不能为空')
reply_id = generate_id()
now = datetime.utcnow()
# 获取帖子信息
post_result = es_client.get(index='community_forum_posts', id=post_id)
post = post_result['_source']
# 将 Markdown 图片语法转换为 HTML
content_html = parse_markdown_images(content)
# 构建回复文档
reply_doc = {
'id': reply_id,
'post_id': post_id,
'channel_id': post['channel_id'],
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'content': content,
'content_html': content_html, # Markdown 转换后的 HTML
'reply_to': data.get('replyTo'),
'reactions': {},
'like_count': 0,
'is_solution': False,
'is_deleted': False,
'created_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_forum_replies',
id=reply_id,
document=reply_doc,
refresh=True
)
# 更新帖子的回复数和最后回复时间
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': '''
ctx._source.reply_count += 1;
ctx._source.last_reply_at = params.now;
ctx._source.last_reply_by = params.author_name;
''',
'lang': 'painless',
'params': {
'now': now.isoformat(),
'author_name': user['username']
}
},
refresh=True
)
# 转换字段名
response_data = {
'id': reply_doc['id'],
'postId': reply_doc['post_id'],
'channelId': reply_doc['channel_id'],
'authorId': reply_doc['author_id'],
'authorName': reply_doc['author_name'],
'authorAvatar': reply_doc['author_avatar'],
'content': reply_doc['content'],
'replyTo': reply_doc['reply_to'],
'likeCount': reply_doc['like_count'],
'createdAt': reply_doc['created_at'],
}
return api_response(response_data)
except Exception as e:
print(f"[Community API] 创建回复失败: {e}")
return api_error(f'创建回复失败: {str(e)}', 500)
# ============================================================
# 文件上传接口
# ============================================================
import os
from werkzeug.utils import secure_filename
# 允许的图片扩展名
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# 上传目录(相对于 Flask 应用根目录)
UPLOAD_FOLDER = 'public/uploads/community'
# 最大文件大小 10MB
MAX_FILE_SIZE = 10 * 1024 * 1024
def allowed_file(filename):
"""检查文件扩展名是否允许"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@community_bp.route('/upload/image', methods=['POST'])
@login_required
def upload_image():
"""
上传图片
返回 base64 编码的图片数据,可直接嵌入帖子内容中存储到 ES
"""
import base64
try:
if 'file' not in request.files:
return api_error('没有选择文件', 400)
file = request.files['file']
if file.filename == '':
return api_error('没有选择文件', 400)
if not allowed_file(file.filename):
return api_error('不支持的文件格式,仅支持 PNG、JPG、GIF、WebP', 400)
# 读取文件内容
file_content = file.read()
size = len(file_content)
if size > MAX_FILE_SIZE:
return api_error('文件大小超过限制(最大 10MB', 400)
# 获取文件扩展名和 MIME 类型
ext = file.filename.rsplit('.', 1)[1].lower()
mime_types = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'gif': 'image/gif',
'webp': 'image/webp'
}
mime_type = mime_types.get(ext, 'image/jpeg')
# 转换为 base64
base64_data = base64.b64encode(file_content).decode('utf-8')
data_url = f"data:{mime_type};base64,{base64_data}"
# 生成唯一标识
filename = f"{generate_id()}.{ext}"
return api_response({
'url': data_url, # base64 data URL可直接用于 img src
'filename': filename,
'size': size,
'type': mime_type
})
except Exception as e:
print(f"[Community API] 上传图片失败: {e}")
import traceback
traceback.print_exc()
return api_error(f'上传失败: {str(e)}', 500)
# ============================================================
# ES 代理接口(前端直接查询 ES
# ============================================================
@community_bp.route('/es/<index>/_search', methods=['POST'])
def es_search_proxy(index):
"""
ES 搜索代理
允许的索引community_messages, community_forum_posts, community_forum_replies
"""
allowed_indices = [
'community_messages',
'community_forum_posts',
'community_forum_replies',
'community_notifications'
]
if index not in allowed_indices:
return api_error('不允许访问该索引', 403)
try:
body = request.get_json()
print(f"[Community API] ES 搜索请求: index={index}, body={body}")
# 尝试使用不同的 API 调用方式(兼容不同版本的 elasticsearch-py
try:
# 新版本 API
result = es_client.search(index=index, body=body)
except TypeError:
# 旧版本 API参数可能不同
result = es_client.search(index=index, **body)
# 处理结果格式
if hasattr(result, 'body'):
result = result.body
elif hasattr(result, '__getitem__'):
pass # 已经是 dict
else:
result = dict(result)
print(f"[Community API] ES 搜索成功: 返回 {result.get('hits', {}).get('total', {})} 条结果")
return jsonify(result)
except Exception as e:
print(f"[Community API] ES 搜索失败: {e}")
import traceback
traceback.print_exc()
# 返回更详细的错误信息
return jsonify({
'error': str(e),
'type': type(e).__name__,
'index': index,
'body': body if 'body' in dir() else None
}), 500
# ============================================================
# WebSocket 事件处理(需要在 app.py 中注册)
# ============================================================
# 在线用户管理
# ============================================================
# 在线用户字典: {user_id: {sid: socket_id, username: str, avatar: str, connected_at: datetime}}
online_users = {}
# Socket ID 到用户 ID 的映射: {socket_id: user_id}
sid_to_user = {}
def get_online_members():
"""获取当前在线用户列表"""
return [
{
'userId': str(user_id),
'username': info['username'],
'avatar': info.get('avatar', ''),
'isOnline': True,
'connectedAt': info['connected_at'].isoformat() if info.get('connected_at') else None,
}
for user_id, info in online_users.items()
]
def register_community_socketio(socketio):
"""
注册社区 WebSocket 事件
在 app.py 中调用register_community_socketio(socketio)
"""
global socketio_instance
socketio_instance = socketio
from flask_socketio import join_room, leave_room, emit
from flask import request
@socketio.on('connect', namespace='/community')
def handle_connect():
user_id = session.get('user_id')
username = session.get('username', '匿名用户')
avatar = session.get('avatar', '')
sid = request.sid
if user_id:
# 记录在线用户
online_users[user_id] = {
'sid': sid,
'username': username,
'avatar': avatar,
'connected_at': datetime.utcnow()
}
sid_to_user[sid] = user_id
# 广播用户上线事件
emit('MEMBER_ONLINE', {
'userId': str(user_id),
'username': username,
'avatar': avatar,
}, broadcast=True, include_self=False)
print(f'[Community Socket] User {username}({user_id}) connected, online: {len(online_users)}')
else:
print('[Community Socket] Anonymous client connected')
@socketio.on('disconnect', namespace='/community')
def handle_disconnect():
sid = request.sid
user_id = sid_to_user.pop(sid, None)
if user_id and user_id in online_users:
username = online_users[user_id].get('username', '匿名')
del online_users[user_id]
# 广播用户下线事件
emit('MEMBER_OFFLINE', {
'userId': str(user_id),
}, broadcast=True)
print(f'[Community Socket] User {username}({user_id}) disconnected, online: {len(online_users)}')
@socketio.on('SUBSCRIBE_CHANNEL', namespace='/community')
def handle_subscribe(data):
channel_id = data.get('channelId')
if channel_id:
join_room(channel_id)
print(f'[Community Socket] Joined room: {channel_id}')
@socketio.on('UNSUBSCRIBE_CHANNEL', namespace='/community')
def handle_unsubscribe(data):
channel_id = data.get('channelId')
if channel_id:
leave_room(channel_id)
print(f'[Community Socket] Left room: {channel_id}')
@socketio.on('SEND_MESSAGE', namespace='/community')
def handle_send_message(data):
"""通过 WebSocket 发送消息(实时广播)"""
channel_id = data.get('channelId')
# 广播给频道内所有用户
emit('MESSAGE_CREATE', data, room=channel_id, include_self=False)
@socketio.on('START_TYPING', namespace='/community')
def handle_start_typing(data):
channel_id = data.get('channelId')
user_id = session.get('user_id')
user_name = session.get('username', '匿名')
emit('TYPING_START', {
'channelId': channel_id,
'userId': user_id,
'userName': user_name
}, room=channel_id, include_self=False)
@socketio.on('STOP_TYPING', namespace='/community')
def handle_stop_typing(data):
channel_id = data.get('channelId')
user_id = session.get('user_id')
emit('TYPING_STOP', {
'channelId': channel_id,
'userId': user_id
}, room=channel_id, include_self=False)
print('✅ Community WebSocket 事件已注册')