添加socketservice

This commit is contained in:
2025-10-21 15:13:11 +08:00
parent cfb00ba895
commit 74968d5bc8
4 changed files with 935 additions and 21 deletions

86
app.py
View File

@@ -7579,34 +7579,73 @@ def broadcast_new_event(event):
# ==================== WebSocket 轮询机制(检测新事件) ====================
# 内存变量:记录上次检查的最大事件 ID
last_checked_event_id = 0
# 内存变量:记录上次检查的时间戳和已推送的事件 ID 集合
last_checked_time = None
pushed_event_ids = set() # 已推送的事件 ID 集合,防止重复推送
MAX_PUSHED_IDS_SIZE = 1000 # 已推送 ID 集合的最大容量
def poll_new_events():
"""
定期轮询数据库,检查是否有新事件
每 30 秒执行一次
设计思路:
1. 使用时间戳查询created_at而不是 ID
2. 维护已推送事件 ID 集合,避免重复推送
3. 使用重叠时间窗口向前多查60秒捕获延迟写入的事件
4. 定期清理已推送集合,防止内存泄漏
"""
global last_checked_event_id
global last_checked_time, pushed_event_ids
try:
with app.app_context():
# 查询比上次检查 ID 更大的所有新事件
from datetime import datetime, timedelta
current_time = datetime.now()
# 如果是第一次运行,只查询最近 30 秒的事件
if last_checked_time is None:
query_start_time = current_time - timedelta(seconds=30)
else:
# 向前多查 60 秒(重叠窗口),防止漏掉延迟写入的事件
query_start_time = last_checked_time - timedelta(seconds=60)
# 查询时间范围内的新事件
new_events = Event.query.filter(
Event.id > last_checked_event_id,
Event.created_at >= query_start_time,
Event.created_at <= current_time,
Event.status == 'active'
).order_by(Event.id.asc()).all()
).order_by(Event.created_at.asc()).all()
if new_events:
print(f'[轮询] 发现 {len(new_events)} 个新事件')
# 过滤掉已经推送过的事件
unpushed_events = [
event for event in new_events
if event.id not in pushed_event_ids
]
for event in new_events:
# 推送每个新事件
if unpushed_events:
print(f'[轮询] 发现 {len(unpushed_events)} 个新事件(查询到 {len(new_events)} 个,已过滤 {len(new_events) - len(unpushed_events)} 个重复)')
for event in unpushed_events:
# 推送新事件
broadcast_new_event(event)
# 更新最后检查的 ID
last_checked_event_id = event.id
# 记录已推送
pushed_event_ids.add(event.id)
print(f'[轮询] 推送事件 ID={event.id}, 标题={event.title}')
print(f'[轮询] 已推送新事件,最新 ID: {last_checked_event_id}')
# 更新检查时间
last_checked_time = current_time
# 清理已推送集合(防止无限增长)
if len(pushed_event_ids) > MAX_PUSHED_IDS_SIZE:
# 只保留最新的一半
sorted_ids = sorted(pushed_event_ids)
pushed_event_ids = set(sorted_ids[-MAX_PUSHED_IDS_SIZE//2:])
print(f'[轮询] 已清理推送记录,当前保留 {len(pushed_event_ids)}')
else:
# 没有新事件,也要更新检查时间
last_checked_time = current_time
except Exception as e:
print(f'[轮询] 检查新事件时出错: {e}')
@@ -7617,17 +7656,22 @@ def initialize_event_polling():
初始化事件轮询机制
在应用启动时调用
"""
global last_checked_event_id
global last_checked_time, pushed_event_ids
try:
from datetime import datetime
with app.app_context():
# 获取当前数据库中最新的事件 ID作为起点
latest_event = Event.query.order_by(Event.id.desc()).first()
if latest_event:
last_checked_event_id = latest_event.id
print(f'[轮询] 初始化完成,起始事件 ID: {last_checked_event_id}')
else:
print('[轮询] 数据库中暂无事件')
# 设置初始检查时间为当前时间
# 这样启动后只会推送新创建的事件,不会推送历史事件
last_checked_time = datetime.now()
pushed_event_ids.clear()
# 统计数据库中的事件总数
total_events = Event.query.filter_by(status='active').count()
print(f'[轮询] 初始化完成,数据库中共有 {total_events} 个活跃事件')
print(f'[轮询] 起始时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S")}')
print(f'[轮询] 只会推送此时间之后创建的新事件')
# 创建后台调度器
scheduler = BackgroundScheduler()