update /api/events/<int:event_id>/stocks resp format
This commit is contained in:
141
app.py
141
app.py
@@ -7642,23 +7642,22 @@ def broadcast_new_event(event):
|
||||
|
||||
# ==================== WebSocket 轮询机制(检测新事件) ====================
|
||||
|
||||
# 内存变量:记录上次检查的时间戳和已推送的事件 ID 集合
|
||||
last_checked_time = None
|
||||
pushed_event_ids = set() # 已推送的事件 ID 集合,防止重复推送
|
||||
MAX_PUSHED_IDS_SIZE = 1000 # 已推送 ID 集合的最大容量
|
||||
# 内存变量:记录近24小时内已知的事件ID集合和最大ID
|
||||
known_event_ids_in_24h = set() # 近24小时内已知的所有事件ID
|
||||
last_max_event_id = 0 # 已知的最大事件ID
|
||||
|
||||
def poll_new_events():
|
||||
"""
|
||||
定期轮询数据库,检查是否有新事件
|
||||
每 30 秒执行一次
|
||||
|
||||
设计思路:
|
||||
1. 使用时间戳查询(created_at),而不是 ID
|
||||
2. 维护已推送事件 ID 集合,避免重复推送
|
||||
3. 使用重叠时间窗口(向前多查60秒),捕获延迟写入的事件
|
||||
4. 定期清理已推送集合,防止内存泄漏
|
||||
新的设计思路(修复 created_at 不是入库时间的问题):
|
||||
1. 查询近24小时内的所有活跃事件(按 created_at,因为这是事件发生时间)
|
||||
2. 通过对比事件ID(自增ID)来判断是否为新插入的事件
|
||||
3. 推送 ID > last_max_event_id 的事件
|
||||
4. 更新已知事件ID集合和最大ID
|
||||
"""
|
||||
global last_checked_time, pushed_event_ids
|
||||
global known_event_ids_in_24h, last_max_event_id
|
||||
|
||||
try:
|
||||
with app.app_context():
|
||||
@@ -7667,63 +7666,64 @@ def poll_new_events():
|
||||
current_time = datetime.now()
|
||||
print(f'\n[轮询 DEBUG] ========== 开始轮询 ==========')
|
||||
print(f'[轮询 DEBUG] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||
print(f'[轮询 DEBUG] 上次检查时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S") if last_checked_time else "None"}')
|
||||
print(f'[轮询 DEBUG] 已推送事件数量: {len(pushed_event_ids)}')
|
||||
print(f'[轮询 DEBUG] 已知事件ID数量: {len(known_event_ids_in_24h)}')
|
||||
print(f'[轮询 DEBUG] 当前最大事件ID: {last_max_event_id}')
|
||||
|
||||
# 如果是第一次运行,只查询最近 30 秒的事件
|
||||
if last_checked_time is None:
|
||||
query_start_time = current_time - timedelta(seconds=30)
|
||||
else:
|
||||
# 向前多查 60 秒(重叠窗口),防止漏掉延迟写入的事件
|
||||
query_start_time = last_checked_time - timedelta(seconds=60)
|
||||
# 查询近24小时内的所有活跃事件(按事件发生时间 created_at)
|
||||
time_24h_ago = current_time - timedelta(hours=24)
|
||||
print(f'[轮询 DEBUG] 查询时间范围: 近24小时({time_24h_ago.strftime("%Y-%m-%d %H:%M:%S")} ~ 现在)')
|
||||
|
||||
print(f'[轮询 DEBUG] 查询时间范围: {query_start_time.strftime("%Y-%m-%d %H:%M:%S")} ~ {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||
|
||||
# 查询时间范围内的新事件
|
||||
new_events = Event.query.filter(
|
||||
Event.created_at >= query_start_time,
|
||||
Event.created_at <= current_time,
|
||||
# 查询所有近24小时内的活跃事件
|
||||
events_in_24h = Event.query.filter(
|
||||
Event.created_at >= time_24h_ago,
|
||||
Event.status == 'active'
|
||||
).order_by(Event.created_at.asc()).all()
|
||||
).order_by(Event.id.asc()).all()
|
||||
|
||||
print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(new_events)} 个事件')
|
||||
if new_events:
|
||||
for evt in new_events:
|
||||
print(f'[轮询 DEBUG] - ID={evt.id}, 标题={evt.title}, 创建时间={evt.created_at}, 已推送={evt.id in pushed_event_ids}')
|
||||
print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(events_in_24h)} 个近24小时内的事件')
|
||||
|
||||
# 过滤掉已经推送过的事件
|
||||
unpushed_events = [
|
||||
event for event in new_events
|
||||
if event.id not in pushed_event_ids
|
||||
# 找出新插入的事件(ID > last_max_event_id)
|
||||
new_events = [
|
||||
event for event in events_in_24h
|
||||
if event.id > last_max_event_id
|
||||
]
|
||||
|
||||
print(f'[轮询 DEBUG] 过滤后未推送事件: {len(unpushed_events)} 个')
|
||||
print(f'[轮询 DEBUG] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个')
|
||||
|
||||
if unpushed_events:
|
||||
print(f'[轮询] 发现 {len(unpushed_events)} 个新事件(查询到 {len(new_events)} 个,已过滤 {len(new_events) - len(unpushed_events)} 个重复)')
|
||||
if new_events:
|
||||
print(f'[轮询] 发现 {len(new_events)} 个新事件')
|
||||
|
||||
for event in new_events:
|
||||
print(f'[轮询 DEBUG] 新事件详情:')
|
||||
print(f'[轮询 DEBUG] - ID: {event.id}')
|
||||
print(f'[轮询 DEBUG] - 标题: {event.title}')
|
||||
print(f'[轮询 DEBUG] - 事件发生时间(created_at): {event.created_at}')
|
||||
print(f'[轮询 DEBUG] - 事件类型: {event.event_type}')
|
||||
|
||||
for event in unpushed_events:
|
||||
print(f'[轮询 DEBUG] 准备推送事件: ID={event.id}, 标题={event.title}')
|
||||
# 推送新事件
|
||||
print(f'[轮询 DEBUG] 准备推送事件 ID={event.id}')
|
||||
broadcast_new_event(event)
|
||||
# 记录已推送
|
||||
pushed_event_ids.add(event.id)
|
||||
print(f'[轮询] 推送事件 ID={event.id}, 标题={event.title}')
|
||||
print(f'[轮询] ✓ 已推送事件 ID={event.id}, 标题={event.title}')
|
||||
|
||||
# 更新检查时间
|
||||
last_checked_time = current_time
|
||||
# 更新已知事件ID集合(所有近24小时内的事件ID)
|
||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
||||
|
||||
# 清理已推送集合(防止无限增长)
|
||||
if len(pushed_event_ids) > MAX_PUSHED_IDS_SIZE:
|
||||
# 只保留最新的一半
|
||||
sorted_ids = sorted(pushed_event_ids)
|
||||
pushed_event_ids = set(sorted_ids[-MAX_PUSHED_IDS_SIZE//2:])
|
||||
print(f'[轮询] 已清理推送记录,当前保留 {len(pushed_event_ids)} 个')
|
||||
# 更新最大事件ID
|
||||
new_max_id = max(event.id for event in events_in_24h)
|
||||
print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {new_max_id}')
|
||||
last_max_event_id = new_max_id
|
||||
|
||||
print(f'[轮询 DEBUG] 更新后已知事件ID数量: {len(known_event_ids_in_24h)}')
|
||||
|
||||
else:
|
||||
print(f'[轮询 DEBUG] 没有新事件需要推送')
|
||||
# 没有新事件,也要更新检查时间
|
||||
last_checked_time = current_time
|
||||
|
||||
# 即使没有新事件,也要更新已知事件集合(清理超过24小时的)
|
||||
if events_in_24h:
|
||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
||||
current_max_id = max(event.id for event in events_in_24h)
|
||||
if current_max_id != last_max_event_id:
|
||||
print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {current_max_id}')
|
||||
last_max_event_id = current_max_id
|
||||
|
||||
print(f'[轮询 DEBUG] ========== 轮询结束 ==========\n')
|
||||
|
||||
@@ -7738,22 +7738,43 @@ def initialize_event_polling():
|
||||
初始化事件轮询机制
|
||||
在应用启动时调用
|
||||
"""
|
||||
global last_checked_time, pushed_event_ids
|
||||
global known_event_ids_in_24h, last_max_event_id
|
||||
|
||||
try:
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
with app.app_context():
|
||||
# 设置初始检查时间为当前时间
|
||||
# 这样启动后只会推送新创建的事件,不会推送历史事件
|
||||
last_checked_time = datetime.now()
|
||||
pushed_event_ids.clear()
|
||||
current_time = datetime.now()
|
||||
time_24h_ago = current_time - timedelta(hours=24)
|
||||
|
||||
print(f'\n[轮询] ========== 初始化事件轮询 ==========')
|
||||
print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||
|
||||
# 查询近24小时内的所有活跃事件
|
||||
events_in_24h = Event.query.filter(
|
||||
Event.created_at >= time_24h_ago,
|
||||
Event.status == 'active'
|
||||
).order_by(Event.id.asc()).all()
|
||||
|
||||
# 初始化已知事件ID集合
|
||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
||||
|
||||
# 初始化最大事件ID
|
||||
if events_in_24h:
|
||||
last_max_event_id = max(event.id for event in events_in_24h)
|
||||
print(f'[轮询] 近24小时内共有 {len(events_in_24h)} 个活跃事件')
|
||||
print(f'[轮询] 初始最大事件ID: {last_max_event_id}')
|
||||
print(f'[轮询] 事件ID范围: {min(event.id for event in events_in_24h)} ~ {last_max_event_id}')
|
||||
else:
|
||||
last_max_event_id = 0
|
||||
print(f'[轮询] 近24小时内没有活跃事件')
|
||||
print(f'[轮询] 初始最大事件ID: 0')
|
||||
|
||||
# 统计数据库中的事件总数
|
||||
total_events = Event.query.filter_by(status='active').count()
|
||||
print(f'[轮询] 初始化完成,数据库中共有 {total_events} 个活跃事件')
|
||||
print(f'[轮询] 起始时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||
print(f'[轮询] 只会推送此时间之后创建的新事件')
|
||||
print(f'[轮询] 数据库中共有 {total_events} 个活跃事件(所有时间)')
|
||||
print(f'[轮询] 只会推送 ID > {last_max_event_id} 的新事件')
|
||||
print(f'[轮询] ========== 初始化完成 ==========\n')
|
||||
|
||||
# 创建后台调度器
|
||||
scheduler = BackgroundScheduler()
|
||||
|
||||
Reference in New Issue
Block a user