diff --git a/app.py b/app.py index edc4e934..73e2006e 100755 --- a/app.py +++ b/app.py @@ -7642,23 +7642,22 @@ def broadcast_new_event(event): # ==================== WebSocket 轮询机制(检测新事件) ==================== -# 内存变量:记录上次检查的时间戳和已推送的事件 ID 集合 -last_checked_time = None -pushed_event_ids = set() # 已推送的事件 ID 集合,防止重复推送 -MAX_PUSHED_IDS_SIZE = 1000 # 已推送 ID 集合的最大容量 +# 内存变量:记录近24小时内已知的事件ID集合和最大ID +known_event_ids_in_24h = set() # 近24小时内已知的所有事件ID +last_max_event_id = 0 # 已知的最大事件ID def poll_new_events(): """ 定期轮询数据库,检查是否有新事件 每 30 秒执行一次 - 设计思路: - 1. 使用时间戳查询(created_at),而不是 ID - 2. 维护已推送事件 ID 集合,避免重复推送 - 3. 使用重叠时间窗口(向前多查60秒),捕获延迟写入的事件 - 4. 定期清理已推送集合,防止内存泄漏 + 新的设计思路(修复 created_at 不是入库时间的问题): + 1. 查询近24小时内的所有活跃事件(按 created_at,因为这是事件发生时间) + 2. 通过对比事件ID(自增ID)来判断是否为新插入的事件 + 3. 推送 ID > last_max_event_id 的事件 + 4. 更新已知事件ID集合和最大ID """ - global last_checked_time, pushed_event_ids + global known_event_ids_in_24h, last_max_event_id try: with app.app_context(): @@ -7667,63 +7666,64 @@ def poll_new_events(): current_time = datetime.now() print(f'\n[轮询 DEBUG] ========== 开始轮询 ==========') print(f'[轮询 DEBUG] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}') - print(f'[轮询 DEBUG] 上次检查时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S") if last_checked_time else "None"}') - print(f'[轮询 DEBUG] 已推送事件数量: {len(pushed_event_ids)}') + print(f'[轮询 DEBUG] 已知事件ID数量: {len(known_event_ids_in_24h)}') + print(f'[轮询 DEBUG] 当前最大事件ID: {last_max_event_id}') - # 如果是第一次运行,只查询最近 30 秒的事件 - if last_checked_time is None: - query_start_time = current_time - timedelta(seconds=30) - else: - # 向前多查 60 秒(重叠窗口),防止漏掉延迟写入的事件 - query_start_time = last_checked_time - timedelta(seconds=60) + # 查询近24小时内的所有活跃事件(按事件发生时间 created_at) + time_24h_ago = current_time - timedelta(hours=24) + print(f'[轮询 DEBUG] 查询时间范围: 近24小时({time_24h_ago.strftime("%Y-%m-%d %H:%M:%S")} ~ 现在)') - print(f'[轮询 DEBUG] 查询时间范围: {query_start_time.strftime("%Y-%m-%d %H:%M:%S")} ~ {current_time.strftime("%Y-%m-%d %H:%M:%S")}') - - # 查询时间范围内的新事件 - new_events = Event.query.filter( - Event.created_at >= query_start_time, - Event.created_at <= current_time, + # 查询所有近24小时内的活跃事件 + events_in_24h = Event.query.filter( + Event.created_at >= time_24h_ago, Event.status == 'active' - ).order_by(Event.created_at.asc()).all() + ).order_by(Event.id.asc()).all() - print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(new_events)} 个事件') - if new_events: - for evt in new_events: - print(f'[轮询 DEBUG] - ID={evt.id}, 标题={evt.title}, 创建时间={evt.created_at}, 已推送={evt.id in pushed_event_ids}') + print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(events_in_24h)} 个近24小时内的事件') - # 过滤掉已经推送过的事件 - unpushed_events = [ - event for event in new_events - if event.id not in pushed_event_ids + # 找出新插入的事件(ID > last_max_event_id) + new_events = [ + event for event in events_in_24h + if event.id > last_max_event_id ] - print(f'[轮询 DEBUG] 过滤后未推送事件: {len(unpushed_events)} 个') + print(f'[轮询 DEBUG] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个') - if unpushed_events: - print(f'[轮询] 发现 {len(unpushed_events)} 个新事件(查询到 {len(new_events)} 个,已过滤 {len(new_events) - len(unpushed_events)} 个重复)') + if new_events: + print(f'[轮询] 发现 {len(new_events)} 个新事件') + + for event in new_events: + print(f'[轮询 DEBUG] 新事件详情:') + print(f'[轮询 DEBUG] - ID: {event.id}') + print(f'[轮询 DEBUG] - 标题: {event.title}') + print(f'[轮询 DEBUG] - 事件发生时间(created_at): {event.created_at}') + print(f'[轮询 DEBUG] - 事件类型: {event.event_type}') - for event in unpushed_events: - print(f'[轮询 DEBUG] 准备推送事件: ID={event.id}, 标题={event.title}') # 推送新事件 + print(f'[轮询 DEBUG] 准备推送事件 ID={event.id}') broadcast_new_event(event) - # 记录已推送 - pushed_event_ids.add(event.id) - print(f'[轮询] 推送事件 ID={event.id}, 标题={event.title}') + print(f'[轮询] ✓ 已推送事件 ID={event.id}, 标题={event.title}') - # 更新检查时间 - last_checked_time = current_time + # 更新已知事件ID集合(所有近24小时内的事件ID) + known_event_ids_in_24h = set(event.id for event in events_in_24h) - # 清理已推送集合(防止无限增长) - if len(pushed_event_ids) > MAX_PUSHED_IDS_SIZE: - # 只保留最新的一半 - sorted_ids = sorted(pushed_event_ids) - pushed_event_ids = set(sorted_ids[-MAX_PUSHED_IDS_SIZE//2:]) - print(f'[轮询] 已清理推送记录,当前保留 {len(pushed_event_ids)} 个') + # 更新最大事件ID + new_max_id = max(event.id for event in events_in_24h) + print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {new_max_id}') + last_max_event_id = new_max_id + + print(f'[轮询 DEBUG] 更新后已知事件ID数量: {len(known_event_ids_in_24h)}') else: print(f'[轮询 DEBUG] 没有新事件需要推送') - # 没有新事件,也要更新检查时间 - last_checked_time = current_time + + # 即使没有新事件,也要更新已知事件集合(清理超过24小时的) + if events_in_24h: + known_event_ids_in_24h = set(event.id for event in events_in_24h) + current_max_id = max(event.id for event in events_in_24h) + if current_max_id != last_max_event_id: + print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {current_max_id}') + last_max_event_id = current_max_id print(f'[轮询 DEBUG] ========== 轮询结束 ==========\n') @@ -7738,22 +7738,43 @@ def initialize_event_polling(): 初始化事件轮询机制 在应用启动时调用 """ - global last_checked_time, pushed_event_ids + global known_event_ids_in_24h, last_max_event_id try: - from datetime import datetime + from datetime import datetime, timedelta with app.app_context(): - # 设置初始检查时间为当前时间 - # 这样启动后只会推送新创建的事件,不会推送历史事件 - last_checked_time = datetime.now() - pushed_event_ids.clear() + current_time = datetime.now() + time_24h_ago = current_time - timedelta(hours=24) + + print(f'\n[轮询] ========== 初始化事件轮询 ==========') + print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}') + + # 查询近24小时内的所有活跃事件 + events_in_24h = Event.query.filter( + Event.created_at >= time_24h_ago, + Event.status == 'active' + ).order_by(Event.id.asc()).all() + + # 初始化已知事件ID集合 + known_event_ids_in_24h = set(event.id for event in events_in_24h) + + # 初始化最大事件ID + if events_in_24h: + last_max_event_id = max(event.id for event in events_in_24h) + print(f'[轮询] 近24小时内共有 {len(events_in_24h)} 个活跃事件') + print(f'[轮询] 初始最大事件ID: {last_max_event_id}') + print(f'[轮询] 事件ID范围: {min(event.id for event in events_in_24h)} ~ {last_max_event_id}') + else: + last_max_event_id = 0 + print(f'[轮询] 近24小时内没有活跃事件') + print(f'[轮询] 初始最大事件ID: 0') # 统计数据库中的事件总数 total_events = Event.query.filter_by(status='active').count() - print(f'[轮询] 初始化完成,数据库中共有 {total_events} 个活跃事件') - print(f'[轮询] 起始时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S")}') - print(f'[轮询] 只会推送此时间之后创建的新事件') + print(f'[轮询] 数据库中共有 {total_events} 个活跃事件(所有时间)') + print(f'[轮询] 只会推送 ID > {last_max_event_id} 的新事件') + print(f'[轮询] ========== 初始化完成 ==========\n') # 创建后台调度器 scheduler = BackgroundScheduler()