diff --git a/WEBSOCKET_INTEGRATION_GUIDE.md b/WEBSOCKET_INTEGRATION_GUIDE.md new file mode 100644 index 00000000..7859cf9e --- /dev/null +++ b/WEBSOCKET_INTEGRATION_GUIDE.md @@ -0,0 +1,546 @@ +# WebSocket 事件实时推送 - 前端集成指南 + +## 📦 已创建的文件 + +1. **`src/services/socketService.js`** - WebSocket 服务(已扩展) +2. **`src/hooks/useEventNotifications.js`** - React Hook +3. **`test_websocket.html`** - 测试页面 +4. **`test_create_event.py`** - 测试脚本 + +--- + +## 🚀 快速开始 + +### 方案 1:使用 React Hook(推荐) + +在任何 React 组件中使用: + +```jsx +import { useEventNotifications } from 'hooks/useEventNotifications'; +import { useToast } from '@chakra-ui/react'; + +function EventsPage() { + const toast = useToast(); + + // 订阅事件推送 + const { newEvent, isConnected } = useEventNotifications({ + eventType: 'all', // 'all' | 'policy' | 'market' | 'tech' | ... + importance: 'all', // 'all' | 'S' | 'A' | 'B' | 'C' + enabled: true, // 是否启用订阅 + onNewEvent: (event) => { + // 收到新事件时的处理 + console.log('🔔 收到新事件:', event); + + // 显示 Toast 通知 + toast({ + title: '新事件提醒', + description: event.title, + status: 'info', + duration: 5000, + isClosable: true, + position: 'top-right', + }); + } + }); + + return ( + + 连接状态: {isConnected ? '已连接 ✅' : '未连接 ❌'} + {/* 你的事件列表 */} + + ); +} +``` + +--- + +### 方案 2:在事件列表页面集成(完整示例) + +**在 `src/views/Community/components/EventList.js` 中集成:** + +```jsx +import React, { useState, useEffect } from 'react'; +import { Box, Text, Badge, useToast } from '@chakra-ui/react'; +import { useEventNotifications } from 'hooks/useEventNotifications'; + +function EventList() { + const [events, setEvents] = useState([]); + const [loading, setLoading] = useState(true); + const toast = useToast(); + + // 1️⃣ 初始加载事件列表(REST API) + useEffect(() => { + fetchEvents(); + }, []); + + const fetchEvents = async () => { + try { + const response = await fetch('/api/events?per_page=20'); + const data = await response.json(); + + if (data.success) { + setEvents(data.data.events); + } + } catch (error) { + console.error('加载事件失败:', error); + } finally { + setLoading(false); + } + }; + + // 2️⃣ 订阅 WebSocket 实时推送 + const { newEvent, isConnected } = useEventNotifications({ + eventType: 'all', + importance: 'all', + enabled: true, // 可以根据用户设置控制是否启用 + onNewEvent: (event) => { + console.log('🔔 收到新事件:', event); + + // 显示通知 + toast({ + title: '📰 新事件发布', + description: `${event.title}`, + status: 'info', + duration: 6000, + isClosable: true, + position: 'top-right', + }); + + // 将新事件添加到列表顶部 + setEvents((prevEvents) => { + // 检查是否已存在(防止重复) + const exists = prevEvents.some(e => e.id === event.id); + if (exists) { + return prevEvents; + } + // 添加到顶部,最多保留 100 个 + return [event, ...prevEvents].slice(0, 100); + }); + } + }); + + return ( + + {/* 连接状态指示器 */} + + + {isConnected ? '实时推送已开启' : '实时推送未连接'} + + + + {/* 事件列表 */} + {loading ? ( + 加载中... + ) : ( + + {events.map((event) => ( + + ))} + + )} + + ); +} + +export default EventList; +``` + +--- + +### 方案 3:只订阅重要事件(S 和 A 级) + +```jsx +import { useImportantEventNotifications } from 'hooks/useEventNotifications'; + +function Dashboard() { + const { importantEvents, isConnected } = useImportantEventNotifications((event) => { + // 只会收到 S 和 A 级别的重要事件 + console.log('⚠️ 重要事件:', event); + + // 播放提示音 + new Audio('/notification.mp3').play(); + }); + + return ( + + 重要事件通知 + {importantEvents.map(event => ( + + + {event.title} + + ))} + + ); +} +``` + +--- + +### 方案 4:直接使用 Service(不用 Hook) + +```jsx +import { useEffect } from 'react'; +import socketService from 'services/socketService'; + +function MyComponent() { + useEffect(() => { + // 连接 + socketService.connect(); + + // 订阅 + const unsubscribe = socketService.subscribeToAllEvents((event) => { + console.log('新事件:', event); + }); + + // 清理 + return () => { + unsubscribe(); + socketService.disconnect(); + }; + }, []); + + return
...
; +} +``` + +--- + +## 🎨 UI 集成示例 + +### 1. Toast 通知(Chakra UI) + +```jsx +import { useToast } from '@chakra-ui/react'; + +const toast = useToast(); + +// 在 onNewEvent 回调中 +onNewEvent: (event) => { + toast({ + title: '新事件', + description: event.title, + status: 'info', + duration: 5000, + isClosable: true, + position: 'top-right', + }); +} +``` + +--- + +### 2. 顶部通知栏 + +```jsx +import { Alert, AlertIcon, CloseButton } from '@chakra-ui/react'; + +function EventNotificationBanner() { + const [showNotification, setShowNotification] = useState(false); + const [latestEvent, setLatestEvent] = useState(null); + + useEventNotifications({ + eventType: 'all', + onNewEvent: (event) => { + setLatestEvent(event); + setShowNotification(true); + } + }); + + if (!showNotification || !latestEvent) return null; + + return ( + + + 新事件:{latestEvent.title} + setShowNotification(false)} + /> + + ); +} +``` + +--- + +### 3. 角标提示(红点) + +```jsx +import { Badge } from '@chakra-ui/react'; + +function EventsMenuItem() { + const [unreadCount, setUnreadCount] = useState(0); + + useEventNotifications({ + eventType: 'all', + onNewEvent: () => { + setUnreadCount(prev => prev + 1); + } + }); + + return ( + + 事件中心 + {unreadCount > 0 && ( + + {unreadCount > 99 ? '99+' : unreadCount} + + )} + + ); +} +``` + +--- + +### 4. 浮动通知卡片 + +```jsx +import { Box, Slide, useDisclosure } from '@chakra-ui/react'; + +function FloatingEventNotification() { + const { isOpen, onClose, onOpen } = useDisclosure(); + const [event, setEvent] = useState(null); + + useEventNotifications({ + eventType: 'all', + onNewEvent: (newEvent) => { + setEvent(newEvent); + onOpen(); + + // 5秒后自动关闭 + setTimeout(onClose, 5000); + } + }); + + return ( + + + {event?.title} + {event?.description} + + + + ); +} +``` + +--- + +## 📋 API 参考 + +### `useEventNotifications(options)` + +**参数:** +| 参数 | 类型 | 默认值 | 说明 | +|------|------|--------|------| +| `eventType` | string | `'all'` | 事件类型:`'all'` / `'policy'` / `'market'` / `'tech'` 等 | +| `importance` | string | `'all'` | 重要性:`'all'` / `'S'` / `'A'` / `'B'` / `'C'` | +| `enabled` | boolean | `true` | 是否启用订阅 | +| `onNewEvent` | function | - | 收到新事件时的回调函数 | + +**返回值:** +| 属性 | 类型 | 说明 | +|------|------|------| +| `newEvent` | object | 最新收到的事件对象 | +| `isConnected` | boolean | WebSocket 连接状态 | +| `error` | object | 错误信息 | +| `clearNewEvent` | function | 清除新事件状态 | + +--- + +### `socketService` API + +```javascript +// 连接 +socketService.connect(options) + +// 断开 +socketService.disconnect() + +// 订阅所有事件 +socketService.subscribeToAllEvents(callback) + +// 订阅特定类型 +socketService.subscribeToEventType('tech', callback) + +// 订阅特定重要性 +socketService.subscribeToImportantEvents('S', callback) + +// 取消订阅 +socketService.unsubscribeFromEvents({ eventType: 'all' }) + +// 检查连接状态 +socketService.isConnected() +``` + +--- + +## 🔧 事件数据结构 + +收到的 `event` 对象包含: + +```javascript +{ + id: 123, + title: "事件标题", + description: "事件描述", + event_type: "tech", // 类型 + importance: "S", // 重要性 + status: "active", + created_at: "2025-01-21T14:30:00", + hot_score: 85.5, + view_count: 1234, + related_avg_chg: 5.2, // 平均涨幅 + related_max_chg: 15.8, // 最大涨幅 + keywords: ["AI", "芯片"], // 关键词 +} +``` + +--- + +## ⚙️ 高级配置 + +### 1. 条件订阅(用户设置) + +```jsx +function EventsPage() { + const [enableNotifications, setEnableNotifications] = useState( + localStorage.getItem('enableEventNotifications') === 'true' + ); + + useEventNotifications({ + eventType: 'all', + enabled: enableNotifications, // 根据用户设置控制 + onNewEvent: handleNewEvent + }); + + return ( + { + const enabled = e.target.checked; + setEnableNotifications(enabled); + localStorage.setItem('enableEventNotifications', enabled); + }} + > + 启用事件实时通知 + + ); +} +``` + +--- + +### 2. 多个订阅(不同类型) + +```jsx +function MultiSubscriptionExample() { + // 订阅科技类事件 + useEventNotifications({ + eventType: 'tech', + onNewEvent: (event) => console.log('科技事件:', event) + }); + + // 订阅政策类事件 + useEventNotifications({ + eventType: 'policy', + onNewEvent: (event) => console.log('政策事件:', event) + }); + + return
...
; +} +``` + +--- + +### 3. 防抖处理(避免通知过多) + +```jsx +import { debounce } from 'lodash'; + +const debouncedNotify = debounce((event) => { + toast({ + title: '新事件', + description: event.title, + }); +}, 1000); + +useEventNotifications({ + eventType: 'all', + onNewEvent: debouncedNotify +}); +``` + +--- + +## 🧪 测试步骤 + +1. **启动 Flask 服务** + ```bash + python app.py + ``` + +2. **启动 React 应用** + ```bash + npm start + ``` + +3. **创建测试事件** + ```bash + python test_create_event.py + ``` + +4. **观察结果** + - 最多等待 30 秒 + - 前端页面应该显示通知 + - 控制台输出日志 + +--- + +## 🐛 常见问题 + +### Q: 没有收到推送? +**A:** 检查: +1. Flask 服务是否启动 +2. 浏览器控制台是否有连接错误 +3. 后端日志是否显示 `[轮询] 发现 X 个新事件` + +### Q: 连接一直失败? +**A:** 检查: +1. API_BASE_URL 配置是否正确 +2. CORS 配置是否包含前端域名 +3. 防火墙/代理设置 + +### Q: 收到重复通知? +**A:** 检查是否多次调用了 Hook,确保只在需要的地方订阅一次。 + +--- + +## 📚 更多资源 + +- Socket.IO 文档: https://socket.io/docs/v4/ +- Chakra UI Toast: https://chakra-ui.com/docs/components/toast +- React Hooks: https://react.dev/reference/react + +--- + +**完成!🎉** 现在你的前端可以实时接收事件推送了! diff --git a/app.py b/app.py index b224d72b..4a188cab 100755 --- a/app.py +++ b/app.py @@ -7579,34 +7579,73 @@ def broadcast_new_event(event): # ==================== WebSocket 轮询机制(检测新事件) ==================== -# 内存变量:记录上次检查的最大事件 ID -last_checked_event_id = 0 +# 内存变量:记录上次检查的时间戳和已推送的事件 ID 集合 +last_checked_time = None +pushed_event_ids = set() # 已推送的事件 ID 集合,防止重复推送 +MAX_PUSHED_IDS_SIZE = 1000 # 已推送 ID 集合的最大容量 def poll_new_events(): """ 定期轮询数据库,检查是否有新事件 每 30 秒执行一次 + + 设计思路: + 1. 使用时间戳查询(created_at),而不是 ID + 2. 维护已推送事件 ID 集合,避免重复推送 + 3. 使用重叠时间窗口(向前多查60秒),捕获延迟写入的事件 + 4. 定期清理已推送集合,防止内存泄漏 """ - global last_checked_event_id + global last_checked_time, pushed_event_ids try: with app.app_context(): - # 查询比上次检查 ID 更大的所有新事件 + from datetime import datetime, timedelta + + current_time = datetime.now() + + # 如果是第一次运行,只查询最近 30 秒的事件 + if last_checked_time is None: + query_start_time = current_time - timedelta(seconds=30) + else: + # 向前多查 60 秒(重叠窗口),防止漏掉延迟写入的事件 + query_start_time = last_checked_time - timedelta(seconds=60) + + # 查询时间范围内的新事件 new_events = Event.query.filter( - Event.id > last_checked_event_id, + Event.created_at >= query_start_time, + Event.created_at <= current_time, Event.status == 'active' - ).order_by(Event.id.asc()).all() + ).order_by(Event.created_at.asc()).all() - if new_events: - print(f'[轮询] 发现 {len(new_events)} 个新事件') + # 过滤掉已经推送过的事件 + unpushed_events = [ + event for event in new_events + if event.id not in pushed_event_ids + ] - for event in new_events: - # 推送每个新事件 + if unpushed_events: + print(f'[轮询] 发现 {len(unpushed_events)} 个新事件(查询到 {len(new_events)} 个,已过滤 {len(new_events) - len(unpushed_events)} 个重复)') + + for event in unpushed_events: + # 推送新事件 broadcast_new_event(event) - # 更新最后检查的 ID - last_checked_event_id = event.id + # 记录已推送 + pushed_event_ids.add(event.id) + print(f'[轮询] 推送事件 ID={event.id}, 标题={event.title}') - print(f'[轮询] 已推送新事件,最新 ID: {last_checked_event_id}') + # 更新检查时间 + last_checked_time = current_time + + # 清理已推送集合(防止无限增长) + if len(pushed_event_ids) > MAX_PUSHED_IDS_SIZE: + # 只保留最新的一半 + sorted_ids = sorted(pushed_event_ids) + pushed_event_ids = set(sorted_ids[-MAX_PUSHED_IDS_SIZE//2:]) + print(f'[轮询] 已清理推送记录,当前保留 {len(pushed_event_ids)} 个') + + else: + # 没有新事件,也要更新检查时间 + last_checked_time = current_time except Exception as e: print(f'[轮询] 检查新事件时出错: {e}') @@ -7617,17 +7656,22 @@ def initialize_event_polling(): 初始化事件轮询机制 在应用启动时调用 """ - global last_checked_event_id + global last_checked_time, pushed_event_ids try: + from datetime import datetime + with app.app_context(): - # 获取当前数据库中最新的事件 ID,作为起点 - latest_event = Event.query.order_by(Event.id.desc()).first() - if latest_event: - last_checked_event_id = latest_event.id - print(f'[轮询] 初始化完成,起始事件 ID: {last_checked_event_id}') - else: - print('[轮询] 数据库中暂无事件') + # 设置初始检查时间为当前时间 + # 这样启动后只会推送新创建的事件,不会推送历史事件 + last_checked_time = datetime.now() + pushed_event_ids.clear() + + # 统计数据库中的事件总数 + total_events = Event.query.filter_by(status='active').count() + print(f'[轮询] 初始化完成,数据库中共有 {total_events} 个活跃事件') + print(f'[轮询] 起始时间: {last_checked_time.strftime("%Y-%m-%d %H:%M:%S")}') + print(f'[轮询] 只会推送此时间之后创建的新事件') # 创建后台调度器 scheduler = BackgroundScheduler() diff --git a/src/hooks/useEventNotifications.js b/src/hooks/useEventNotifications.js new file mode 100644 index 00000000..9b99a039 --- /dev/null +++ b/src/hooks/useEventNotifications.js @@ -0,0 +1,161 @@ +// src/hooks/useEventNotifications.js +/** + * React Hook:用于在组件中订阅事件推送通知 + * + * 使用示例: + * ```jsx + * import { useEventNotifications } from 'hooks/useEventNotifications'; + * + * function MyComponent() { + * const { newEvent, isConnected } = useEventNotifications({ + * eventType: 'all', + * importance: 'all', + * onNewEvent: (event) => { + * console.log('收到新事件:', event); + * // 显示通知... + * } + * }); + * + * return
...
; + * } + * ``` + */ + +import { useEffect, useState, useRef } from 'react'; +import { socketService } from '../services/socketService'; + +export const useEventNotifications = (options = {}) => { + const { + eventType = 'all', + importance = 'all', + enabled = true, + onNewEvent, + } = options; + + const [isConnected, setIsConnected] = useState(false); + const [newEvent, setNewEvent] = useState(null); + const [error, setError] = useState(null); + const unsubscribeRef = useRef(null); + + useEffect(() => { + // 如果禁用,则不订阅 + if (!enabled) { + return; + } + + // 连接状态监听 + const handleConnect = () => { + setIsConnected(true); + setError(null); + }; + + const handleDisconnect = () => { + setIsConnected(false); + }; + + const handleConnectError = (err) => { + setError(err); + setIsConnected(false); + }; + + // 连接 WebSocket + socketService.connect(); + + // 监听连接事件 + socketService.on('connect', handleConnect); + socketService.on('disconnect', handleDisconnect); + socketService.on('connect_error', handleConnectError); + + // 新事件处理函数 + const handleNewEvent = (eventData) => { + setNewEvent(eventData); + + // 调用外部回调 + if (onNewEvent) { + onNewEvent(eventData); + } + }; + + // 订阅事件推送 + socketService.subscribeToEvents({ + eventType, + importance, + onNewEvent: handleNewEvent, + onSubscribed: (data) => { + console.log('订阅成功:', data); + }, + }); + + // 保存取消订阅函数 + unsubscribeRef.current = () => { + socketService.unsubscribeFromEvents({ eventType }); + }; + + // 组件卸载时清理 + return () => { + console.log('清理 WebSocket 订阅'); + + // 取消订阅 + if (unsubscribeRef.current) { + unsubscribeRef.current(); + } + + // 移除监听器 + socketService.off('connect', handleConnect); + socketService.off('disconnect', handleDisconnect); + socketService.off('connect_error', handleConnectError); + + // 断开连接 + socketService.disconnect(); + }; + }, [eventType, importance, enabled, onNewEvent]); + + return { + newEvent, // 最新收到的事件 + isConnected, // WebSocket 连接状态 + error, // 错误信息 + clearNewEvent: () => setNewEvent(null), // 清除新事件状态 + }; +}; + +/** + * 简化版 Hook:只订阅所有事件 + */ +export const useAllEventNotifications = (onNewEvent) => { + return useEventNotifications({ + eventType: 'all', + importance: 'all', + onNewEvent, + }); +}; + +/** + * Hook:订阅重要事件(S 和 A 级) + */ +export const useImportantEventNotifications = (onNewEvent) => { + const [importantEvents, setImportantEvents] = useState([]); + + const handleEvent = (event) => { + // 只处理 S 和 A 级事件 + if (event.importance === 'S' || event.importance === 'A') { + setImportantEvents(prev => [event, ...prev].slice(0, 10)); // 最多保留 10 个 + if (onNewEvent) { + onNewEvent(event); + } + } + }; + + const result = useEventNotifications({ + eventType: 'all', + importance: 'all', + onNewEvent: handleEvent, + }); + + return { + ...result, + importantEvents, + clearImportantEvents: () => setImportantEvents([]), + }; +}; + +export default useEventNotifications; diff --git a/src/services/socketService.js b/src/services/socketService.js index 32e129f0..f3be42b0 100644 --- a/src/services/socketService.js +++ b/src/services/socketService.js @@ -186,6 +186,169 @@ class SocketService { getSocketId() { return this.socket?.id || null; } + + // ==================== 事件推送专用方法 ==================== + + /** + * 订阅事件推送 + * @param {object} options - 订阅选项 + * @param {string} options.eventType - 事件类型 ('all' | 'policy' | 'market' | 'tech' | ...) + * @param {string} options.importance - 重要性 ('all' | 'S' | 'A' | 'B' | 'C') + * @param {Function} options.onNewEvent - 收到新事件时的回调函数 + * @param {Function} options.onSubscribed - 订阅成功的回调函数(可选) + */ + subscribeToEvents(options = {}) { + const { + eventType = 'all', + importance = 'all', + onNewEvent, + onSubscribed, + } = options; + + if (!this.socket || !this.connected) { + logger.warn('socketService', 'Cannot subscribe: socket not connected'); + // 自动连接 + this.connect(); + // 等待连接成功后再订阅 + this.socket.once('connect', () => { + this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); + }); + return; + } + + this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); + } + + /** + * 执行订阅操作(内部方法) + */ + _doSubscribe(eventType, importance, onNewEvent, onSubscribed) { + // 发送订阅请求 + this.emit('subscribe_events', { + event_type: eventType, + importance: importance, + }); + + // 监听订阅确认 + this.socket.once('subscription_confirmed', (data) => { + logger.info('socketService', 'Subscription confirmed', data); + if (onSubscribed) { + onSubscribed(data); + } + }); + + // 监听订阅错误 + this.socket.once('subscription_error', (error) => { + logger.error('socketService', 'Subscription error', error); + }); + + // 监听新事件推送 + if (onNewEvent) { + // 先移除之前的监听器(避免重复) + this.socket.off('new_event'); + // 添加新的监听器 + this.socket.on('new_event', (eventData) => { + logger.info('socketService', 'New event received', eventData); + onNewEvent(eventData); + }); + } + } + + /** + * 取消订阅事件推送 + * @param {object} options - 取消订阅选项 + * @param {string} options.eventType - 事件类型 + * @param {Function} options.onUnsubscribed - 取消订阅成功的回调函数(可选) + */ + unsubscribeFromEvents(options = {}) { + const { + eventType = 'all', + onUnsubscribed, + } = options; + + if (!this.socket || !this.connected) { + logger.warn('socketService', 'Cannot unsubscribe: socket not connected'); + return; + } + + // 发送取消订阅请求 + this.emit('unsubscribe_events', { + event_type: eventType, + }); + + // 监听取消订阅确认 + this.socket.once('unsubscription_confirmed', (data) => { + logger.info('socketService', 'Unsubscription confirmed', data); + + // 移除新事件监听器 + this.socket.off('new_event'); + + if (onUnsubscribed) { + onUnsubscribed(data); + } + }); + + // 监听取消订阅错误 + this.socket.once('unsubscription_error', (error) => { + logger.error('socketService', 'Unsubscription error', error); + }); + } + + /** + * 快捷方法:订阅所有类型的事件 + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToAllEvents(onNewEvent) { + this.subscribeToEvents({ + eventType: 'all', + importance: 'all', + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType: 'all' }); + }; + } + + /** + * 快捷方法:订阅特定重要性的事件 + * @param {string} importance - 重要性级别 ('S' | 'A' | 'B' | 'C') + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToImportantEvents(importance, onNewEvent) { + this.subscribeToEvents({ + eventType: 'all', + importance, + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType: 'all' }); + }; + } + + /** + * 快捷方法:订阅特定类型的事件 + * @param {string} eventType - 事件类型 + * @param {Function} onNewEvent - 收到新事件时的回调函数 + * @returns {Function} 取消订阅的函数 + */ + subscribeToEventType(eventType, onNewEvent) { + this.subscribeToEvents({ + eventType, + importance: 'all', + onNewEvent, + }); + + // 返回取消订阅的清理函数 + return () => { + this.unsubscribeFromEvents({ eventType }); + }; + } } // 导出单例