diff --git a/WEBSOCKET_INTEGRATION_GUIDE.md b/WEBSOCKET_INTEGRATION_GUIDE.md new file mode 100644 index 00000000..7859cf9e --- /dev/null +++ b/WEBSOCKET_INTEGRATION_GUIDE.md @@ -0,0 +1,546 @@ +# WebSocket 事件实时推送 - 前端集成指南 + +## 📦 已创建的文件 + +1. **`src/services/socketService.js`** - WebSocket 服务(已扩展) +2. **`src/hooks/useEventNotifications.js`** - React Hook +3. **`test_websocket.html`** - 测试页面 +4. **`test_create_event.py`** - 测试脚本 + +--- + +## 🚀 快速开始 + +### 方案 1:使用 React Hook(推荐) + +在任何 React 组件中使用: + +```jsx +import { useEventNotifications } from 'hooks/useEventNotifications'; +import { useToast } from '@chakra-ui/react'; + +function EventsPage() { + const toast = useToast(); + + // 订阅事件推送 + const { newEvent, isConnected } = useEventNotifications({ + eventType: 'all', // 'all' | 'policy' | 'market' | 'tech' | ... + importance: 'all', // 'all' | 'S' | 'A' | 'B' | 'C' + enabled: true, // 是否启用订阅 + onNewEvent: (event) => { + // 收到新事件时的处理 + console.log('🔔 收到新事件:', event); + + // 显示 Toast 通知 + toast({ + title: '新事件提醒', + description: event.title, + status: 'info', + duration: 5000, + isClosable: true, + position: 'top-right', + }); + } + }); + + return ( + + 连接状态: {isConnected ? '已连接 ✅' : '未连接 ❌'} + {/* 你的事件列表 */} + + ); +} +``` + +--- + +### 方案 2:在事件列表页面集成(完整示例) + +**在 `src/views/Community/components/EventList.js` 中集成:** + +```jsx +import React, { useState, useEffect } from 'react'; +import { Box, Text, Badge, useToast } from '@chakra-ui/react'; +import { useEventNotifications } from 'hooks/useEventNotifications'; + +function EventList() { + const [events, setEvents] = useState([]); + const [loading, setLoading] = useState(true); + const toast = useToast(); + + // 1️⃣ 初始加载事件列表(REST API) + useEffect(() => { + fetchEvents(); + }, []); + + const fetchEvents = async () => { + try { + const response = await fetch('/api/events?per_page=20'); + const data = await response.json(); + + if (data.success) { + setEvents(data.data.events); + } + } catch (error) { + console.error('加载事件失败:', error); + } finally { + setLoading(false); + } + }; + + // 2️⃣ 订阅 WebSocket 实时推送 + const { newEvent, isConnected } = useEventNotifications({ + eventType: 'all', + importance: 'all', + enabled: true, // 可以根据用户设置控制是否启用 + onNewEvent: (event) => { + console.log('🔔 收到新事件:', event); + + // 显示通知 + toast({ + title: '📰 新事件发布', + description: `${event.title}`, + status: 'info', + duration: 6000, + isClosable: true, + position: 'top-right', + }); + + // 将新事件添加到列表顶部 + setEvents((prevEvents) => { + // 检查是否已存在(防止重复) + const exists = prevEvents.some(e => e.id === event.id); + if (exists) { + return prevEvents; + } + // 添加到顶部,最多保留 100 个 + return [event, ...prevEvents].slice(0, 100); + }); + } + }); + + return ( + + {/* 连接状态指示器 */} + + + {isConnected ? '实时推送已开启' : '实时推送未连接'} + + + + {/* 事件列表 */} + {loading ? ( + 加载中... + ) : ( + + {events.map((event) => ( + + ))} + + )} + + ); +} + +export default EventList; +``` + +--- + +### 方案 3:只订阅重要事件(S 和 A 级) + +```jsx +import { useImportantEventNotifications } from 'hooks/useEventNotifications'; + +function Dashboard() { + const { importantEvents, isConnected } = useImportantEventNotifications((event) => { + // 只会收到 S 和 A 级别的重要事件 + console.log('⚠️ 重要事件:', event); + + // 播放提示音 + new Audio('/notification.mp3').play(); + }); + + return ( + + 重要事件通知 + {importantEvents.map(event => ( + + + {event.title} + + ))} + + ); +} +``` + +--- + +### 方案 4:直接使用 Service(不用 Hook) + +```jsx +import { useEffect } from 'react'; +import socketService from 'services/socketService'; + +function MyComponent() { + useEffect(() => { + // 连接 + socketService.connect(); + + // 订阅 + const unsubscribe = socketService.subscribeToAllEvents((event) => { + console.log('新事件:', event); + }); + + // 清理 + return () => { + unsubscribe(); + socketService.disconnect(); + }; + }, []); + + return
...
; +} +``` + +--- + +## 🎨 UI 集成示例 + +### 1. Toast 通知(Chakra UI) + +```jsx +import { useToast } from '@chakra-ui/react'; + +const toast = useToast(); + +// 在 onNewEvent 回调中 +onNewEvent: (event) => { + toast({ + title: '新事件', + description: event.title, + status: 'info', + duration: 5000, + isClosable: true, + position: 'top-right', + }); +} +``` + +--- + +### 2. 顶部通知栏 + +```jsx +import { Alert, AlertIcon, CloseButton } from '@chakra-ui/react'; + +function EventNotificationBanner() { + const [showNotification, setShowNotification] = useState(false); + const [latestEvent, setLatestEvent] = useState(null); + + useEventNotifications({ + eventType: 'all', + onNewEvent: (event) => { + setLatestEvent(event); + setShowNotification(true); + } + }); + + if (!showNotification || !latestEvent) return null; + + return ( + + + 新事件:{latestEvent.title} + setShowNotification(false)} + /> + + ); +} +``` + +--- + +### 3. 角标提示(红点) + +```jsx +import { Badge } from '@chakra-ui/react'; + +function EventsMenuItem() { + const [unreadCount, setUnreadCount] = useState(0); + + useEventNotifications({ + eventType: 'all', + onNewEvent: () => { + setUnreadCount(prev => prev + 1); + } + }); + + return ( + + 事件中心 + {unreadCount > 0 && ( + + {unreadCount > 99 ? '99+' : unreadCount} + + )} + + ); +} +``` + +--- + +### 4. 浮动通知卡片 + +```jsx +import { Box, Slide, useDisclosure } from '@chakra-ui/react'; + +function FloatingEventNotification() { + const { isOpen, onClose, onOpen } = useDisclosure(); + const [event, setEvent] = useState(null); + + useEventNotifications({ + eventType: 'all', + onNewEvent: (newEvent) => { + setEvent(newEvent); + onOpen(); + + // 5秒后自动关闭 + setTimeout(onClose, 5000); + } + }); + + return ( + + + {event?.title} + {event?.description} + + + + ); +} +``` + +--- + +## 📋 API 参考 + +### `useEventNotifications(options)` + +**参数:** +| 参数 | 类型 | 默认值 | 说明 | +|------|------|--------|------| +| `eventType` | string | `'all'` | 事件类型:`'all'` / `'policy'` / `'market'` / `'tech'` 等 | +| `importance` | string | `'all'` | 重要性:`'all'` / `'S'` / `'A'` / `'B'` / `'C'` | +| `enabled` | boolean | `true` | 是否启用订阅 | +| `onNewEvent` | function | - | 收到新事件时的回调函数 | + +**返回值:** +| 属性 | 类型 | 说明 | +|------|------|------| +| `newEvent` | object | 最新收到的事件对象 | +| `isConnected` | boolean | WebSocket 连接状态 | +| `error` | object | 错误信息 | +| `clearNewEvent` | function | 清除新事件状态 | + +--- + +### `socketService` API + +```javascript +// 连接 +socketService.connect(options) + +// 断开 +socketService.disconnect() + +// 订阅所有事件 +socketService.subscribeToAllEvents(callback) + +// 订阅特定类型 +socketService.subscribeToEventType('tech', callback) + +// 订阅特定重要性 +socketService.subscribeToImportantEvents('S', callback) + +// 取消订阅 +socketService.unsubscribeFromEvents({ eventType: 'all' }) + +// 检查连接状态 +socketService.isConnected() +``` + +--- + +## 🔧 事件数据结构 + +收到的 `event` 对象包含: + +```javascript +{ + id: 123, + title: "事件标题", + description: "事件描述", + event_type: "tech", // 类型 + importance: "S", // 重要性 + status: "active", + created_at: "2025-01-21T14:30:00", + hot_score: 85.5, + view_count: 1234, + related_avg_chg: 5.2, // 平均涨幅 + related_max_chg: 15.8, // 最大涨幅 + keywords: ["AI", "芯片"], // 关键词 +} +``` + +--- + +## ⚙️ 高级配置 + +### 1. 条件订阅(用户设置) + +```jsx +function EventsPage() { + const [enableNotifications, setEnableNotifications] = useState( + localStorage.getItem('enableEventNotifications') === 'true' + ); + + useEventNotifications({ + eventType: 'all', + enabled: enableNotifications, // 根据用户设置控制 + onNewEvent: handleNewEvent + }); + + return ( + { + const enabled = e.target.checked; + setEnableNotifications(enabled); + localStorage.setItem('enableEventNotifications', enabled); + }} + > + 启用事件实时通知 + + ); +} +``` + +--- + +### 2. 多个订阅(不同类型) + +```jsx +function MultiSubscriptionExample() { + // 订阅科技类事件 + useEventNotifications({ + eventType: 'tech', + onNewEvent: (event) => console.log('科技事件:', event) + }); + + // 订阅政策类事件 + useEventNotifications({ + eventType: 'policy', + onNewEvent: (event) => console.log('政策事件:', event) + }); + + return
...
; +} +``` + +--- + +### 3. 防抖处理(避免通知过多) + +```jsx +import { debounce } from 'lodash'; + +const debouncedNotify = debounce((event) => { + toast({ + title: '新事件', + description: event.title, + }); +}, 1000); + +useEventNotifications({ + eventType: 'all', + onNewEvent: debouncedNotify +}); +``` + +--- + +## 🧪 测试步骤 + +1. **启动 Flask 服务** + ```bash + python app.py + ``` + +2. **启动 React 应用** + ```bash + npm start + ``` + +3. **创建测试事件** + ```bash + python test_create_event.py + ``` + +4. **观察结果** + - 最多等待 30 秒 + - 前端页面应该显示通知 + - 控制台输出日志 + +--- + +## 🐛 常见问题 + +### Q: 没有收到推送? +**A:** 检查: +1. Flask 服务是否启动 +2. 浏览器控制台是否有连接错误 +3. 后端日志是否显示 `[轮询] 发现 X 个新事件` + +### Q: 连接一直失败? +**A:** 检查: +1. API_BASE_URL 配置是否正确 +2. CORS 配置是否包含前端域名 +3. 防火墙/代理设置 + +### Q: 收到重复通知? +**A:** 检查是否多次调用了 Hook,确保只在需要的地方订阅一次。 + +--- + +## 📚 更多资源 + +- Socket.IO 文档: https://socket.io/docs/v4/ +- Chakra UI Toast: https://chakra-ui.com/docs/components/toast +- React Hooks: https://react.dev/reference/react + +--- + +**完成!🎉** 现在你的前端可以实时接收事件推送了! diff --git a/app.py b/app.py index e798b4d7..4a188cab 100755 --- a/app.py +++ b/app.py @@ -8,6 +8,7 @@ import uuid from functools import wraps import qrcode from flask_mail import Mail, Message +from flask_socketio import SocketIO, emit, join_room, leave_room import pytz import requests from celery import Celery @@ -40,6 +41,7 @@ from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentClo from sqlalchemy import text, desc, and_ import pandas as pd from decimal import Decimal +from apscheduler.schedulers.background import BackgroundScheduler # 交易日数据缓存 trading_days = [] @@ -242,6 +244,18 @@ db = SQLAlchemy(app) # 初始化邮件服务 mail = Mail(app) +# 初始化 Flask-SocketIO(用于实时事件推送) +socketio = SocketIO( + app, + cors_allowed_origins=["http://localhost:3000", "http://127.0.0.1:3000", "http://localhost:5173", + "https://valuefrontier.cn", "http://valuefrontier.cn"], + async_mode='gevent', + logger=True, + engineio_logger=False, + ping_timeout=120, # 心跳超时时间(秒),客户端120秒内无响应才断开 + ping_interval=25 # 心跳检测间隔(秒),每25秒发送一次ping +) + @login_manager.user_loader def load_user(user_id): @@ -7443,6 +7457,243 @@ def add_event_comment(event_id): }), 500 +# ==================== WebSocket 事件处理器(实时事件推送) ==================== + +@socketio.on('connect') +def handle_connect(): + """客户端连接事件""" + print(f'[WebSocket] 客户端已连接: {request.sid}') + emit('connection_response', { + 'status': 'connected', + 'sid': request.sid, + 'message': '已连接到事件推送服务' + }) + + +@socketio.on('subscribe_events') +def handle_subscribe(data): + """ + 客户端订阅事件推送 + data: { + 'event_type': 'all' | 'policy' | 'market' | 'tech' | ..., + 'importance': 'all' | 'S' | 'A' | 'B' | 'C', + 'filters': {...} # 可选的其他筛选条件 + } + """ + try: + event_type = data.get('event_type', 'all') + importance = data.get('importance', 'all') + + # 加入对应的房间 + room_name = f"events_{event_type}" + join_room(room_name) + + print(f'[WebSocket] 客户端 {request.sid} 订阅了房间: {room_name}') + + emit('subscription_confirmed', { + 'success': True, + 'room': room_name, + 'event_type': event_type, + 'importance': importance, + 'message': f'已订阅 {event_type} 类型的事件推送' + }) + + except Exception as e: + print(f'[WebSocket] 订阅失败: {e}') + emit('subscription_error', { + 'success': False, + 'error': str(e) + }) + + +@socketio.on('unsubscribe_events') +def handle_unsubscribe(data): + """取消订阅事件推送""" + try: + event_type = data.get('event_type', 'all') + room_name = f"events_{event_type}" + leave_room(room_name) + + print(f'[WebSocket] 客户端 {request.sid} 取消订阅房间: {room_name}') + + emit('unsubscription_confirmed', { + 'success': True, + 'room': room_name, + 'message': f'已取消订阅 {event_type} 类型的事件推送' + }) + + except Exception as e: + print(f'[WebSocket] 取消订阅失败: {e}') + emit('unsubscription_error', { + 'success': False, + 'error': str(e) + }) + + +@socketio.on('disconnect') +def handle_disconnect(): + """客户端断开连接事件""" + print(f'[WebSocket] 客户端已断开: {request.sid}') + + +# ==================== WebSocket 辅助函数 ==================== + +def broadcast_new_event(event): + """ + 广播新事件到所有订阅的客户端 + 在创建新事件时调用此函数 + + Args: + event: Event 模型实例 + """ + try: + event_data = { + 'id': event.id, + 'title': event.title, + 'description': event.description, + 'event_type': event.event_type, + 'importance': event.importance, + 'status': event.status, + 'created_at': event.created_at.isoformat() if event.created_at else None, + 'hot_score': event.hot_score, + 'view_count': event.view_count, + 'related_avg_chg': event.related_avg_chg, + 'related_max_chg': event.related_max_chg, + 'keywords': event.keywords_list if hasattr(event, 'keywords_list') else event.keywords, + } + + # 发送到所有订阅者(all 房间) + socketio.emit('new_event', event_data, room='events_all', namespace='/') + + # 发送到特定类型订阅者 + if event.event_type: + room_name = f"events_{event.event_type}" + socketio.emit('new_event', event_data, room=room_name, namespace='/') + print(f'[WebSocket] 已推送新事件到房间: events_all, {room_name}') + else: + print(f'[WebSocket] 已推送新事件到房间: events_all') + + except Exception as e: + print(f'[WebSocket] 推送新事件失败: {e}') + + +# ==================== WebSocket 轮询机制(检测新事件) ==================== + +# 内存变量:记录上次检查的时间戳和已推送的事件 ID 集合 +last_checked_time = None +pushed_event_ids = set() # 已推送的事件 ID 集合,防止重复推送 +MAX_PUSHED_IDS_SIZE = 1000 # 已推送 ID 集合的最大容量 + +def poll_new_events(): + """ + 定期轮询数据库,检查是否有新事件 + 每 30 秒执行一次 + + 设计思路: + 1. 使用时间戳查询(created_at),而不是 ID + 2. 维护已推送事件 ID 集合,避免重复推送 + 3. 使用重叠时间窗口(向前多查60秒),捕获延迟写入的事件 + 4. 定期清理已推送集合,防止内存泄漏 + """ + global last_checked_time, pushed_event_ids + + try: + with app.app_context(): + from datetime import datetime, timedelta + + current_time = datetime.now() + + # 如果是第一次运行,只查询最近 30 秒的事件 + if last_checked_time is None: + query_start_time = current_time - timedelta(seconds=30) + else: + # 向前多查 60 秒(重叠窗口),防止漏掉延迟写入的事件 + query_start_time = last_checked_time - timedelta(seconds=60) + + # 查询时间范围内的新事件 + new_events = Event.query.filter( + Event.created_at >= query_start_time, + Event.created_at <= current_time, + Event.status == 'active' + ).order_by(Event.created_at.asc()).all() + + # 过滤掉已经推送过的事件 + unpushed_events = [ + event for event in new_events + if event.id not in pushed_event_ids + ] + + if unpushed_events: + print(f'[轮询] 发现 {len(unpushed_events)} 个新事件(查询到 {len(new_events)} 个,已过滤 {len(new_events) - len(unpushed_events)} 个重复)') + + for event in unpushed_events: + # 推送新事件 + broadcast_new_event(event) + # 记录已推送 + pushed_event_ids.add(event.id) + print(f'[轮询] 推送事件 ID={event.id}, 标题={event.title}') + + # 更新检查时间 + last_checked_time = current_time + + # 清理已推送集合(防止无限增长) + if len(pushed_event_ids) > MAX_PUSHED_IDS_SIZE: + # 只保留最新的一半 + sorted_ids = sorted(pushed_event_ids) + pushed_event_ids = set(sorted_ids[-MAX_PUSHED_IDS_SIZE//2:]) + print(f'[轮询] 已清理推送记录,当前保留 {len(pushed_event_ids)} 个') + + else: + # 没有新事件,也要更新检查时间 + last_checked_time = current_time + + except Exception as e: + print(f'[轮询] 检查新事件时出错: {e}') + + +def initialize_event_polling(): + """ + 初始化事件轮询机制 + 在应用启动时调用 + """ + global last_checked_time, pushed_event_ids + + try: + from datetime import datetime + + with app.app_context(): + # 设置初始检查时间为当前时间 + # 这样启动后只会推送新创建的事件,不会推送历史事件 + last_checked_time = datetime.now() + pushed_event_ids.clear() + + # 统计数据库中的事件总数 + total_events = Event.query.filter_by(status='active').count() + print(f'[轮询] 初始化完成,数据库中共有 {total_events} 个活跃事件') + print(f'[轮询] 起始时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S")}') + print(f'[轮询] 只会推送此时间之后创建的新事件') + + # 创建后台调度器 + scheduler = BackgroundScheduler() + # 每 30 秒执行一次轮询 + scheduler.add_job( + func=poll_new_events, + trigger='interval', + seconds=30, + id='poll_new_events', + name='检查新事件并推送', + replace_existing=True + ) + scheduler.start() + print('[轮询] 调度器已启动,每 30 秒检查一次新事件') + + except Exception as e: + print(f'[轮询] 初始化失败: {e}') + + +# ==================== 结束 WebSocket 部分 ==================== + + @app.route('/api/posts//like', methods=['POST']) @login_required def like_post(post_id): @@ -11581,4 +11832,8 @@ if __name__ == '__main__': except Exception as e: app.logger.error(f"数据库初始化失败: {e}") - app.run(host='0.0.0.0', port=5001, debug=False) \ No newline at end of file + # 初始化事件轮询机制(WebSocket 推送) + initialize_event_polling() + + # 使用 socketio.run 替代 app.run 以支持 WebSocket + socketio.run(app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True) \ No newline at end of file diff --git a/src/hooks/useEventNotifications.js b/src/hooks/useEventNotifications.js new file mode 100644 index 00000000..9b99a039 --- /dev/null +++ b/src/hooks/useEventNotifications.js @@ -0,0 +1,161 @@ +// src/hooks/useEventNotifications.js +/** + * React Hook:用于在组件中订阅事件推送通知 + * + * 使用示例: + * ```jsx + * import { useEventNotifications } from 'hooks/useEventNotifications'; + * + * function MyComponent() { + * const { newEvent, isConnected } = useEventNotifications({ + * eventType: 'all', + * importance: 'all', + * onNewEvent: (event) => { + * console.log('收到新事件:', event); + * // 显示通知... + * } + * }); + * + * return
...
; + * } + * ``` + */ + +import { useEffect, useState, useRef } from 'react'; +import { socketService } from '../services/socketService'; + +export const useEventNotifications = (options = {}) => { + const { + eventType = 'all', + importance = 'all', + enabled = true, + onNewEvent, + } = options; + + const [isConnected, setIsConnected] = useState(false); + const [newEvent, setNewEvent] = useState(null); + const [error, setError] = useState(null); + const unsubscribeRef = useRef(null); + + useEffect(() => { + // 如果禁用,则不订阅 + if (!enabled) { + return; + } + + // 连接状态监听 + const handleConnect = () => { + setIsConnected(true); + setError(null); + }; + + const handleDisconnect = () => { + setIsConnected(false); + }; + + const handleConnectError = (err) => { + setError(err); + setIsConnected(false); + }; + + // 连接 WebSocket + socketService.connect(); + + // 监听连接事件 + socketService.on('connect', handleConnect); + socketService.on('disconnect', handleDisconnect); + socketService.on('connect_error', handleConnectError); + + // 新事件处理函数 + const handleNewEvent = (eventData) => { + setNewEvent(eventData); + + // 调用外部回调 + if (onNewEvent) { + onNewEvent(eventData); + } + }; + + // 订阅事件推送 + socketService.subscribeToEvents({ + eventType, + importance, + onNewEvent: handleNewEvent, + onSubscribed: (data) => { + console.log('订阅成功:', data); + }, + }); + + // 保存取消订阅函数 + unsubscribeRef.current = () => { + socketService.unsubscribeFromEvents({ eventType }); + }; + + // 组件卸载时清理 + return () => { + console.log('清理 WebSocket 订阅'); + + // 取消订阅 + if (unsubscribeRef.current) { + unsubscribeRef.current(); + } + + // 移除监听器 + socketService.off('connect', handleConnect); + socketService.off('disconnect', handleDisconnect); + socketService.off('connect_error', handleConnectError); + + // 断开连接 + socketService.disconnect(); + }; + }, [eventType, importance, enabled, onNewEvent]); + + return { + newEvent, // 最新收到的事件 + isConnected, // WebSocket 连接状态 + error, // 错误信息 + clearNewEvent: () => setNewEvent(null), // 清除新事件状态 + }; +}; + +/** + * 简化版 Hook:只订阅所有事件 + */ +export const useAllEventNotifications = (onNewEvent) => { + return useEventNotifications({ + eventType: 'all', + importance: 'all', + onNewEvent, + }); +}; + +/** + * Hook:订阅重要事件(S 和 A 级) + */ +export const useImportantEventNotifications = (onNewEvent) => { + const [importantEvents, setImportantEvents] = useState([]); + + const handleEvent = (event) => { + // 只处理 S 和 A 级事件 + if (event.importance === 'S' || event.importance === 'A') { + setImportantEvents(prev => [event, ...prev].slice(0, 10)); // 最多保留 10 个 + if (onNewEvent) { + onNewEvent(event); + } + } + }; + + const result = useEventNotifications({ + eventType: 'all', + importance: 'all', + onNewEvent: handleEvent, + }); + + return { + ...result, + importantEvents, + clearImportantEvents: () => setImportantEvents([]), + }; +}; + +export default useEventNotifications; diff --git a/src/services/socketService.js b/src/services/socketService.js index 32e129f0..f3be42b0 100644 --- a/src/services/socketService.js +++ b/src/services/socketService.js @@ -186,6 +186,169 @@ class SocketService { getSocketId() { return this.socket?.id || null; } + + // ==================== 事件推送专用方法 ==================== + + /** + * 订阅事件推送 + * @param {object} options - 订阅选项 + * @param {string} options.eventType - 事件类型 ('all' | 'policy' | 'market' | 'tech' | ...) + * @param {string} options.importance - 重要性 ('all' | 'S' | 'A' | 'B' | 'C') + * @param {Function} options.onNewEvent - 收到新事件时的回调函数 + * @param {Function} options.onSubscribed - 订阅成功的回调函数(可选) + */ + subscribeToEvents(options = {}) { + const { + eventType = 'all', + importance = 'all', + onNewEvent, + onSubscribed, + } = options; + + if (!this.socket || !this.connected) { + logger.warn('socketService', 'Cannot subscribe: socket not connected'); + // 自动连接 + this.connect(); + // 等待连接成功后再订阅 + this.socket.once('connect', () => { + this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); + }); + return; + } + + this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); + } + + /** + * 执行订阅操作(内部方法) + */ + _doSubscribe(eventType, importance, onNewEvent, onSubscribed) { + // 发送订阅请求 + this.emit('subscribe_events', { + event_type: eventType, + importance: importance, + }); + + // 监听订阅确认 + this.socket.once('subscription_confirmed', (data) => { + logger.info('socketService', 'Subscription confirmed', data); + if (onSubscribed) { + onSubscribed(data); + } + }); + + // 监听订阅错误 + this.socket.once('subscription_error', (error) => { + logger.error('socketService', 'Subscription error', error); + }); + + // 监听新事件推送 + if (onNewEvent) { + // 先移除之前的监听器(避免重复) + this.socket.off('new_event'); + // 添加新的监听器 + this.socket.on('new_event', (eventData) => { + logger.info('socketService', 'New event received', eventData); + onNewEvent(eventData); + }); + } + } + + /** + * 取消订阅事件推送 + * @param {object} options - 取消订阅选项 + * @param {string} options.eventType - 事件类型 + * @param {Function} options.onUnsubscribed - 取消订阅成功的回调函数(可选) + */ + unsubscribeFromEvents(options = {}) { + const { + eventType = 'all', + onUnsubscribed, + } = options; + + if (!this.socket || !this.connected) { + logger.warn('socketService', 'Cannot unsubscribe: socket not connected'); + return; + } + + // 发送取消订阅请求 + this.emit('unsubscribe_events', { + event_type: eventType, + }); + + // 监听取消订阅确认 + this.socket.once('unsubscription_confirmed', (data) => { + logger.info('socketService', 'Unsubscription confirmed', data); + + // 移除新事件监听器 + this.socket.off('new_event'); + + if (onUnsubscribed) { + onUnsubscribed(data); + } + }); + + // 监听取消订阅错误 + this.socket.once('unsubscription_error', (error) => { + logger.error('socketService', 'Unsubscription error', error); + }); + } + + /** + * 快捷方法:订阅所有类型的事件 + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToAllEvents(onNewEvent) { + this.subscribeToEvents({ + eventType: 'all', + importance: 'all', + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType: 'all' }); + }; + } + + /** + * 快捷方法:订阅特定重要性的事件 + * @param {string} importance - 重要性级别 ('S' | 'A' | 'B' | 'C') + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToImportantEvents(importance, onNewEvent) { + this.subscribeToEvents({ + eventType: 'all', + importance, + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType: 'all' }); + }; + } + + /** + * 快捷方法:订阅特定类型的事件 + * @param {string} eventType - 事件类型 + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToEventType(eventType, onNewEvent) { + this.subscribeToEvents({ + eventType, + importance: 'all', + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType }); + }; + } } // 导出单例 diff --git a/test_create_event.py b/test_create_event.py new file mode 100644 index 00000000..5a475cc9 --- /dev/null +++ b/test_create_event.py @@ -0,0 +1,109 @@ +""" +测试脚本:手动创建事件到数据库 +用于测试 WebSocket 实时推送功能 +""" + +import sys +from datetime import datetime +from sqlalchemy import create_engine, Column, Integer, String, Text, Float, DateTime +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +# 数据库连接(从 app.py 复制) +DATABASE_URI = 'mysql+pymysql://root:Zzl5588161!@111.198.58.126:33060/stock?charset=utf8mb4' + +engine = create_engine(DATABASE_URI, echo=False) +Session = sessionmaker(bind=engine) +session = Session() + +Base = declarative_base() + +# Event 模型(简化版,只包含必要字段) +class Event(Base): + __tablename__ = 'events' + + id = Column(Integer, primary_key=True) + title = Column(String(500), nullable=False) + description = Column(Text) + event_type = Column(String(100)) + importance = Column(String(10)) + status = Column(String(50), default='active') + hot_score = Column(Float, default=0) + view_count = Column(Integer, default=0) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +def create_test_event(): + """创建一个测试事件""" + + import random + + event_types = ['policy', 'market', 'tech', 'industry', 'finance'] + importances = ['S', 'A', 'B', 'C'] + + test_event = Event( + title=f'测试事件 - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', + description=f'这是一个用于测试 WebSocket 实时推送的事件,创建于 {datetime.now()}', + event_type=random.choice(event_types), + importance=random.choice(importances), + status='active', + hot_score=round(random.uniform(50, 100), 2), + view_count=random.randint(100, 1000) + ) + + try: + session.add(test_event) + session.commit() + + print("✅ 测试事件创建成功!") + print(f" ID: {test_event.id}") + print(f" 标题: {test_event.title}") + print(f" 类型: {test_event.event_type}") + print(f" 重要性: {test_event.importance}") + print(f" 热度: {test_event.hot_score}") + print(f"\n💡 提示: 轮询将在 2 分钟内检测到此事件并推送到前端") + print(f" (如果需要立即推送,请将轮询间隔改为更短)") + + return test_event.id + + except Exception as e: + session.rollback() + print(f"❌ 创建事件失败: {e}") + return None + finally: + session.close() + + +def create_multiple_events(count=3): + """创建多个测试事件""" + print(f"正在创建 {count} 个测试事件...\n") + + for i in range(count): + event_id = create_test_event() + if event_id: + print(f"[{i+1}/{count}] 事件 #{event_id} 创建成功\n") + else: + print(f"[{i+1}/{count}] 创建失败\n") + + print(f"\n✅ 完成!共创建 {count} 个事件") + + +if __name__ == '__main__': + print("=" * 60) + print("WebSocket 事件推送测试 - 手动创建事件") + print("=" * 60) + print() + + if len(sys.argv) > 1: + try: + count = int(sys.argv[1]) + create_multiple_events(count) + except ValueError: + print("❌ 参数必须是数字") + print("用法: python test_create_event.py [数量]") + else: + # 默认创建 1 个事件 + create_test_event() + + print("\n" + "=" * 60) diff --git a/test_websocket.html b/test_websocket.html new file mode 100644 index 00000000..a11c5b1c --- /dev/null +++ b/test_websocket.html @@ -0,0 +1,289 @@ + + + + + + WebSocket 事件推送测试 + + + + +
+

🔌 WebSocket 事件推送测试页面

+ +
+ 状态: 未连接 +
+ +
+ + + + + +
+ +
+ +

📋 日志

+
+
+ + + +