Files
vf_react/community_api.py
2026-01-06 10:25:12 +08:00

771 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Stock community API - Discord style.
Covers the endpoints for channels, messages, posts, replies and emoji reactions.
"""
import uuid
from datetime import datetime
from functools import wraps
from flask import Blueprint, request, jsonify, session, g, current_app
from elasticsearch import Elasticsearch
from sqlalchemy import text
# ============================================================
# Blueprint definition
# ============================================================
# Every route in this module is mounted under /api/community.
community_bp = Blueprint('community', __name__, url_prefix='/api/community')
# Elasticsearch client shared by all handlers in this module.
# NOTE(review): the host is hard-coded and uses plain HTTP; consider moving it
# into application configuration.
es_client = Elasticsearch(
    hosts=["http://222.128.1.157:19200"],
    request_timeout=30,
    max_retries=3,
    retry_on_timeout=True
)
# Note: the database connection is taken from the db object in app.py; this
# module no longer creates its own.
# The engine variable is resolved lazily by get_db_engine().
engine = None  # initialised on first use
def get_db_engine():
    """Return the shared database engine, resolving it lazily on first use.

    The engine is read from ``app.db`` the first time this function is called
    and cached in the module-level ``engine`` variable afterwards.

    Returns:
        The cached ``db.engine`` object.
    """
    global engine
    if engine is None:
        # Imported here rather than at module level — presumably to avoid a
        # circular import with app.py (TODO confirm).
        from app import db
        engine = db.engine
    return engine
# ============================================================
# Utility functions
# ============================================================
def generate_id():
    """Return a 16-character lowercase hex identifier taken from a random UUID4."""
    # uuid4().hex is the 32-digit form without dashes; keep the first half.
    hex_digits = uuid.uuid4().hex
    return hex_digits[:16]
def get_current_user():
    """Build the current user's info from the Flask session.

    Returns:
        A dict with ``id`` (stringified ``user_id``), ``username`` and
        ``avatar``, or ``None`` when the session holds no truthy ``user_id``.
    """
    uid = session.get('user_id')
    if uid:
        return {
            'id': str(uid),
            'username': session.get('username', '匿名用户'),
            'avatar': session.get('avatar', ''),
        }
    return None
def login_required(f):
    """Decorator that only lets logged-in users reach the wrapped view.

    When ``get_current_user()`` yields a user, it is stored on
    ``g.current_user`` and the view is called; otherwise a JSON body with
    code 401 is returned together with HTTP status 401.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        current = get_current_user()
        if current:
            g.current_user = current
            return f(*args, **kwargs)
        return jsonify({'code': 401, 'message': '请先登录'}), 401
    return wrapper
def api_response(data=None, message='success', code=200):
    """Wrap ``data`` in the uniform ``{code, message, data}`` JSON envelope.

    ``code`` is only written into the JSON body; no explicit HTTP status is
    attached to the returned response.
    """
    envelope = {'code': code, 'message': message, 'data': data}
    return jsonify(envelope)
def api_error(message, code=400):
    """Build an error response as a ``(json_body, http_status)`` tuple.

    The JSON body carries ``code`` unchanged; the HTTP status equals ``code``
    when it is at least 400 and falls back to 400 otherwise.
    """
    body = jsonify({'code': code, 'message': message})
    if code >= 400:
        http_status = code
    else:
        http_status = 400
    return body, http_status
# ============================================================
# Channel APIs
# ============================================================
@community_bp.route('/channels', methods=['GET'])
def get_channels():
    """
    List every category together with its visible channels.

    Response data: [{ id, name, icon, position, isCollapsible, isSystem, channels: [...] }, ...]
    Categories are ordered by position, as are the channels inside each one.
    A channel whose category_id matches no category row is left out.
    Any exception is printed and reported as a 500 error payload.
    """
    try:
        with get_db_engine().connect() as conn:
            # Categories, in display order.
            category_rows = conn.execute(text("""
                SELECT id, name, icon, position, is_collapsible, is_system
                FROM community_categories
                ORDER BY position
            """)).fetchall()
            # Visible channels, enriched with concept data when a concept code matches.
            channel_rows = conn.execute(text("""
                SELECT
                c.id, c.category_id, c.name, c.type, c.topic, c.position,
                c.concept_code, c.slow_mode, c.is_readonly, c.is_system,
                c.subscriber_count, c.message_count, c.last_message_at,
                cc.concept_name, cc.stock_count, cc.is_hot
                FROM community_channels c
                LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
                WHERE c.is_visible = 1
                ORDER BY c.position
            """)).fetchall()
            # Group the channel payloads under their category id.
            grouped = {}
            for row in channel_rows:
                last_at = row.last_message_at
                grouped.setdefault(row.category_id, []).append({
                    'id': row.id,
                    'categoryId': row.category_id,
                    'name': row.name,
                    'type': row.type,
                    'topic': row.topic,
                    'position': row.position,
                    'conceptCode': row.concept_code,
                    'slowMode': row.slow_mode or 0,
                    'isReadonly': bool(row.is_readonly),
                    'isSystem': bool(row.is_system),
                    'subscriberCount': row.subscriber_count or 0,
                    'messageCount': row.message_count or 0,
                    'lastMessageAt': last_at.isoformat() if last_at else None,
                    'conceptName': row.concept_name,
                    'stockCount': row.stock_count,
                    # bool(None) is False, which covers channels without concept data.
                    'isHot': bool(row.is_hot),
                })
            categories = [
                {
                    'id': cat.id,
                    'name': cat.name,
                    'icon': cat.icon or '',
                    'position': cat.position,
                    'isCollapsible': bool(cat.is_collapsible),
                    'isSystem': bool(cat.is_system),
                    'channels': grouped.get(cat.id, []),
                }
                for cat in category_rows
            ]
            return api_response(categories)
    except Exception as e:
        print(f"[Community API] 获取频道列表失败: {e}")
        return api_error(f'获取频道列表失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>', methods=['GET'])
def get_channel(channel_id):
    """Return the details of one channel.

    Responds with a 404 error payload when no row matches ``channel_id`` and
    with a 500 error payload when the lookup raises.
    """
    lookup_sql = text("""
        SELECT
        c.*, cc.concept_name, cc.stock_count, cc.is_hot
        FROM community_channels c
        LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
        WHERE c.id = :channel_id
    """)
    try:
        with get_db_engine().connect() as conn:
            row = conn.execute(lookup_sql, {'channel_id': channel_id}).fetchone()
            if not row:
                return api_error('频道不存在', 404)
            detail = {
                'id': row.id,
                'categoryId': row.category_id,
                'name': row.name,
                'type': row.type,
                'topic': row.topic,
                'conceptCode': row.concept_code,
                'slowMode': row.slow_mode or 0,
                'isReadonly': bool(row.is_readonly),
                'subscriberCount': row.subscriber_count or 0,
                'messageCount': row.message_count or 0,
                'conceptName': row.concept_name,
                'stockCount': row.stock_count,
                # bool(None) is False, which covers channels without concept data.
                'isHot': bool(row.is_hot),
            }
            return api_response(detail)
    except Exception as e:
        return api_error(f'获取频道失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/subscribe', methods=['POST'])
@login_required
def subscribe_channel(channel_id):
    """Subscribe the current user to a channel.

    If a subscription row already exists the call succeeds without changes.
    Otherwise a row with notification level 'all' is inserted and the
    channel's subscriber_count is incremented, both in one commit.
    """
    try:
        owner_key = {
            'user_id': g.current_user['id'],
            'channel_id': channel_id,
        }
        with get_db_engine().connect() as conn:
            # NOTE(review): this check-then-insert is not atomic; two concurrent
            # requests could both insert unless the table has a unique
            # constraint on (user_id, channel_id) — confirm against the schema.
            already = conn.execute(text("""
                SELECT id FROM community_subscriptions
                WHERE user_id = :user_id AND channel_id = :channel_id
            """), owner_key).fetchone()
            if already:
                return api_response(message='已订阅')
            # Insert the subscription row.
            conn.execute(text("""
                INSERT INTO community_subscriptions (id, user_id, channel_id, notification_level)
                VALUES (:id, :user_id, :channel_id, 'all')
            """), {'id': generate_id(), **owner_key})
            # Keep the denormalised subscriber counter in step.
            conn.execute(text("""
                UPDATE community_channels
                SET subscriber_count = subscriber_count + 1
                WHERE id = :channel_id
            """), {'channel_id': channel_id})
            conn.commit()
            return api_response(message='订阅成功')
    except Exception as e:
        return api_error(f'订阅失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/unsubscribe', methods=['POST'])
@login_required
def unsubscribe_channel(channel_id):
    """Remove the current user's subscription to a channel.

    The channel's subscriber_count is decremented (never below 0) only when a
    subscription row was actually deleted. The success response is returned
    whether or not a row existed.
    """
    try:
        user_id = g.current_user['id']
        with get_db_engine().connect() as conn:
            deleted = conn.execute(text("""
                DELETE FROM community_subscriptions
                WHERE user_id = :user_id AND channel_id = :channel_id
            """), {'user_id': user_id, 'channel_id': channel_id})
            removed_any = deleted.rowcount > 0
            if removed_any:
                # GREATEST clamps the counter so it cannot go negative.
                conn.execute(text("""
                    UPDATE community_channels
                    SET subscriber_count = GREATEST(subscriber_count - 1, 0)
                    WHERE id = :channel_id
                """), {'channel_id': channel_id})
            conn.commit()
            return api_response(message='取消订阅成功')
    except Exception as e:
        return api_error(f'取消订阅失败: {str(e)}', 500)
# ============================================================
# Message APIs (instant chat)
# ============================================================
@community_bp.route('/channels/<channel_id>/messages', methods=['POST'])
@login_required
def send_message(channel_id):
    """Create a chat message in a channel.

    Expects a JSON object with a non-empty string ``content`` and the optional
    keys ``threadId``, ``mentionedUsers``, ``mentionedStocks``,
    ``mentionedEveryone`` and ``replyTo``.

    The message is indexed into the ``community_messages`` ES index, then the
    channel's message counter and last-message fields are updated in the
    database. The two writes are separate and not transactional.

    Returns:
        The created message in camelCase form; a 400 error payload when the
        body is missing, malformed or has no usable ``content``; a 500 error
        payload when storing the message fails.
    """
    try:
        user = g.current_user
        # silent=True makes a missing, non-JSON or malformed body come back as
        # None instead of raising, so it becomes a 400 below rather than being
        # swallowed by the generic handler as a 500.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}
        raw_content = data.get('content')
        # A non-string content (null, number, list, ...) counts as empty.
        content = raw_content.strip() if isinstance(raw_content, str) else ''
        if not content:
            return api_error('消息内容不能为空')
        message_id = generate_id()
        now = datetime.utcnow()
        # Document stored in Elasticsearch (snake_case field names).
        message_doc = {
            'id': message_id,
            'channel_id': channel_id,
            'thread_id': data.get('threadId'),
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'content': content,
            'type': 'text',
            'mentioned_users': data.get('mentionedUsers', []),
            'mentioned_stocks': data.get('mentionedStocks', []),
            'mentioned_everyone': data.get('mentionedEveryone', False),
            'reply_to': data.get('replyTo'),
            'reactions': {},
            'reaction_count': 0,
            'is_pinned': False,
            'is_edited': False,
            'is_deleted': False,
            'created_at': now.isoformat(),
        }
        # Write to ES; refresh=True makes the message searchable immediately.
        es_client.index(
            index='community_messages',
            id=message_id,
            document=message_doc,
            refresh=True
        )
        # Update the channel's message count and last-message pointers.
        with get_db_engine().connect() as conn:
            update_sql = text("""
                UPDATE community_channels
                SET message_count = message_count + 1,
                last_message_id = :message_id,
                last_message_at = :now
                WHERE id = :channel_id
            """)
            conn.execute(update_sql, {
                'message_id': message_id,
                'now': now,
                'channel_id': channel_id
            })
            conn.commit()
        # Convert field names to camelCase for the client.
        response_data = {
            'id': message_doc['id'],
            'channelId': message_doc['channel_id'],
            'threadId': message_doc['thread_id'],
            'authorId': message_doc['author_id'],
            'authorName': message_doc['author_name'],
            'authorAvatar': message_doc['author_avatar'],
            'content': message_doc['content'],
            'type': message_doc['type'],
            'mentionedUsers': message_doc['mentioned_users'],
            'mentionedStocks': message_doc['mentioned_stocks'],
            'replyTo': message_doc['reply_to'],
            'reactions': message_doc['reactions'],
            'reactionCount': message_doc['reaction_count'],
            'isPinned': message_doc['is_pinned'],
            'isEdited': message_doc['is_edited'],
            'createdAt': message_doc['created_at'],
        }
        return api_response(response_data)
    except Exception as e:
        print(f"[Community API] 发送消息失败: {e}")
        return api_error(f'发送消息失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions', methods=['POST'])
@login_required
def add_reaction(message_id):
    """Add the current user's emoji reaction to a message.

    Expects a JSON object with a non-empty string ``emoji``. The message is
    read from the ``community_messages`` ES index, the user id is appended to
    that emoji's list if not already present, and the reactions map plus the
    total count are written back.

    Returns:
        A success payload; a 400 error payload when the body is missing,
        malformed or has no usable ``emoji``; a 500 error payload when the ES
        read or write fails.
    """
    try:
        user = g.current_user
        # silent=True: a missing or malformed body yields None instead of
        # raising, so it is reported as a 400 rather than a 500.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}
        emoji = data.get('emoji')
        # The emoji is used as a dict key, so it must be a non-empty string.
        if not isinstance(emoji, str) or not emoji:
            return api_error('请选择表情')
        # Fetch the current message.
        result = es_client.get(index='community_messages', id=message_id)
        message = result['_source']
        # NOTE(review): this read-modify-write is not atomic; concurrent
        # reactions on the same message can overwrite each other.
        # `or {}` also covers a stored null reactions field.
        reactions = message.get('reactions') or {}
        users_for_emoji = reactions.setdefault(emoji, [])
        if user['id'] not in users_for_emoji:
            users_for_emoji.append(user['id'])
        # Total number of reactions across all emojis.
        reaction_count = sum(len(users) for users in reactions.values())
        # Write the updated reactions back to ES.
        es_client.update(
            index='community_messages',
            id=message_id,
            doc={'reactions': reactions, 'reaction_count': reaction_count},
            refresh=True
        )
        return api_response(message='添加成功')
    except Exception as e:
        return api_error(f'添加表情失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions/<emoji>', methods=['DELETE'])
@login_required
def remove_reaction(message_id, emoji):
    """Remove the current user's reaction ``emoji`` from a message.

    The message is read from the ``community_messages`` ES index; if the user
    is listed under the emoji the id is removed, and an emoji left with no
    users is dropped from the map. The reactions map and the recomputed total
    are then written back, whether or not anything changed.
    """
    try:
        uid = g.current_user['id']
        # Load the stored message.
        stored = es_client.get(index='community_messages', id=message_id)
        reactions = stored['_source'].get('reactions', {})
        if emoji in reactions and uid in reactions[emoji]:
            reactions[emoji].remove(uid)
            if not reactions[emoji]:
                # Nobody is left on this emoji, so drop the key.
                del reactions[emoji]
        # Recompute the total across all emojis.
        total = 0
        for reacting_users in reactions.values():
            total += len(reacting_users)
        # Persist the result.
        es_client.update(
            index='community_messages',
            id=message_id,
            doc={'reactions': reactions, 'reaction_count': total},
            refresh=True
        )
        return api_response(message='移除成功')
    except Exception as e:
        return api_error(f'移除表情失败: {str(e)}', 500)
# ============================================================
# Forum post APIs
# ============================================================
@community_bp.route('/channels/<channel_id>/posts', methods=['POST'])
@login_required
def create_post(channel_id):
    """Create a forum post in a channel.

    Expects a JSON object with non-empty string ``title`` and ``content`` and
    the optional keys ``tags`` and ``stockSymbols``.

    The post is indexed into the ``community_forum_posts`` ES index, then the
    channel's message counter and last-message time are updated in the
    database. The two writes are separate and not transactional.

    Returns:
        The created post in camelCase form; a 400 error payload when the body
        is missing, malformed or lacks a usable title or content; a 500 error
        payload when storing the post fails.
    """
    try:
        user = g.current_user
        # silent=True: a missing or malformed body yields None instead of
        # raising, so it is reported as a 400 rather than a 500.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}
        raw_title = data.get('title')
        raw_content = data.get('content')
        # Non-string values (null, numbers, ...) count as empty.
        title = raw_title.strip() if isinstance(raw_title, str) else ''
        content = raw_content.strip() if isinstance(raw_content, str) else ''
        if not title:
            return api_error('标题不能为空')
        if not content:
            return api_error('内容不能为空')
        post_id = generate_id()
        now = datetime.utcnow()
        # Document stored in Elasticsearch (snake_case field names).
        post_doc = {
            'id': post_id,
            'channel_id': channel_id,
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'title': title,
            'content': content,
            'content_html': content,  # Markdown rendering can be added later
            'tags': data.get('tags', []),
            'stock_symbols': data.get('stockSymbols', []),
            'is_pinned': False,
            'is_locked': False,
            'is_deleted': False,
            'reply_count': 0,
            'view_count': 0,
            'like_count': 0,
            'last_reply_at': None,
            'last_reply_by': None,
            'created_at': now.isoformat(),
            'updated_at': now.isoformat(),
        }
        # Write to ES; refresh=True makes the post searchable immediately.
        es_client.index(
            index='community_forum_posts',
            id=post_id,
            document=post_doc,
            refresh=True
        )
        # Update the channel's message count.
        with get_db_engine().connect() as conn:
            update_sql = text("""
                UPDATE community_channels
                SET message_count = message_count + 1,
                last_message_at = :now
                WHERE id = :channel_id
            """)
            conn.execute(update_sql, {'now': now, 'channel_id': channel_id})
            conn.commit()
        # Convert field names to camelCase for the client.
        response_data = {
            'id': post_doc['id'],
            'channelId': post_doc['channel_id'],
            'authorId': post_doc['author_id'],
            'authorName': post_doc['author_name'],
            'authorAvatar': post_doc['author_avatar'],
            'title': post_doc['title'],
            'content': post_doc['content'],
            'tags': post_doc['tags'],
            'stockSymbols': post_doc['stock_symbols'],
            'isPinned': post_doc['is_pinned'],
            'isLocked': post_doc['is_locked'],
            'replyCount': post_doc['reply_count'],
            'viewCount': post_doc['view_count'],
            'likeCount': post_doc['like_count'],
            'createdAt': post_doc['created_at'],
        }
        return api_response(response_data)
    except Exception as e:
        print(f"[Community API] 创建帖子失败: {e}")
        return api_error(f'创建帖子失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/like', methods=['POST'])
@login_required
def like_post(post_id):
    """Increment a post's like counter in the ``community_forum_posts`` index.

    Simple implementation: nothing here stops the same user from liking a
    post more than once; a dedicated likes table would be needed for that.
    """
    bump_likes = {
        'source': 'ctx._source.like_count += 1',
        'lang': 'painless',
    }
    try:
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script=bump_likes,
            refresh=True,
        )
        return api_response(message='点赞成功')
    except Exception as e:
        return api_error(f'点赞失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/view', methods=['POST'])
def increment_view(post_id):
    """Increment a post's view counter on a best-effort basis.

    The success payload is returned even when the ES update fails.
    """
    bump_views = {
        'source': 'ctx._source.view_count += 1',
        'lang': 'painless',
    }
    try:
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script=bump_views,
        )
        return api_response(message='success')
    except Exception:
        # A failed view-count update must not affect the main flow.
        return api_response(message='success')
@community_bp.route('/posts/<post_id>/replies', methods=['POST'])
@login_required
def create_reply(post_id):
    """Create a reply to a forum post.

    Expects a JSON object with a non-empty string ``content`` and an optional
    ``replyTo``. The parent post is read from ``community_forum_posts`` to
    copy its ``channel_id``, the reply is indexed into
    ``community_forum_replies``, and the post's reply counter and last-reply
    fields are updated by a script.

    Returns:
        The created reply in camelCase form; a 400 error payload when the body
        is missing, malformed or has no usable ``content``; a 500 error
        payload when any ES operation fails.
    """
    try:
        user = g.current_user
        # silent=True: a missing or malformed body yields None instead of
        # raising, so it is reported as a 400 rather than a 500.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}
        raw_content = data.get('content')
        # A non-string content (null, number, ...) counts as empty.
        content = raw_content.strip() if isinstance(raw_content, str) else ''
        if not content:
            return api_error('回复内容不能为空')
        reply_id = generate_id()
        now = datetime.utcnow()
        # Load the parent post; its channel_id is copied onto the reply.
        post_result = es_client.get(index='community_forum_posts', id=post_id)
        post = post_result['_source']
        # Document stored in Elasticsearch (snake_case field names).
        reply_doc = {
            'id': reply_id,
            'post_id': post_id,
            'channel_id': post['channel_id'],
            'author_id': user['id'],
            'author_name': user['username'],
            'author_avatar': user.get('avatar', ''),
            'content': content,
            'content_html': content,
            'reply_to': data.get('replyTo'),
            'reactions': {},
            'like_count': 0,
            'is_solution': False,
            'is_deleted': False,
            'created_at': now.isoformat(),
        }
        # Write to ES; refresh=True makes the reply searchable immediately.
        es_client.index(
            index='community_forum_replies',
            id=reply_id,
            document=reply_doc,
            refresh=True
        )
        # Update the post's reply count and last-reply fields.
        es_client.update(
            index='community_forum_posts',
            id=post_id,
            script={
                'source': '''
                    ctx._source.reply_count += 1;
                    ctx._source.last_reply_at = params.now;
                    ctx._source.last_reply_by = params.author_name;
                ''',
                'lang': 'painless',
                'params': {
                    'now': now.isoformat(),
                    'author_name': user['username']
                }
            },
            refresh=True
        )
        # Convert field names to camelCase for the client.
        response_data = {
            'id': reply_doc['id'],
            'postId': reply_doc['post_id'],
            'channelId': reply_doc['channel_id'],
            'authorId': reply_doc['author_id'],
            'authorName': reply_doc['author_name'],
            'authorAvatar': reply_doc['author_avatar'],
            'content': reply_doc['content'],
            'replyTo': reply_doc['reply_to'],
            'likeCount': reply_doc['like_count'],
            'createdAt': reply_doc['created_at'],
        }
        return api_response(response_data)
    except Exception as e:
        print(f"[Community API] 创建回复失败: {e}")
        return api_error(f'创建回复失败: {str(e)}', 500)
# ============================================================
# ES proxy endpoints (the frontend queries ES directly)
# ============================================================
@community_bp.route('/es/<index>/_search', methods=['POST'])
def es_search_proxy(index):
    """
    Forward a search request body to Elasticsearch and return the raw result.

    Allowed indices: community_messages, community_forum_posts,
    community_forum_replies, community_notifications.
    Any other index yields a 403 error payload; a failing search is printed
    and reported as a 500 error payload.
    NOTE(review): this route has no login check and passes the request body
    through unchanged — confirm that is intended.
    """
    whitelist = (
        'community_messages',
        'community_forum_posts',
        'community_forum_replies',
        'community_notifications',
    )
    if index in whitelist:
        try:
            query_body = request.get_json()
            hits = es_client.search(index=index, body=query_body)
            return jsonify(hits)
        except Exception as e:
            print(f"[Community API] ES 搜索失败: {e}")
            return api_error(f'搜索失败: {str(e)}', 500)
    return api_error('不允许访问该索引', 403)
# ============================================================
# WebSocket event handlers (must be registered in app.py)
# ============================================================
def register_community_socketio(socketio):
    """
    Register the community WebSocket event handlers on ``socketio``.

    Call from app.py: register_community_socketio(socketio)
    Every handler is bound to the '/community' namespace, and channel ids are
    used directly as room names.
    """
    from flask_socketio import join_room, leave_room, emit

    community_ns = '/community'

    @socketio.on('connect', namespace=community_ns)
    def handle_connect():
        print('[Community Socket] Client connected')

    @socketio.on('disconnect', namespace=community_ns)
    def handle_disconnect():
        print('[Community Socket] Client disconnected')

    @socketio.on('SUBSCRIBE_CHANNEL', namespace=community_ns)
    def handle_subscribe(data):
        channel_id = data.get('channelId')
        if not channel_id:
            return
        join_room(channel_id)
        print(f'[Community Socket] Joined room: {channel_id}')

    @socketio.on('UNSUBSCRIBE_CHANNEL', namespace=community_ns)
    def handle_unsubscribe(data):
        channel_id = data.get('channelId')
        if not channel_id:
            return
        leave_room(channel_id)
        print(f'[Community Socket] Left room: {channel_id}')

    @socketio.on('SEND_MESSAGE', namespace=community_ns)
    def handle_send_message(data):
        """Relay a client-supplied message payload to the channel's room."""
        target_room = data.get('channelId')
        # The payload is forwarded as received; the sender is excluded.
        emit('MESSAGE_CREATE', data, room=target_room, include_self=False)

    @socketio.on('START_TYPING', namespace=community_ns)
    def handle_start_typing(data):
        target_room = data.get('channelId')
        typing_event = {
            'channelId': target_room,
            'userId': session.get('user_id'),
            'userName': session.get('username', '匿名'),
        }
        emit('TYPING_START', typing_event, room=target_room, include_self=False)

    @socketio.on('STOP_TYPING', namespace=community_ns)
    def handle_stop_typing(data):
        target_room = data.get('channelId')
        typing_event = {
            'channelId': target_room,
            'userId': session.get('user_id'),
        }
        emit('TYPING_STOP', typing_event, room=target_room, include_self=False)

    print('✅ Community WebSocket 事件已注册')