diff --git a/app.py b/app.py index e798b4d7..7e4b7f20 100755 --- a/app.py +++ b/app.py @@ -8,6 +8,7 @@ import uuid from functools import wraps import qrcode from flask_mail import Mail, Message +from flask_socketio import SocketIO, emit, join_room, leave_room import pytz import requests from celery import Celery @@ -40,6 +41,7 @@ from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentClo from sqlalchemy import text, desc, and_ import pandas as pd from decimal import Decimal +from apscheduler.schedulers.background import BackgroundScheduler # 交易日数据缓存 trading_days = [] @@ -242,6 +244,16 @@ db = SQLAlchemy(app) # 初始化邮件服务 mail = Mail(app) +# 初始化 Flask-SocketIO(用于实时事件推送) +socketio = SocketIO( + app, + cors_allowed_origins=["http://localhost:3000", "http://127.0.0.1:3000", "http://localhost:5173", + "https://valuefrontier.cn", "http://valuefrontier.cn"], + async_mode='gevent', + logger=True, + engineio_logger=False +) + @login_manager.user_loader def load_user(user_id): @@ -7443,6 +7455,199 @@ def add_event_comment(event_id): }), 500 +# ==================== WebSocket 事件处理器(实时事件推送) ==================== + +@socketio.on('connect') +def handle_connect(): + """客户端连接事件""" + print(f'[WebSocket] 客户端已连接: {request.sid}') + emit('connection_response', { + 'status': 'connected', + 'sid': request.sid, + 'message': '已连接到事件推送服务' + }) + + +@socketio.on('subscribe_events') +def handle_subscribe(data): + """ + 客户端订阅事件推送 + data: { + 'event_type': 'all' | 'policy' | 'market' | 'tech' | ..., + 'importance': 'all' | 'S' | 'A' | 'B' | 'C', + 'filters': {...} # 可选的其他筛选条件 + } + """ + try: + event_type = data.get('event_type', 'all') + importance = data.get('importance', 'all') + + # 加入对应的房间 + room_name = f"events_{event_type}" + join_room(room_name) + + print(f'[WebSocket] 客户端 {request.sid} 订阅了房间: {room_name}') + + emit('subscription_confirmed', { + 'success': True, + 'room': room_name, + 'event_type': event_type, + 'importance': importance, + 'message': f'已订阅 {event_type} 类型的事件推送' + }) + + except Exception as e: + print(f'[WebSocket] 订阅失败: {e}') + emit('subscription_error', { + 'success': False, + 'error': str(e) + }) + + +@socketio.on('unsubscribe_events') +def handle_unsubscribe(data): + """取消订阅事件推送""" + try: + event_type = data.get('event_type', 'all') + room_name = f"events_{event_type}" + leave_room(room_name) + + print(f'[WebSocket] 客户端 {request.sid} 取消订阅房间: {room_name}') + + emit('unsubscription_confirmed', { + 'success': True, + 'room': room_name, + 'message': f'已取消订阅 {event_type} 类型的事件推送' + }) + + except Exception as e: + print(f'[WebSocket] 取消订阅失败: {e}') + emit('unsubscription_error', { + 'success': False, + 'error': str(e) + }) + + +@socketio.on('disconnect') +def handle_disconnect(): + """客户端断开连接事件""" + print(f'[WebSocket] 客户端已断开: {request.sid}') + + +# ==================== WebSocket 辅助函数 ==================== + +def broadcast_new_event(event): + """ + 广播新事件到所有订阅的客户端 + 在创建新事件时调用此函数 + + Args: + event: Event 模型实例 + """ + try: + event_data = { + 'id': event.id, + 'title': event.title, + 'description': event.description, + 'event_type': event.event_type, + 'importance': event.importance, + 'status': event.status, + 'created_at': event.created_at.isoformat() if event.created_at else None, + 'hot_score': event.hot_score, + 'view_count': event.view_count, + 'related_avg_chg': event.related_avg_chg, + 'related_max_chg': event.related_max_chg, + 'keywords': event.keywords_list if hasattr(event, 'keywords_list') else event.keywords, + } + + # 发送到所有订阅者(all 房间) + socketio.emit('new_event', event_data, room='events_all', namespace='/') + + # 发送到特定类型订阅者 + if event.event_type: + room_name = f"events_{event.event_type}" + socketio.emit('new_event', event_data, room=room_name, namespace='/') + print(f'[WebSocket] 已推送新事件到房间: events_all, {room_name}') + else: + print(f'[WebSocket] 已推送新事件到房间: events_all') + + except Exception as e: + print(f'[WebSocket] 推送新事件失败: {e}') + + +# ==================== WebSocket 轮询机制(检测新事件) ==================== + +# 内存变量:记录上次检查的最大事件 ID +last_checked_event_id = 0 + +def poll_new_events(): + """ + 定期轮询数据库,检查是否有新事件 + 每 2 分钟执行一次 + """ + global last_checked_event_id + + try: + with app.app_context(): + # 查询比上次检查 ID 更大的所有新事件 + new_events = Event.query.filter( + Event.id > last_checked_event_id, + Event.status == 'active' + ).order_by(Event.id.asc()).all() + + if new_events: + print(f'[轮询] 发现 {len(new_events)} 个新事件') + + for event in new_events: + # 推送每个新事件 + broadcast_new_event(event) + # 更新最后检查的 ID + last_checked_event_id = event.id + + print(f'[轮询] 已推送新事件,最新 ID: {last_checked_event_id}') + + except Exception as e: + print(f'[轮询] 检查新事件时出错: {e}') + + +def initialize_event_polling(): + """ + 初始化事件轮询机制 + 在应用启动时调用 + """ + global last_checked_event_id + + try: + with app.app_context(): + # 获取当前数据库中最新的事件 ID,作为起点 + latest_event = Event.query.order_by(Event.id.desc()).first() + if latest_event: + last_checked_event_id = latest_event.id + print(f'[轮询] 初始化完成,起始事件 ID: {last_checked_event_id}') + else: + print('[轮询] 数据库中暂无事件') + + # 创建后台调度器 + scheduler = BackgroundScheduler() + # 每 2 分钟执行一次轮询 + scheduler.add_job( + func=poll_new_events, + trigger='interval', + minutes=2, + id='poll_new_events', + name='检查新事件并推送', + replace_existing=True + ) + scheduler.start() + print('[轮询] 调度器已启动,每 2 分钟检查一次新事件') + + except Exception as e: + print(f'[轮询] 初始化失败: {e}') + + +# ==================== 结束 WebSocket 部分 ==================== + + @app.route('/api/posts//like', methods=['POST']) @login_required def like_post(post_id): @@ -11581,4 +11786,8 @@ if __name__ == '__main__': except Exception as e: app.logger.error(f"数据库初始化失败: {e}") - app.run(host='0.0.0.0', port=5001, debug=False) \ No newline at end of file + # 初始化事件轮询机制(WebSocket 推送) + initialize_event_polling() + + # 使用 socketio.run 替代 app.run 以支持 WebSocket + socketio.run(app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True) \ No newline at end of file