// src/services/socketService.js /** * 真实 Socket.IO 服务 - 用于生产环境连接真实后端 */ import { io } from 'socket.io-client'; import { logger } from '../utils/logger'; import { getApiBase } from '../utils/apiConfig'; const API_BASE_URL = getApiBase(); class SocketService { constructor() { this.socket = null; this.connected = false; this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; } /** * 连接到 Socket.IO 服务器 * @param {object} options - 连接选项 */ connect(options = {}) { if (this.socket && this.connected) { logger.warn('socketService', 'Already connected'); return; } logger.info('socketService', 'Connecting to Socket.IO server...', { url: API_BASE_URL }); // 创建 socket 连接 this.socket = io(API_BASE_URL, { transports: ['websocket', 'polling'], reconnection: true, reconnectionDelay: 1000, reconnectionDelayMax: 5000, reconnectionAttempts: this.maxReconnectAttempts, timeout: 20000, autoConnect: true, withCredentials: true, // 允许携带认证信息 ...options, }); // 监听连接成功 this.socket.on('connect', () => { this.connected = true; this.reconnectAttempts = 0; logger.info('socketService', 'Socket.IO connected successfully', { socketId: this.socket.id, }); }); // 监听断开连接 this.socket.on('disconnect', (reason) => { this.connected = false; logger.warn('socketService', 'Socket.IO disconnected', { reason }); }); // 监听连接错误 this.socket.on('connect_error', (error) => { this.reconnectAttempts++; logger.error('socketService', 'connect_error', error, { attempts: this.reconnectAttempts, maxAttempts: this.maxReconnectAttempts, }); if (this.reconnectAttempts >= this.maxReconnectAttempts) { logger.error('socketService', 'Max reconnection attempts reached'); this.socket.close(); } }); // 监听重连尝试 this.socket.io.on('reconnect_attempt', (attemptNumber) => { logger.info('socketService', 'Reconnection attempt', { attemptNumber }); }); // 监听重连成功 this.socket.io.on('reconnect', (attemptNumber) => { this.reconnectAttempts = 0; logger.info('socketService', 'Reconnected successfully', { attemptNumber }); }); // 监听重连失败 this.socket.io.on('reconnect_failed', () => { logger.error('socketService', 'Reconnection failed after max attempts'); }); } /** * 断开连接 */ disconnect() { if (!this.socket) { return; } logger.info('socketService', 'Disconnecting from Socket.IO server...'); this.socket.disconnect(); this.socket = null; this.connected = false; } /** * 监听事件 * @param {string} event - 事件名称 * @param {Function} callback - 回调函数 */ on(event, callback) { if (!this.socket) { logger.warn('socketService', 'Cannot listen to event: socket not initialized', { event }); return; } this.socket.on(event, callback); logger.info('socketService', `Event listener added: ${event}`); } /** * 移除事件监听 * @param {string} event - 事件名称 * @param {Function} callback - 回调函数(可选) */ off(event, callback) { if (!this.socket) { return; } if (callback) { this.socket.off(event, callback); } else { this.socket.off(event); } logger.info('socketService', `Event listener removed: ${event}`); } /** * 发送消息到服务器 * @param {string} event - 事件名称 * @param {*} data - 发送的数据 * @param {Function} callback - 确认回调(可选) */ emit(event, data, callback) { if (!this.socket || !this.connected) { logger.warn('socketService', 'Cannot emit: socket not connected', { event, data }); return; } if (callback) { this.socket.emit(event, data, callback); } else { this.socket.emit(event, data); } logger.info('socketService', `Event emitted: ${event}`, data); } /** * 加入房间 * @param {string} room - 房间名称 */ joinRoom(room) { this.emit('join_room', { room }); } /** * 离开房间 * @param {string} room - 房间名称 */ leaveRoom(room) { this.emit('leave_room', { room }); } /** * 获取连接状态 */ isConnected() { return this.connected; } /** * 获取 Socket ID */ getSocketId() { return this.socket?.id || null; } /** * 手动重连 * @returns {boolean} 是否触发重连 */ reconnect() { if (!this.socket) { logger.warn('socketService', 'Cannot reconnect: socket not initialized'); return false; } if (this.connected) { logger.info('socketService', 'Already connected, no need to reconnect'); return false; } logger.info('socketService', 'Manually triggering reconnection...'); // 重置重连计数 this.reconnectAttempts = 0; // 触发重连 this.socket.connect(); return true; } /** * 获取当前重连尝试次数 */ getReconnectAttempts() { return this.reconnectAttempts; } /** * 获取最大重连次数 */ getMaxReconnectAttempts() { return this.maxReconnectAttempts; } // ==================== 事件推送专用方法 ==================== /** * 订阅事件推送 * @param {object} options - 订阅选项 * @param {string} options.eventType - 事件类型 ('all' | 'policy' | 'market' | 'tech' | ...) * @param {string} options.importance - 重要性 ('all' | 'S' | 'A' | 'B' | 'C') * @param {Function} options.onNewEvent - 收到新事件时的回调函数 * @param {Function} options.onSubscribed - 订阅成功的回调函数(可选) */ subscribeToEvents(options = {}) { const { eventType = 'all', importance = 'all', onNewEvent, onSubscribed, } = options; if (!this.socket || !this.connected) { logger.warn('socketService', 'Cannot subscribe: socket not connected'); // 自动连接 this.connect(); // 等待连接成功后再订阅 this.socket.once('connect', () => { this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); }); return; } this._doSubscribe(eventType, importance, onNewEvent, onSubscribed); } /** * 执行订阅操作(内部方法) */ _doSubscribe(eventType, importance, onNewEvent, onSubscribed) { // 发送订阅请求 this.emit('subscribe_events', { event_type: eventType, importance: importance, }); // 监听订阅确认 this.socket.once('subscription_confirmed', (data) => { logger.info('socketService', 'Subscription confirmed', data); if (onSubscribed) { onSubscribed(data); } }); // 监听订阅错误 this.socket.once('subscription_error', (error) => { logger.error('socketService', 'Subscription error', error); }); // 监听新事件推送 if (onNewEvent) { // 先移除之前的监听器(避免重复) this.socket.off('new_event'); // 添加新的监听器 this.socket.on('new_event', (eventData) => { logger.info('socketService', 'New event received', eventData); onNewEvent(eventData); }); } } /** * 取消订阅事件推送 * @param {object} options - 取消订阅选项 * @param {string} options.eventType - 事件类型 * @param {Function} options.onUnsubscribed - 取消订阅成功的回调函数(可选) */ unsubscribeFromEvents(options = {}) { const { eventType = 'all', onUnsubscribed, } = options; if (!this.socket || !this.connected) { logger.warn('socketService', 'Cannot unsubscribe: socket not connected'); return; } // 发送取消订阅请求 this.emit('unsubscribe_events', { event_type: eventType, }); // 监听取消订阅确认 this.socket.once('unsubscription_confirmed', (data) => { logger.info('socketService', 'Unsubscription confirmed', data); // 移除新事件监听器 this.socket.off('new_event'); if (onUnsubscribed) { onUnsubscribed(data); } }); // 监听取消订阅错误 this.socket.once('unsubscription_error', (error) => { logger.error('socketService', 'Unsubscription error', error); }); } /** * 快捷方法:订阅所有类型的事件 * @param {Function} onNewEvent - 收到新事件时的回调函数 * @returns {Function} 取消订阅的函数 */ subscribeToAllEvents(onNewEvent) { this.subscribeToEvents({ eventType: 'all', importance: 'all', onNewEvent, }); // 返回取消订阅的清理函数 return () => { this.unsubscribeFromEvents({ eventType: 'all' }); }; } /** * 快捷方法:订阅特定重要性的事件 * @param {string} importance - 重要性级别 ('S' | 'A' | 'B' | 'C') * @param {Function} onNewEvent - 收到新事件时的回调函数 * @returns {Function} 取消订阅的函数 */ subscribeToImportantEvents(importance, onNewEvent) { this.subscribeToEvents({ eventType: 'all', importance, onNewEvent, }); // 返回取消订阅的清理函数 return () => { this.unsubscribeFromEvents({ eventType: 'all' }); }; } /** * 快捷方法:订阅特定类型的事件 * @param {string} eventType - 事件类型 * @param {Function} onNewEvent - 收到新事件时的回调函数 * @returns {Function} 取消订阅的函数 */ subscribeToEventType(eventType, onNewEvent) { this.subscribeToEvents({ eventType, importance: 'all', onNewEvent, }); // 返回取消订阅的清理函数 return () => { this.unsubscribeFromEvents({ eventType }); }; } } // 导出单例 export const socketService = new SocketService(); export default socketService;