update pay ui
This commit is contained in:
177
app.py
177
app.py
@@ -444,11 +444,12 @@ _async_mode = _detect_async_mode()
|
|||||||
print(f"📡 Flask-SocketIO async_mode: {_async_mode}")
|
print(f"📡 Flask-SocketIO async_mode: {_async_mode}")
|
||||||
|
|
||||||
# Redis 消息队列 URL(支持多 Worker 之间的消息同步)
|
# Redis 消息队列 URL(支持多 Worker 之间的消息同步)
|
||||||
# 注意:如果只用单 Worker,可以不配置 message_queue
|
# 使用 127.0.0.1 而非 localhost,避免 eventlet DNS 问题
|
||||||
SOCKETIO_MESSAGE_QUEUE = os.environ.get('SOCKETIO_REDIS_URL', 'redis://localhost:6379/2')
|
SOCKETIO_MESSAGE_QUEUE = os.environ.get('SOCKETIO_REDIS_URL', f'redis://{_REDIS_HOST}:{_REDIS_PORT}/2')
|
||||||
|
|
||||||
# 检测是否需要启用消息队列(多 Worker 模式需要)
|
# 检测是否需要启用消息队列
|
||||||
_use_message_queue = os.environ.get('SOCKETIO_USE_QUEUE', 'false').lower() == 'true'
|
# 默认启用(多 Worker 模式需要,单 Worker 模式也兼容)
|
||||||
|
_use_message_queue = os.environ.get('SOCKETIO_USE_QUEUE', 'true').lower() == 'true'
|
||||||
|
|
||||||
socketio = SocketIO(
|
socketio = SocketIO(
|
||||||
app,
|
app,
|
||||||
@@ -10404,36 +10405,73 @@ def broadcast_new_event(event):
|
|||||||
|
|
||||||
# ==================== WebSocket 轮询机制(检测新事件) ====================
|
# ==================== WebSocket 轮询机制(检测新事件) ====================
|
||||||
|
|
||||||
# 内存变量:记录近24小时内已知的事件ID集合和最大ID
|
# Redis Key 用于多 Worker 协调
|
||||||
known_event_ids_in_24h = set() # 近24小时内已知的所有事件ID
|
REDIS_KEY_LAST_MAX_EVENT_ID = 'vf:event_polling:last_max_id'
|
||||||
last_max_event_id = 0 # 已知的最大事件ID
|
REDIS_KEY_POLLING_LOCK = 'vf:event_polling:lock'
|
||||||
|
|
||||||
|
# 本地缓存(减少 Redis 查询)
|
||||||
|
_local_last_max_event_id = 0
|
||||||
|
_polling_initialized = False
|
||||||
|
|
||||||
|
|
||||||
|
def _get_last_max_event_id():
|
||||||
|
"""从 Redis 获取最大事件 ID"""
|
||||||
|
try:
|
||||||
|
val = redis_client.get(REDIS_KEY_LAST_MAX_EVENT_ID)
|
||||||
|
return int(val) if val else 0
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[轮询 WARN] 读取 Redis 失败: {e}')
|
||||||
|
return _local_last_max_event_id
|
||||||
|
|
||||||
|
|
||||||
|
def _set_last_max_event_id(new_id):
|
||||||
|
"""设置最大事件 ID 到 Redis"""
|
||||||
|
global _local_last_max_event_id
|
||||||
|
try:
|
||||||
|
redis_client.set(REDIS_KEY_LAST_MAX_EVENT_ID, str(new_id))
|
||||||
|
_local_last_max_event_id = new_id
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[轮询 WARN] 写入 Redis 失败: {e}')
|
||||||
|
_local_last_max_event_id = new_id
|
||||||
|
|
||||||
|
|
||||||
def poll_new_events():
|
def poll_new_events():
|
||||||
"""
|
"""
|
||||||
定期轮询数据库,检查是否有新事件
|
定期轮询数据库,检查是否有新事件
|
||||||
每 30 秒执行一次
|
每 30 秒执行一次
|
||||||
|
|
||||||
新的设计思路(修复 created_at 不是入库时间的问题):
|
多 Worker 协调机制:
|
||||||
1. 查询近24小时内的所有活跃事件(按 created_at,因为这是事件发生时间)
|
1. 使用 Redis 分布式锁,确保同一时刻只有一个 Worker 执行轮询
|
||||||
2. 通过对比事件ID(自增ID)来判断是否为新插入的事件
|
2. 使用 Redis 存储 last_max_event_id,所有 Worker 共享状态
|
||||||
3. 推送 ID > last_max_event_id 的事件
|
3. 通过 Redis 消息队列广播到所有 Worker 的客户端
|
||||||
4. 更新已知事件ID集合和最大ID
|
|
||||||
"""
|
"""
|
||||||
global known_event_ids_in_24h, last_max_event_id
|
import os
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# 尝试获取分布式锁(30秒超时,防止死锁)
|
||||||
|
lock_acquired = redis_client.set(
|
||||||
|
REDIS_KEY_POLLING_LOCK,
|
||||||
|
os.getpid(),
|
||||||
|
nx=True, # 只在不存在时设置
|
||||||
|
ex=30 # 30秒后自动过期
|
||||||
|
)
|
||||||
|
|
||||||
|
if not lock_acquired:
|
||||||
|
# 其他 Worker 正在轮询,跳过本次
|
||||||
|
return
|
||||||
|
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
current_time = datetime.now()
|
current_time = datetime.now()
|
||||||
print(f'\n[轮询 DEBUG] ========== 开始轮询 ==========')
|
last_max_event_id = _get_last_max_event_id()
|
||||||
print(f'[轮询 DEBUG] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
|
||||||
print(f'[轮询 DEBUG] 已知事件ID数量: {len(known_event_ids_in_24h)}')
|
print(f'\n[轮询] ========== 开始轮询 (PID: {os.getpid()}) ==========')
|
||||||
print(f'[轮询 DEBUG] 当前最大事件ID: {last_max_event_id}')
|
print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||||
|
print(f'[轮询] 当前最大事件ID: {last_max_event_id}')
|
||||||
|
|
||||||
# 查询近24小时内的所有活跃事件(按事件发生时间 created_at)
|
# 查询近24小时内的所有活跃事件(按事件发生时间 created_at)
|
||||||
time_24h_ago = current_time - timedelta(hours=24)
|
time_24h_ago = current_time - timedelta(hours=24)
|
||||||
print(f'[轮询 DEBUG] 查询时间范围: 近24小时({time_24h_ago.strftime("%Y-%m-%d %H:%M:%S")} ~ 现在)')
|
|
||||||
|
|
||||||
# 查询所有近24小时内的活跃事件
|
# 查询所有近24小时内的活跃事件
|
||||||
events_in_24h = Event.query.filter(
|
events_in_24h = Event.query.filter(
|
||||||
@@ -10441,7 +10479,7 @@ def poll_new_events():
|
|||||||
Event.status == 'active'
|
Event.status == 'active'
|
||||||
).order_by(Event.id.asc()).all()
|
).order_by(Event.id.asc()).all()
|
||||||
|
|
||||||
print(f'[轮询 DEBUG] 数据库查询结果: 找到 {len(events_in_24h)} 个近24小时内的事件')
|
print(f'[轮询] 数据库查询: 找到 {len(events_in_24h)} 个近24小时内的事件')
|
||||||
|
|
||||||
# 找出新插入的事件(ID > last_max_event_id)
|
# 找出新插入的事件(ID > last_max_event_id)
|
||||||
new_events = [
|
new_events = [
|
||||||
@@ -10449,7 +10487,7 @@ def poll_new_events():
|
|||||||
if event.id > last_max_event_id
|
if event.id > last_max_event_id
|
||||||
]
|
]
|
||||||
|
|
||||||
print(f'[轮询 DEBUG] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个')
|
print(f'[轮询] 新事件数量(ID > {last_max_event_id}): {len(new_events)} 个')
|
||||||
|
|
||||||
if new_events:
|
if new_events:
|
||||||
print(f'[轮询] 发现 {len(new_events)} 个新事件')
|
print(f'[轮询] 发现 {len(new_events)} 个新事件')
|
||||||
@@ -10459,68 +10497,65 @@ def poll_new_events():
|
|||||||
# 检查事件是否有关联股票(只推送有关联股票的事件)
|
# 检查事件是否有关联股票(只推送有关联股票的事件)
|
||||||
related_stocks_count = event.related_stocks.count()
|
related_stocks_count = event.related_stocks.count()
|
||||||
|
|
||||||
print(f'[轮询 DEBUG] 新事件详情:')
|
print(f'[轮询] 事件 ID={event.id}: {event.title} (关联股票: {related_stocks_count})')
|
||||||
print(f'[轮询 DEBUG] - ID: {event.id}')
|
|
||||||
print(f'[轮询 DEBUG] - 标题: {event.title}')
|
|
||||||
print(f'[轮询 DEBUG] - 事件发生时间(created_at): {event.created_at}')
|
|
||||||
print(f'[轮询 DEBUG] - 事件类型: {event.event_type}')
|
|
||||||
print(f'[轮询 DEBUG] - 关联股票数量: {related_stocks_count}')
|
|
||||||
|
|
||||||
# 只推送有关联股票的事件
|
# 只推送有关联股票的事件
|
||||||
if related_stocks_count > 0:
|
if related_stocks_count > 0:
|
||||||
print(f'[轮询 DEBUG] 准备推送事件 ID={event.id}(有 {related_stocks_count} 个关联股票)')
|
|
||||||
broadcast_new_event(event)
|
broadcast_new_event(event)
|
||||||
pushed_count += 1
|
pushed_count += 1
|
||||||
print(f'[轮询] ✓ 已推送事件 ID={event.id}, 标题={event.title}')
|
print(f'[轮询] ✓ 已推送事件 ID={event.id}')
|
||||||
else:
|
else:
|
||||||
print(f'[轮询 DEBUG] 跳过事件 ID={event.id}(暂无关联股票)')
|
print(f'[轮询] - 跳过(暂无关联股票)')
|
||||||
|
|
||||||
print(f'[轮询] 本轮共推送 {pushed_count}/{len(new_events)} 个事件')
|
print(f'[轮询] 本轮共推送 {pushed_count}/{len(new_events)} 个事件')
|
||||||
|
|
||||||
# 更新已知事件ID集合(所有近24小时内的事件ID)
|
|
||||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
|
||||||
|
|
||||||
# 更新最大事件ID
|
# 更新最大事件ID
|
||||||
new_max_id = max(event.id for event in events_in_24h)
|
new_max_id = max(event.id for event in events_in_24h)
|
||||||
print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {new_max_id}')
|
_set_last_max_event_id(new_max_id)
|
||||||
last_max_event_id = new_max_id
|
print(f'[轮询] 更新最大事件ID: {last_max_event_id} -> {new_max_id}')
|
||||||
|
|
||||||
print(f'[轮询 DEBUG] 更新后已知事件ID数量: {len(known_event_ids_in_24h)}')
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print(f'[轮询 DEBUG] 没有新事件需要推送')
|
# 即使没有新事件,也要更新最大ID(防止状态不同步)
|
||||||
|
|
||||||
# 即使没有新事件,也要更新已知事件集合(清理超过24小时的)
|
|
||||||
if events_in_24h:
|
if events_in_24h:
|
||||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
|
||||||
current_max_id = max(event.id for event in events_in_24h)
|
current_max_id = max(event.id for event in events_in_24h)
|
||||||
if current_max_id != last_max_event_id:
|
if current_max_id != last_max_event_id:
|
||||||
print(f'[轮询 DEBUG] 更新最大事件ID: {last_max_event_id} -> {current_max_id}')
|
_set_last_max_event_id(current_max_id)
|
||||||
last_max_event_id = current_max_id
|
|
||||||
|
|
||||||
print(f'[轮询 DEBUG] ========== 轮询结束 ==========\n')
|
print(f'[轮询] ========== 轮询结束 ==========\n')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'[轮询 ERROR] 检查新事件时出错: {e}')
|
print(f'[轮询 ERROR] 检查新事件时出错: {e}')
|
||||||
import traceback
|
import traceback
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
finally:
|
||||||
|
# 释放锁
|
||||||
|
try:
|
||||||
|
redis_client.delete(REDIS_KEY_POLLING_LOCK)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def initialize_event_polling():
|
def initialize_event_polling():
|
||||||
"""
|
"""
|
||||||
初始化事件轮询机制
|
初始化事件轮询机制
|
||||||
在应用启动时调用
|
在应用启动时调用(支持 gunicorn 多 Worker)
|
||||||
"""
|
"""
|
||||||
global known_event_ids_in_24h, last_max_event_id
|
global _polling_initialized
|
||||||
|
|
||||||
|
# 防止重复初始化
|
||||||
|
if _polling_initialized:
|
||||||
|
print('[轮询] 已经初始化过,跳过')
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
import os
|
||||||
|
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
current_time = datetime.now()
|
current_time = datetime.now()
|
||||||
time_24h_ago = current_time - timedelta(hours=24)
|
time_24h_ago = current_time - timedelta(hours=24)
|
||||||
|
|
||||||
print(f'\n[轮询] ========== 初始化事件轮询 ==========')
|
print(f'\n[轮询] ========== 初始化事件轮询 (PID: {os.getpid()}) ==========')
|
||||||
print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
print(f'[轮询] 当前时间: {current_time.strftime("%Y-%m-%d %H:%M:%S")}')
|
||||||
|
|
||||||
# 查询近24小时内的所有活跃事件
|
# 查询近24小时内的所有活跃事件
|
||||||
@@ -10529,24 +10564,22 @@ def initialize_event_polling():
|
|||||||
Event.status == 'active'
|
Event.status == 'active'
|
||||||
).order_by(Event.id.asc()).all()
|
).order_by(Event.id.asc()).all()
|
||||||
|
|
||||||
# 初始化已知事件ID集合
|
# 初始化最大事件ID(只有当 Redis 中没有值时才设置)
|
||||||
known_event_ids_in_24h = set(event.id for event in events_in_24h)
|
current_redis_max = _get_last_max_event_id()
|
||||||
|
|
||||||
# 初始化最大事件ID
|
|
||||||
if events_in_24h:
|
if events_in_24h:
|
||||||
last_max_event_id = max(event.id for event in events_in_24h)
|
db_max_id = max(event.id for event in events_in_24h)
|
||||||
print(f'[轮询] 近24小时内共有 {len(events_in_24h)} 个活跃事件')
|
if db_max_id > current_redis_max:
|
||||||
print(f'[轮询] 初始最大事件ID: {last_max_event_id}')
|
_set_last_max_event_id(db_max_id)
|
||||||
print(f'[轮询] 事件ID范围: {min(event.id for event in events_in_24h)} ~ {last_max_event_id}')
|
print(f'[轮询] 初始化最大事件ID: {db_max_id}')
|
||||||
|
else:
|
||||||
|
print(f'[轮询] 使用 Redis 中的最大事件ID: {current_redis_max}')
|
||||||
|
print(f'[轮询] 近24小时内共有 {len(events_in_24h)} 个活跃事件')
|
||||||
else:
|
else:
|
||||||
last_max_event_id = 0
|
|
||||||
print(f'[轮询] 近24小时内没有活跃事件')
|
print(f'[轮询] 近24小时内没有活跃事件')
|
||||||
print(f'[轮询] 初始最大事件ID: 0')
|
|
||||||
|
|
||||||
# 统计数据库中的事件总数
|
# 统计数据库中的事件总数
|
||||||
total_events = Event.query.filter_by(status='active').count()
|
total_events = Event.query.filter_by(status='active').count()
|
||||||
print(f'[轮询] 数据库中共有 {total_events} 个活跃事件(所有时间)')
|
print(f'[轮询] 数据库中共有 {total_events} 个活跃事件(所有时间)')
|
||||||
print(f'[轮询] 只会推送 ID > {last_max_event_id} 的新事件')
|
|
||||||
print(f'[轮询] ========== 初始化完成 ==========\n')
|
print(f'[轮询] ========== 初始化完成 ==========\n')
|
||||||
|
|
||||||
# 创建后台调度器
|
# 创建后台调度器
|
||||||
@@ -10561,10 +10594,38 @@ def initialize_event_polling():
|
|||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
print('[轮询] 调度器已启动,每 30 秒检查一次新事件')
|
print(f'[轮询] 调度器已启动 (PID: {os.getpid()}),每 30 秒检查一次新事件')
|
||||||
|
|
||||||
|
_polling_initialized = True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'[轮询] 初始化失败: {e}')
|
print(f'[轮询] 初始化失败: {e}')
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
|
||||||
|
# ==================== Gunicorn 兼容:自动初始化轮询 ====================
|
||||||
|
|
||||||
|
def _auto_init_polling():
|
||||||
|
"""
|
||||||
|
自动初始化事件轮询(兼容 gunicorn)
|
||||||
|
使用延迟初始化,在第一个请求到来时初始化
|
||||||
|
"""
|
||||||
|
global _polling_initialized
|
||||||
|
if not _polling_initialized:
|
||||||
|
try:
|
||||||
|
initialize_event_polling()
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[轮询] 自动初始化失败: {e}')
|
||||||
|
|
||||||
|
|
||||||
|
# 注册 before_request 钩子,确保 gunicorn 启动后也能初始化轮询
|
||||||
|
@app.before_request
|
||||||
|
def ensure_polling_initialized():
|
||||||
|
"""确保轮询机制已初始化(只执行一次)"""
|
||||||
|
global _polling_initialized
|
||||||
|
if not _polling_initialized:
|
||||||
|
_auto_init_polling()
|
||||||
|
|
||||||
|
|
||||||
# ==================== 结束 WebSocket 部分 ====================
|
# ==================== 结束 WebSocket 部分 ====================
|
||||||
|
|||||||
Reference in New Issue
Block a user