个股论坛重做

This commit is contained in:
2026-01-06 08:13:01 +08:00
parent 11db27d58d
commit 12bf4c2f87
27 changed files with 5796 additions and 2 deletions

759
community_api.py Normal file
View File

@@ -0,0 +1,759 @@
# -*- coding: utf-8 -*-
"""
股票社区 API - Discord 风格
包含:频道、消息、帖子、回复、表情反应等接口
"""
import uuid
from datetime import datetime
from functools import wraps
from flask import Blueprint, request, jsonify, session, g
from elasticsearch import Elasticsearch
from sqlalchemy import create_engine, text
# ============================================================
# Blueprint 和数据库连接
# ============================================================
community_bp = Blueprint('community', __name__, url_prefix='/api/community')
# ES 客户端(与 app.py 共享配置)
es_client = Elasticsearch(
hosts=["http://222.128.1.157:19200"],
request_timeout=30,
max_retries=3,
retry_on_timeout=True
)
# MySQL 连接(与 app.py 共享配置)
DATABASE_URL = "mysql+pymysql://root:wangzhe66@222.128.1.157:3307/stock_analysis?charset=utf8mb4"
engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_recycle=3600)
# ============================================================
# 工具函数
# ============================================================
def generate_id():
"""生成唯一 ID"""
return str(uuid.uuid4()).replace('-', '')[:16]
def get_current_user():
"""获取当前登录用户"""
user_id = session.get('user_id')
if not user_id:
return None
return {
'id': str(user_id),
'username': session.get('username', '匿名用户'),
'avatar': session.get('avatar', ''),
}
def login_required(f):
"""登录验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({'code': 401, 'message': '请先登录'}), 401
g.current_user = user
return f(*args, **kwargs)
return decorated_function
def api_response(data=None, message='success', code=200):
"""统一 API 响应格式"""
return jsonify({
'code': code,
'message': message,
'data': data
})
def api_error(message, code=400):
"""API 错误响应"""
return jsonify({
'code': code,
'message': message
}), code if code >= 400 else 400
# ============================================================
# 频道相关 API
# ============================================================
@community_bp.route('/channels', methods=['GET'])
def get_channels():
"""
获取频道列表(按分类组织)
返回格式:[{ id, name, icon, channels: [...] }, ...]
"""
try:
with engine.connect() as conn:
# 查询分类
categories_sql = text("""
SELECT id, name, icon, position, is_collapsible, is_system
FROM community_categories
ORDER BY position
""")
categories_result = conn.execute(categories_sql).fetchall()
# 查询频道
channels_sql = text("""
SELECT
c.id, c.category_id, c.name, c.type, c.topic, c.position,
c.concept_code, c.slow_mode, c.is_readonly, c.is_system,
c.subscriber_count, c.message_count, c.last_message_at,
cc.concept_name, cc.stock_count, cc.is_hot
FROM community_channels c
LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
WHERE c.is_visible = 1
ORDER BY c.position
""")
channels_result = conn.execute(channels_sql).fetchall()
# 组装数据
channels_by_category = {}
for ch in channels_result:
cat_id = ch.category_id
if cat_id not in channels_by_category:
channels_by_category[cat_id] = []
channels_by_category[cat_id].append({
'id': ch.id,
'categoryId': ch.category_id,
'name': ch.name,
'type': ch.type,
'topic': ch.topic,
'position': ch.position,
'conceptCode': ch.concept_code,
'slowMode': ch.slow_mode or 0,
'isReadonly': bool(ch.is_readonly),
'isSystem': bool(ch.is_system),
'subscriberCount': ch.subscriber_count or 0,
'messageCount': ch.message_count or 0,
'lastMessageAt': ch.last_message_at.isoformat() if ch.last_message_at else None,
'conceptName': ch.concept_name,
'stockCount': ch.stock_count,
'isHot': bool(ch.is_hot) if ch.is_hot is not None else False,
})
result = []
for cat in categories_result:
result.append({
'id': cat.id,
'name': cat.name,
'icon': cat.icon or '',
'position': cat.position,
'isCollapsible': bool(cat.is_collapsible),
'isSystem': bool(cat.is_system),
'channels': channels_by_category.get(cat.id, [])
})
return api_response(result)
except Exception as e:
print(f"[Community API] 获取频道列表失败: {e}")
return api_error(f'获取频道列表失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>', methods=['GET'])
def get_channel(channel_id):
"""获取单个频道详情"""
try:
with engine.connect() as conn:
sql = text("""
SELECT
c.*, cc.concept_name, cc.stock_count, cc.is_hot
FROM community_channels c
LEFT JOIN community_concept_channels cc ON c.concept_code = cc.concept_code
WHERE c.id = :channel_id
""")
result = conn.execute(sql, {'channel_id': channel_id}).fetchone()
if not result:
return api_error('频道不存在', 404)
return api_response({
'id': result.id,
'categoryId': result.category_id,
'name': result.name,
'type': result.type,
'topic': result.topic,
'conceptCode': result.concept_code,
'slowMode': result.slow_mode or 0,
'isReadonly': bool(result.is_readonly),
'subscriberCount': result.subscriber_count or 0,
'messageCount': result.message_count or 0,
'conceptName': result.concept_name,
'stockCount': result.stock_count,
'isHot': bool(result.is_hot) if result.is_hot is not None else False,
})
except Exception as e:
return api_error(f'获取频道失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/subscribe', methods=['POST'])
@login_required
def subscribe_channel(channel_id):
"""订阅频道"""
try:
user = g.current_user
subscription_id = generate_id()
with engine.connect() as conn:
# 检查是否已订阅
check_sql = text("""
SELECT id FROM community_subscriptions
WHERE user_id = :user_id AND channel_id = :channel_id
""")
existing = conn.execute(check_sql, {
'user_id': user['id'],
'channel_id': channel_id
}).fetchone()
if existing:
return api_response(message='已订阅')
# 创建订阅
insert_sql = text("""
INSERT INTO community_subscriptions (id, user_id, channel_id, notification_level)
VALUES (:id, :user_id, :channel_id, 'all')
""")
conn.execute(insert_sql, {
'id': subscription_id,
'user_id': user['id'],
'channel_id': channel_id
})
# 更新订阅数
update_sql = text("""
UPDATE community_channels
SET subscriber_count = subscriber_count + 1
WHERE id = :channel_id
""")
conn.execute(update_sql, {'channel_id': channel_id})
conn.commit()
return api_response(message='订阅成功')
except Exception as e:
return api_error(f'订阅失败: {str(e)}', 500)
@community_bp.route('/channels/<channel_id>/unsubscribe', methods=['POST'])
@login_required
def unsubscribe_channel(channel_id):
"""取消订阅频道"""
try:
user = g.current_user
with engine.connect() as conn:
# 删除订阅
delete_sql = text("""
DELETE FROM community_subscriptions
WHERE user_id = :user_id AND channel_id = :channel_id
""")
result = conn.execute(delete_sql, {
'user_id': user['id'],
'channel_id': channel_id
})
if result.rowcount > 0:
# 更新订阅数
update_sql = text("""
UPDATE community_channels
SET subscriber_count = GREATEST(subscriber_count - 1, 0)
WHERE id = :channel_id
""")
conn.execute(update_sql, {'channel_id': channel_id})
conn.commit()
return api_response(message='取消订阅成功')
except Exception as e:
return api_error(f'取消订阅失败: {str(e)}', 500)
# ============================================================
# 消息相关 API即时聊天
# ============================================================
@community_bp.route('/channels/<channel_id>/messages', methods=['POST'])
@login_required
def send_message(channel_id):
"""发送消息"""
try:
user = g.current_user
data = request.get_json()
content = data.get('content', '').strip()
if not content:
return api_error('消息内容不能为空')
message_id = generate_id()
now = datetime.utcnow()
# 构建消息文档
message_doc = {
'id': message_id,
'channel_id': channel_id,
'thread_id': data.get('threadId'),
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'content': content,
'type': 'text',
'mentioned_users': data.get('mentionedUsers', []),
'mentioned_stocks': data.get('mentionedStocks', []),
'mentioned_everyone': data.get('mentionedEveryone', False),
'reply_to': data.get('replyTo'),
'reactions': {},
'reaction_count': 0,
'is_pinned': False,
'is_edited': False,
'is_deleted': False,
'created_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_messages',
id=message_id,
document=message_doc,
refresh=True
)
# 更新频道最后消息时间和消息数
with engine.connect() as conn:
update_sql = text("""
UPDATE community_channels
SET message_count = message_count + 1,
last_message_id = :message_id,
last_message_at = :now
WHERE id = :channel_id
""")
conn.execute(update_sql, {
'message_id': message_id,
'now': now,
'channel_id': channel_id
})
conn.commit()
# 转换字段名为 camelCase
response_data = {
'id': message_doc['id'],
'channelId': message_doc['channel_id'],
'threadId': message_doc['thread_id'],
'authorId': message_doc['author_id'],
'authorName': message_doc['author_name'],
'authorAvatar': message_doc['author_avatar'],
'content': message_doc['content'],
'type': message_doc['type'],
'mentionedUsers': message_doc['mentioned_users'],
'mentionedStocks': message_doc['mentioned_stocks'],
'replyTo': message_doc['reply_to'],
'reactions': message_doc['reactions'],
'reactionCount': message_doc['reaction_count'],
'isPinned': message_doc['is_pinned'],
'isEdited': message_doc['is_edited'],
'createdAt': message_doc['created_at'],
}
return api_response(response_data)
except Exception as e:
print(f"[Community API] 发送消息失败: {e}")
return api_error(f'发送消息失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions', methods=['POST'])
@login_required
def add_reaction(message_id):
"""添加表情反应"""
try:
user = g.current_user
data = request.get_json()
emoji = data.get('emoji')
if not emoji:
return api_error('请选择表情')
# 获取当前消息
result = es_client.get(index='community_messages', id=message_id)
message = result['_source']
# 更新 reactions
reactions = message.get('reactions', {})
if emoji not in reactions:
reactions[emoji] = []
if user['id'] not in reactions[emoji]:
reactions[emoji].append(user['id'])
# 计算总数
reaction_count = sum(len(users) for users in reactions.values())
# 更新 ES
es_client.update(
index='community_messages',
id=message_id,
doc={'reactions': reactions, 'reaction_count': reaction_count},
refresh=True
)
return api_response(message='添加成功')
except Exception as e:
return api_error(f'添加表情失败: {str(e)}', 500)
@community_bp.route('/messages/<message_id>/reactions/<emoji>', methods=['DELETE'])
@login_required
def remove_reaction(message_id, emoji):
"""移除表情反应"""
try:
user = g.current_user
# 获取当前消息
result = es_client.get(index='community_messages', id=message_id)
message = result['_source']
# 更新 reactions
reactions = message.get('reactions', {})
if emoji in reactions and user['id'] in reactions[emoji]:
reactions[emoji].remove(user['id'])
if not reactions[emoji]:
del reactions[emoji]
# 计算总数
reaction_count = sum(len(users) for users in reactions.values())
# 更新 ES
es_client.update(
index='community_messages',
id=message_id,
doc={'reactions': reactions, 'reaction_count': reaction_count},
refresh=True
)
return api_response(message='移除成功')
except Exception as e:
return api_error(f'移除表情失败: {str(e)}', 500)
# ============================================================
# Forum 帖子相关 API
# ============================================================
@community_bp.route('/channels/<channel_id>/posts', methods=['POST'])
@login_required
def create_post(channel_id):
"""创建帖子"""
try:
user = g.current_user
data = request.get_json()
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return api_error('标题不能为空')
if not content:
return api_error('内容不能为空')
post_id = generate_id()
now = datetime.utcnow()
# 构建帖子文档
post_doc = {
'id': post_id,
'channel_id': channel_id,
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'title': title,
'content': content,
'content_html': content, # 可以后续添加 Markdown 渲染
'tags': data.get('tags', []),
'stock_symbols': data.get('stockSymbols', []),
'is_pinned': False,
'is_locked': False,
'is_deleted': False,
'reply_count': 0,
'view_count': 0,
'like_count': 0,
'last_reply_at': None,
'last_reply_by': None,
'created_at': now.isoformat(),
'updated_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_forum_posts',
id=post_id,
document=post_doc,
refresh=True
)
# 更新频道消息数
with engine.connect() as conn:
update_sql = text("""
UPDATE community_channels
SET message_count = message_count + 1,
last_message_at = :now
WHERE id = :channel_id
""")
conn.execute(update_sql, {'now': now, 'channel_id': channel_id})
conn.commit()
# 转换字段名
response_data = {
'id': post_doc['id'],
'channelId': post_doc['channel_id'],
'authorId': post_doc['author_id'],
'authorName': post_doc['author_name'],
'authorAvatar': post_doc['author_avatar'],
'title': post_doc['title'],
'content': post_doc['content'],
'tags': post_doc['tags'],
'stockSymbols': post_doc['stock_symbols'],
'isPinned': post_doc['is_pinned'],
'isLocked': post_doc['is_locked'],
'replyCount': post_doc['reply_count'],
'viewCount': post_doc['view_count'],
'likeCount': post_doc['like_count'],
'createdAt': post_doc['created_at'],
}
return api_response(response_data)
except Exception as e:
print(f"[Community API] 创建帖子失败: {e}")
return api_error(f'创建帖子失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/like', methods=['POST'])
@login_required
def like_post(post_id):
"""点赞帖子"""
try:
# 更新 ES简单实现生产环境应该用单独的点赞表防止重复
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': 'ctx._source.like_count += 1',
'lang': 'painless'
},
refresh=True
)
return api_response(message='点赞成功')
except Exception as e:
return api_error(f'点赞失败: {str(e)}', 500)
@community_bp.route('/posts/<post_id>/view', methods=['POST'])
def increment_view(post_id):
"""增加帖子浏览量"""
try:
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': 'ctx._source.view_count += 1',
'lang': 'painless'
}
)
return api_response(message='success')
except Exception as e:
# 浏览量统计失败不影响主流程
return api_response(message='success')
@community_bp.route('/posts/<post_id>/replies', methods=['POST'])
@login_required
def create_reply(post_id):
"""创建帖子回复"""
try:
user = g.current_user
data = request.get_json()
content = data.get('content', '').strip()
if not content:
return api_error('回复内容不能为空')
reply_id = generate_id()
now = datetime.utcnow()
# 获取帖子信息
post_result = es_client.get(index='community_forum_posts', id=post_id)
post = post_result['_source']
# 构建回复文档
reply_doc = {
'id': reply_id,
'post_id': post_id,
'channel_id': post['channel_id'],
'author_id': user['id'],
'author_name': user['username'],
'author_avatar': user.get('avatar', ''),
'content': content,
'content_html': content,
'reply_to': data.get('replyTo'),
'reactions': {},
'like_count': 0,
'is_solution': False,
'is_deleted': False,
'created_at': now.isoformat(),
}
# 写入 ES
es_client.index(
index='community_forum_replies',
id=reply_id,
document=reply_doc,
refresh=True
)
# 更新帖子的回复数和最后回复时间
es_client.update(
index='community_forum_posts',
id=post_id,
script={
'source': '''
ctx._source.reply_count += 1;
ctx._source.last_reply_at = params.now;
ctx._source.last_reply_by = params.author_name;
''',
'lang': 'painless',
'params': {
'now': now.isoformat(),
'author_name': user['username']
}
},
refresh=True
)
# 转换字段名
response_data = {
'id': reply_doc['id'],
'postId': reply_doc['post_id'],
'channelId': reply_doc['channel_id'],
'authorId': reply_doc['author_id'],
'authorName': reply_doc['author_name'],
'authorAvatar': reply_doc['author_avatar'],
'content': reply_doc['content'],
'replyTo': reply_doc['reply_to'],
'likeCount': reply_doc['like_count'],
'createdAt': reply_doc['created_at'],
}
return api_response(response_data)
except Exception as e:
print(f"[Community API] 创建回复失败: {e}")
return api_error(f'创建回复失败: {str(e)}', 500)
# ============================================================
# ES 代理接口(前端直接查询 ES
# ============================================================
@community_bp.route('/es/<index>/_search', methods=['POST'])
def es_search_proxy(index):
"""
ES 搜索代理
允许的索引community_messages, community_forum_posts, community_forum_replies
"""
allowed_indices = [
'community_messages',
'community_forum_posts',
'community_forum_replies',
'community_notifications'
]
if index not in allowed_indices:
return api_error('不允许访问该索引', 403)
try:
body = request.get_json()
result = es_client.search(index=index, body=body)
return jsonify(result)
except Exception as e:
print(f"[Community API] ES 搜索失败: {e}")
return api_error(f'搜索失败: {str(e)}', 500)
# ============================================================
# WebSocket 事件处理(需要在 app.py 中注册)
# ============================================================
def register_community_socketio(socketio):
"""
注册社区 WebSocket 事件
在 app.py 中调用register_community_socketio(socketio)
"""
from flask_socketio import join_room, leave_room, emit
@socketio.on('connect', namespace='/community')
def handle_connect():
print('[Community Socket] Client connected')
@socketio.on('disconnect', namespace='/community')
def handle_disconnect():
print('[Community Socket] Client disconnected')
@socketio.on('SUBSCRIBE_CHANNEL', namespace='/community')
def handle_subscribe(data):
channel_id = data.get('channelId')
if channel_id:
join_room(channel_id)
print(f'[Community Socket] Joined room: {channel_id}')
@socketio.on('UNSUBSCRIBE_CHANNEL', namespace='/community')
def handle_unsubscribe(data):
channel_id = data.get('channelId')
if channel_id:
leave_room(channel_id)
print(f'[Community Socket] Left room: {channel_id}')
@socketio.on('SEND_MESSAGE', namespace='/community')
def handle_send_message(data):
"""通过 WebSocket 发送消息(实时广播)"""
channel_id = data.get('channelId')
# 广播给频道内所有用户
emit('MESSAGE_CREATE', data, room=channel_id, include_self=False)
@socketio.on('START_TYPING', namespace='/community')
def handle_start_typing(data):
channel_id = data.get('channelId')
user_id = session.get('user_id')
user_name = session.get('username', '匿名')
emit('TYPING_START', {
'channelId': channel_id,
'userId': user_id,
'userName': user_name
}, room=channel_id, include_self=False)
@socketio.on('STOP_TYPING', namespace='/community')
def handle_stop_typing(data):
channel_id = data.get('channelId')
user_id = session.get('user_id')
emit('TYPING_STOP', {
'channelId': channel_id,
'userId': user_id
}, room=channel_id, include_self=False)
print('✅ Community WebSocket 事件已注册')