C:/Program Files/Git/api/events加入socketio机制

This commit is contained in:
2025-10-21 14:43:18 +08:00
parent 5a3a3ad42b
commit 4b6d86e923

211
app.py
View File

@@ -8,6 +8,7 @@ import uuid
from functools import wraps from functools import wraps
import qrcode import qrcode
from flask_mail import Mail, Message from flask_mail import Mail, Message
from flask_socketio import SocketIO, emit, join_room, leave_room
import pytz import pytz
import requests import requests
from celery import Celery from celery import Celery
@@ -40,6 +41,7 @@ from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentClo
from sqlalchemy import text, desc, and_ from sqlalchemy import text, desc, and_
import pandas as pd import pandas as pd
from decimal import Decimal from decimal import Decimal
from apscheduler.schedulers.background import BackgroundScheduler
# 交易日数据缓存 # 交易日数据缓存
trading_days = [] trading_days = []
@@ -242,6 +244,16 @@ db = SQLAlchemy(app)
# 初始化邮件服务 # 初始化邮件服务
mail = Mail(app) mail = Mail(app)
# 初始化 Flask-SocketIO用于实时事件推送
socketio = SocketIO(
app,
cors_allowed_origins=["http://localhost:3000", "http://127.0.0.1:3000", "http://localhost:5173",
"https://valuefrontier.cn", "http://valuefrontier.cn"],
async_mode='gevent',
logger=True,
engineio_logger=False
)
@login_manager.user_loader @login_manager.user_loader
def load_user(user_id): def load_user(user_id):
@@ -7443,6 +7455,199 @@ def add_event_comment(event_id):
}), 500 }), 500
# ==================== WebSocket 事件处理器(实时事件推送) ====================
@socketio.on('connect')
def handle_connect():
"""客户端连接事件"""
print(f'[WebSocket] 客户端已连接: {request.sid}')
emit('connection_response', {
'status': 'connected',
'sid': request.sid,
'message': '已连接到事件推送服务'
})
@socketio.on('subscribe_events')
def handle_subscribe(data):
"""
客户端订阅事件推送
data: {
'event_type': 'all' | 'policy' | 'market' | 'tech' | ...,
'importance': 'all' | 'S' | 'A' | 'B' | 'C',
'filters': {...} # 可选的其他筛选条件
}
"""
try:
event_type = data.get('event_type', 'all')
importance = data.get('importance', 'all')
# 加入对应的房间
room_name = f"events_{event_type}"
join_room(room_name)
print(f'[WebSocket] 客户端 {request.sid} 订阅了房间: {room_name}')
emit('subscription_confirmed', {
'success': True,
'room': room_name,
'event_type': event_type,
'importance': importance,
'message': f'已订阅 {event_type} 类型的事件推送'
})
except Exception as e:
print(f'[WebSocket] 订阅失败: {e}')
emit('subscription_error', {
'success': False,
'error': str(e)
})
@socketio.on('unsubscribe_events')
def handle_unsubscribe(data):
"""取消订阅事件推送"""
try:
event_type = data.get('event_type', 'all')
room_name = f"events_{event_type}"
leave_room(room_name)
print(f'[WebSocket] 客户端 {request.sid} 取消订阅房间: {room_name}')
emit('unsubscription_confirmed', {
'success': True,
'room': room_name,
'message': f'已取消订阅 {event_type} 类型的事件推送'
})
except Exception as e:
print(f'[WebSocket] 取消订阅失败: {e}')
emit('unsubscription_error', {
'success': False,
'error': str(e)
})
@socketio.on('disconnect')
def handle_disconnect():
"""客户端断开连接事件"""
print(f'[WebSocket] 客户端已断开: {request.sid}')
# ==================== WebSocket 辅助函数 ====================
def broadcast_new_event(event):
"""
广播新事件到所有订阅的客户端
在创建新事件时调用此函数
Args:
event: Event 模型实例
"""
try:
event_data = {
'id': event.id,
'title': event.title,
'description': event.description,
'event_type': event.event_type,
'importance': event.importance,
'status': event.status,
'created_at': event.created_at.isoformat() if event.created_at else None,
'hot_score': event.hot_score,
'view_count': event.view_count,
'related_avg_chg': event.related_avg_chg,
'related_max_chg': event.related_max_chg,
'keywords': event.keywords_list if hasattr(event, 'keywords_list') else event.keywords,
}
# 发送到所有订阅者all 房间)
socketio.emit('new_event', event_data, room='events_all', namespace='/')
# 发送到特定类型订阅者
if event.event_type:
room_name = f"events_{event.event_type}"
socketio.emit('new_event', event_data, room=room_name, namespace='/')
print(f'[WebSocket] 已推送新事件到房间: events_all, {room_name}')
else:
print(f'[WebSocket] 已推送新事件到房间: events_all')
except Exception as e:
print(f'[WebSocket] 推送新事件失败: {e}')
# ==================== WebSocket 轮询机制(检测新事件) ====================
# 内存变量:记录上次检查的最大事件 ID
last_checked_event_id = 0
def poll_new_events():
"""
定期轮询数据库,检查是否有新事件
每 2 分钟执行一次
"""
global last_checked_event_id
try:
with app.app_context():
# 查询比上次检查 ID 更大的所有新事件
new_events = Event.query.filter(
Event.id > last_checked_event_id,
Event.status == 'active'
).order_by(Event.id.asc()).all()
if new_events:
print(f'[轮询] 发现 {len(new_events)} 个新事件')
for event in new_events:
# 推送每个新事件
broadcast_new_event(event)
# 更新最后检查的 ID
last_checked_event_id = event.id
print(f'[轮询] 已推送新事件,最新 ID: {last_checked_event_id}')
except Exception as e:
print(f'[轮询] 检查新事件时出错: {e}')
def initialize_event_polling():
"""
初始化事件轮询机制
在应用启动时调用
"""
global last_checked_event_id
try:
with app.app_context():
# 获取当前数据库中最新的事件 ID作为起点
latest_event = Event.query.order_by(Event.id.desc()).first()
if latest_event:
last_checked_event_id = latest_event.id
print(f'[轮询] 初始化完成,起始事件 ID: {last_checked_event_id}')
else:
print('[轮询] 数据库中暂无事件')
# 创建后台调度器
scheduler = BackgroundScheduler()
# 每 2 分钟执行一次轮询
scheduler.add_job(
func=poll_new_events,
trigger='interval',
minutes=2,
id='poll_new_events',
name='检查新事件并推送',
replace_existing=True
)
scheduler.start()
print('[轮询] 调度器已启动,每 2 分钟检查一次新事件')
except Exception as e:
print(f'[轮询] 初始化失败: {e}')
# ==================== 结束 WebSocket 部分 ====================
@app.route('/api/posts/<int:post_id>/like', methods=['POST']) @app.route('/api/posts/<int:post_id>/like', methods=['POST'])
@login_required @login_required
def like_post(post_id): def like_post(post_id):
@@ -11581,4 +11786,8 @@ if __name__ == '__main__':
except Exception as e: except Exception as e:
app.logger.error(f"数据库初始化失败: {e}") app.logger.error(f"数据库初始化失败: {e}")
app.run(host='0.0.0.0', port=5001, debug=False) # 初始化事件轮询机制WebSocket 推送)
initialize_event_polling()
# 使用 socketio.run 替代 app.run 以支持 WebSocket
socketio.run(app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True)