diff --git a/app.py b/app.py index 7e4b7f20..b224d72b 100755 --- a/app.py +++ b/app.py @@ -251,7 +251,9 @@ socketio = SocketIO( "https://valuefrontier.cn", "http://valuefrontier.cn"], async_mode='gevent', logger=True, - engineio_logger=False + engineio_logger=False, + ping_timeout=120, # 心跳超时时间(秒),客户端120秒内无响应才断开 + ping_interval=25 # 心跳检测间隔(秒),每25秒发送一次ping ) @@ -7583,7 +7585,7 @@ last_checked_event_id = 0 def poll_new_events(): """ 定期轮询数据库,检查是否有新事件 - 每 2 分钟执行一次 + 每 30 秒执行一次 """ global last_checked_event_id @@ -7629,17 +7631,17 @@ def initialize_event_polling(): # 创建后台调度器 scheduler = BackgroundScheduler() - # 每 2 分钟执行一次轮询 + # 每 30 秒执行一次轮询 scheduler.add_job( func=poll_new_events, trigger='interval', - minutes=2, + seconds=30, id='poll_new_events', name='检查新事件并推送', replace_existing=True ) scheduler.start() - print('[轮询] 调度器已启动,每 2 分钟检查一次新事件') + print('[轮询] 调度器已启动,每 30 秒检查一次新事件') except Exception as e: print(f'[轮询] 初始化失败: {e}') diff --git a/test_create_event.py b/test_create_event.py new file mode 100644 index 00000000..5a475cc9 --- /dev/null +++ b/test_create_event.py @@ -0,0 +1,109 @@ +""" +测试脚本:手动创建事件到数据库 +用于测试 WebSocket 实时推送功能 +""" + +import sys +from datetime import datetime +from sqlalchemy import create_engine, Column, Integer, String, Text, Float, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +# 数据库连接(从 app.py 复制) +DATABASE_URI = 'mysql+pymysql://root:Zzl5588161!@111.198.58.126:33060/stock?charset=utf8mb4' + +engine = create_engine(DATABASE_URI, echo=False) +Session = sessionmaker(bind=engine) +session = Session() + +Base = declarative_base() + +# Event 模型(简化版,只包含必要字段) +class Event(Base): + __tablename__ = 'events' + + id = Column(Integer, primary_key=True) + title = Column(String(500), nullable=False) + description = Column(Text) + event_type = Column(String(100)) + importance = Column(String(10)) + status = Column(String(50), default='active') + hot_score = Column(Float, default=0) + view_count = Column(Integer, default=0) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +def create_test_event(): + """创建一个测试事件""" + + import random + + event_types = ['policy', 'market', 'tech', 'industry', 'finance'] + importances = ['S', 'A', 'B', 'C'] + + test_event = Event( + title=f'测试事件 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', + description=f'这是一个用于测试 WebSocket 实时推送的事件,创建于 {datetime.now()}', + event_type=random.choice(event_types), + importance=random.choice(importances), + status='active', + hot_score=round(random.uniform(50, 100), 2), + view_count=random.randint(100, 1000) + ) + + try: + session.add(test_event) + session.commit() + + print("✅ 测试事件创建成功!") + print(f" ID: {test_event.id}") + print(f" 标题: {test_event.title}") + print(f" 类型: {test_event.event_type}") + print(f" 重要性: {test_event.importance}") + print(f" 热度: {test_event.hot_score}") + print(f"\n💡 提示: 轮询将在 2 分钟内检测到此事件并推送到前端") + print(f" (如果需要立即推送,请将轮询间隔改为更短)") + + return test_event.id + + except Exception as e: + session.rollback() + print(f"❌ 创建事件失败: {e}") + return None + finally: + session.close() + + +def create_multiple_events(count=3): + """创建多个测试事件""" + print(f"正在创建 {count} 个测试事件...\n") + + for i in range(count): + event_id = create_test_event() + if event_id: + print(f"[{i+1}/{count}] 事件 #{event_id} 创建成功\n") + else: + print(f"[{i+1}/{count}] 创建失败\n") + + print(f"\n✅ 完成!共创建 {count} 个事件") + + +if __name__ == '__main__': + print("=" * 60) + print("WebSocket 事件推送测试 - 手动创建事件") + print("=" * 60) + print() + + if len(sys.argv) > 1: + try: + count = int(sys.argv[1]) + create_multiple_events(count) + except ValueError: + print("❌ 参数必须是数字") + print("用法: python test_create_event.py [数量]") + else: + # 默认创建 1 个事件 + create_test_event() + + print("\n" + "=" * 60) diff --git a/test_websocket.html b/test_websocket.html new file mode 100644 index 00000000..a11c5b1c --- /dev/null +++ b/test_websocket.html @@ -0,0 +1,289 @@ + + +
+ + +