diff --git a/app.py b/app.py index e1107abc..a99ce982 100755 --- a/app.py +++ b/app.py @@ -444,11 +444,12 @@ _async_mode = _detect_async_mode() print(f"📡 Flask-SocketIO async_mode: {_async_mode}") # Redis 消息队列 URL(支持多 Worker 之间的消息同步) -# 注意:如果只用单 Worker,可以不配置 message_queue -SOCKETIO_MESSAGE_QUEUE = os.environ.get('SOCKETIO_REDIS_URL', 'redis://localhost:6379/2') +# 使用 127.0.0.1 而非 localhost,避免 eventlet DNS 问题 +SOCKETIO_MESSAGE_QUEUE = os.environ.get('SOCKETIO_REDIS_URL', f'redis://{_REDIS_HOST}:{_REDIS_PORT}/2') -# 检测是否需要启用消息队列(多 Worker 模式需要) -_use_message_queue = os.environ.get('SOCKETIO_USE_QUEUE', 'false').lower() == 'true' +# 检测是否需要启用消息队列 +# 默认启用(多 Worker 模式需要,单 Worker 模式也兼容) +_use_message_queue = os.environ.get('SOCKETIO_USE_QUEUE', 'true').lower() == 'true' socketio = SocketIO( app, @@ -10404,36 +10405,73 @@ def broadcast_new_event(event): # ==================== WebSocket 轮询机制(检测新事件) ==================== -# 内存变量:记录近24小时内已知的事件ID集合和最大ID -known_event_ids_in_24h = set() # 近24小时内已知的所有事件ID -last_max_event_id = 0 # 已知的最大事件ID +# Redis Key 用于多 Worker 协调 +REDIS_KEY_LAST_MAX_EVENT_ID = 'vf:event_polling:last_max_id' +REDIS_KEY_POLLING_LOCK = 'vf:event_polling:lock' + +# 本地缓存(减少 Redis 查询) +_local_last_max_event_id = 0 +_polling_initialized = False + + +def _get_last_max_event_id(): + """从 Redis 获取最大事件 ID""" + try: + val = redis_client.get(REDIS_KEY_LAST_MAX_EVENT_ID) + return int(val) if val else 0 + except Exception as e: + print(f'[轮询 WARN] 读取 Redis 失败: {e}') + return _local_last_max_event_id + + +def _set_last_max_event_id(new_id): + """设置最大事件 ID 到 Redis""" + global _local_last_max_event_id + try: + redis_client.set(REDIS_KEY_LAST_MAX_EVENT_ID, str(new_id)) + _local_last_max_event_id = new_id + except Exception as e: + print(f'[轮询 WARN] 写入 Redis 失败: {e}') + _local_last_max_event_id = new_id + def poll_new_events(): """ 定期轮询数据库,检查是否有新事件 每 30 秒执行一次 - 新的设计思路(修复 created_at 不是入库时间的问题): - 1. 查询近24小时内的所有活跃事件(按 created_at,因为这是事件发生时间) - 2. 通过对比事件ID(自增ID)来判断是否为新插入的事件 - 3. 推送 ID > last_max_event_id 的事件 - 4. 更新已知事件ID集合和最大ID + 多 Worker 协调机制: + 1. 使用 Redis 分布式锁,确保同一时刻只有一个 Worker 执行轮询 + 2. 使用 Redis 存储 last_max_event_id,所有 Worker 共享状态 + 3. 通过 Redis 消息队列广播到所有 Worker 的客户端 """ - global known_event_ids_in_24h, last_max_event_id + import os try: + # 尝试获取分布式锁(30秒超时,防止死锁) + lock_acquired = redis_client.set( + REDIS_KEY_POLLING_LOCK, + os.getpid(), + nx=True, # 只在不存在时设置 + ex=30 # 30秒后自动过期 + ) + + if not lock_acquired: + # 其他 Worker 正在轮询,跳过本次 + return + with app.app_context(): from datetime import datetime, timedelta current_time = datetime.now() - print(f'\n[轮询 DEBUG] ========== 开始轮询 ==========') - print(f'[轮询 DEBUG] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}') - print(f'[轮询 DEBUG] 已知事件ID数量: {len(known_event_ids_in_24h)}') - print(f'[轮询 DEBUG] 当前最大事件ID: {last_max_event_id}') + last_max_event_id = _get_last_max_event_id() + + print(f'\n[轮询] ========== 开始轮询 (PID: {os.getpid()}) ==========') + print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}') + print(f'[轮询] 当前最大事件ID: {last_max_event_id}') # 查询近24小时内的所有活跃事件(按事件发生时间 created_at) time_24h_ago = current_time - timedelta(hours=24) - print(f'[轮询 DEBUG] 查询时间范围: 近24小时({time_24h_ago.strftime("%Y-%m-%d %H:%M:%S")} ~ 现在)') # 查询所有近24小时内的活跃事件 events_in_24h = Event.query.filter( @@ -10441,7 +10479,7 @@ def poll_new_events(): Event.status == 'active' ).order_by(Event.id.asc()).all() - print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(events_in_24h)} 个近24小时内的事件') + print(f'[轮询] 数据库查询: 找到 {len(events_in_24h)} 个近24小时内的事件') # 找出新插入的事件(ID > last_max_event_id) new_events = [ @@ -10449,7 +10487,7 @@ def poll_new_events(): if event.id > last_max_event_id ] - print(f'[轮询 DEBUG] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个') + print(f'[轮询] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个') if new_events: print(f'[轮询] 发现 {len(new_events)} 个新事件') @@ -10459,68 +10497,65 @@ def poll_new_events(): # 检查事件是否有关联股票(只推送有关联股票的事件) related_stocks_count = event.related_stocks.count() - print(f'[轮询 DEBUG] 新事件详情:') - print(f'[轮询 DEBUG] - ID: {event.id}') - print(f'[轮询 DEBUG] - 标题: {event.title}') - print(f'[轮询 DEBUG] - 事件发生时间(created_at): {event.created_at}') - print(f'[轮询 DEBUG] - 事件类型: {event.event_type}') - print(f'[轮询 DEBUG] - 关联股票数量: {related_stocks_count}') + print(f'[轮询] 事件 ID={event.id}: {event.title} (关联股票: {related_stocks_count})') # 只推送有关联股票的事件 if related_stocks_count > 0: - print(f'[轮询 DEBUG] 准备推送事件 ID={event.id}(有 {related_stocks_count} 个关联股票)') broadcast_new_event(event) pushed_count += 1 - print(f'[轮询] ✓ 已推送事件 ID={event.id}, 标题={event.title}') + print(f'[轮询] ✓ 已推送事件 ID={event.id}') else: - print(f'[轮询 DEBUG] 跳过事件 ID={event.id}(暂无关联股票)') + print(f'[轮询] - 跳过(暂无关联股票)') print(f'[轮询] 本轮共推送 {pushed_count}/{len(new_events)} 个事件') - # 更新已知事件ID集合(所有近24小时内的事件ID) - known_event_ids_in_24h = set(event.id for event in events_in_24h) - # 更新最大事件ID new_max_id = max(event.id for event in events_in_24h) - print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {new_max_id}') - last_max_event_id = new_max_id - - print(f'[轮询 DEBUG] 更新后已知事件ID数量: {len(known_event_ids_in_24h)}') + _set_last_max_event_id(new_max_id) + print(f'[轮询] 更新最大事件ID: {last_max_event_id} -> {new_max_id}') else: - print(f'[轮询 DEBUG] 没有新事件需要推送') - - # 即使没有新事件,也要更新已知事件集合(清理超过24小时的) + # 即使没有新事件,也要更新最大ID(防止状态不同步) if events_in_24h: - known_event_ids_in_24h = set(event.id for event in events_in_24h) current_max_id = max(event.id for event in events_in_24h) if current_max_id != last_max_event_id: - print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {current_max_id}') - last_max_event_id = current_max_id + _set_last_max_event_id(current_max_id) - print(f'[轮询 DEBUG] ========== 轮询结束 ==========\n') + print(f'[轮询] ========== 轮询结束 ==========\n') except Exception as e: print(f'[轮询 ERROR] 检查新事件时出错: {e}') import traceback traceback.print_exc() + finally: + # 释放锁 + try: + redis_client.delete(REDIS_KEY_POLLING_LOCK) + except: + pass def initialize_event_polling(): """ 初始化事件轮询机制 - 在应用启动时调用 + 在应用启动时调用(支持 gunicorn 多 Worker) """ - global known_event_ids_in_24h, last_max_event_id + global _polling_initialized + + # 防止重复初始化 + if _polling_initialized: + print('[轮询] 已经初始化过,跳过') + return try: from datetime import datetime, timedelta + import os with app.app_context(): current_time = datetime.now() time_24h_ago = current_time - timedelta(hours=24) - print(f'\n[轮询] ========== 初始化事件轮询 ==========') + print(f'\n[轮询] ========== 初始化事件轮询 (PID: {os.getpid()}) ==========') print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}') # 查询近24小时内的所有活跃事件 @@ -10529,24 +10564,22 @@ def initialize_event_polling(): Event.status == 'active' ).order_by(Event.id.asc()).all() - # 初始化已知事件ID集合 - known_event_ids_in_24h = set(event.id for event in events_in_24h) - - # 初始化最大事件ID + # 初始化最大事件ID(只有当 Redis 中没有值时才设置) + current_redis_max = _get_last_max_event_id() if events_in_24h: - last_max_event_id = max(event.id for event in events_in_24h) + db_max_id = max(event.id for event in events_in_24h) + if db_max_id > current_redis_max: + _set_last_max_event_id(db_max_id) + print(f'[轮询] 初始化最大事件ID: {db_max_id}') + else: + print(f'[轮询] 使用 Redis 中的最大事件ID: {current_redis_max}') print(f'[轮询] 近24小时内共有 {len(events_in_24h)} 个活跃事件') - print(f'[轮询] 初始最大事件ID: {last_max_event_id}') - print(f'[轮询] 事件ID范围: {min(event.id for event in events_in_24h)} ~ {last_max_event_id}') else: - last_max_event_id = 0 print(f'[轮询] 近24小时内没有活跃事件') - print(f'[轮询] 初始最大事件ID: 0') # 统计数据库中的事件总数 total_events = Event.query.filter_by(status='active').count() print(f'[轮询] 数据库中共有 {total_events} 个活跃事件(所有时间)') - print(f'[轮询] 只会推送 ID > {last_max_event_id} 的新事件') print(f'[轮询] ========== 初始化完成 ==========\n') # 创建后台调度器 @@ -10561,10 +10594,38 @@ def initialize_event_polling(): replace_existing=True ) scheduler.start() - print('[轮询] 调度器已启动,每 30 秒检查一次新事件') + print(f'[轮询] 调度器已启动 (PID: {os.getpid()}),每 30 秒检查一次新事件') + + _polling_initialized = True except Exception as e: print(f'[轮询] 初始化失败: {e}') + import traceback + traceback.print_exc() + + +# ==================== Gunicorn 兼容:自动初始化轮询 ==================== + +def _auto_init_polling(): + """ + 自动初始化事件轮询(兼容 gunicorn) + 使用延迟初始化,在第一个请求到来时初始化 + """ + global _polling_initialized + if not _polling_initialized: + try: + initialize_event_polling() + except Exception as e: + print(f'[轮询] 自动初始化失败: {e}') + + +# 注册 before_request 钩子,确保 gunicorn 启动后也能初始化轮询 +@app.before_request +def ensure_polling_initialized(): + """确保轮询机制已初始化(只执行一次)""" + global _polling_initialized + if not _polling_initialized: + _auto_init_polling() # ==================== 结束 WebSocket 部分 ====================