Files
vf_react/src/services/socketService.js

465 lines
14 KiB
JavaScript

// src/services/socketService.js
/**
* 真实 Socket.IO 服务 - 用于生产环境连接真实后端
*/
import { io } from 'socket.io-client';
import { logger } from '../utils/logger';
import { getApiBase } from '../utils/apiConfig';
const API_BASE_URL = getApiBase();
class SocketService {
constructor() {
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = Infinity; // 无限重试
this.customReconnectTimer = null; // 自定义重连定时器
}
/**
* 计算指数退避延迟
* 第1次: 60秒, 第2次: 120秒, 第3次: 240秒, 第4次及以后: 240秒
*/
getReconnectionDelay(attempt) {
const delays = [60000, 120000, 240000]; // 1min, 2min, 4min
const index = Math.min(attempt - 1, delays.length - 1);
return delays[index];
}
/**
* 连接到 Socket.IO 服务器
* @param {object} options - 连接选项
*/
connect(options = {}) {
if (this.socket && this.connected) {
logger.warn('socketService', 'Already connected');
return;
}
logger.info('socketService', 'Connecting to Socket.IO server...', { url: API_BASE_URL });
// 创建 socket 连接 - 禁用 Socket.IO 自带的重连机制,使用自定义指数退避
this.socket = io(API_BASE_URL, {
transports: ['websocket', 'polling'],
reconnection: false, // 禁用自动重连,改用自定义策略
timeout: 20000,
autoConnect: true,
withCredentials: true, // 允许携带认证信息
...options,
});
// 监听连接成功
this.socket.on('connect', () => {
this.connected = true;
this.reconnectAttempts = 0;
// 清除自定义重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
logger.info('socketService', 'Socket.IO connected successfully', {
socketId: this.socket.id,
});
});
// 监听断开连接
this.socket.on('disconnect', (reason) => {
const wasConnected = this.connected;
this.connected = false;
logger.warn('socketService', 'Socket.IO disconnected', { reason });
// 如果是意外断开(非主动断开),触发自定义重连
if (wasConnected && reason !== 'io client disconnect') {
this.scheduleReconnection();
}
});
// 监听连接错误
this.socket.on('connect_error', (error) => {
this.reconnectAttempts++;
logger.error('socketService', 'connect_error', error, {
attempts: this.reconnectAttempts,
});
// 使用指数退避策略安排下次重连
this.scheduleReconnection();
});
}
/**
* 使用指数退避策略安排重连
*/
scheduleReconnection() {
// 清除之前的定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
}
const delay = this.getReconnectionDelay(this.reconnectAttempts);
logger.info('socketService', `Scheduling reconnection in ${delay / 1000}s (attempt ${this.reconnectAttempts})`);
this.customReconnectTimer = setTimeout(() => {
if (!this.connected && this.socket) {
logger.info('socketService', 'Attempting reconnection...', {
attempt: this.reconnectAttempts,
});
this.socket.connect();
}
}, delay);
}
/**
* 断开连接
*/
disconnect() {
if (!this.socket) {
return;
}
logger.info('socketService', 'Disconnecting from Socket.IO server...');
// 清除自定义重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
this.socket.disconnect();
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
}
/**
* 监听事件
* @param {string} event - 事件名称
* @param {Function} callback - 回调函数
*/
on(event, callback) {
if (!this.socket) {
logger.warn('socketService', 'Cannot listen to event: socket not initialized', { event });
return;
}
this.socket.on(event, callback);
logger.info('socketService', `Event listener added: ${event}`);
}
/**
* 移除事件监听
* @param {string} event - 事件名称
* @param {Function} callback - 回调函数(可选)
*/
off(event, callback) {
if (!this.socket) {
return;
}
if (callback) {
this.socket.off(event, callback);
} else {
this.socket.off(event);
}
logger.info('socketService', `Event listener removed: ${event}`);
}
/**
* 发送消息到服务器
* @param {string} event - 事件名称
* @param {*} data - 发送的数据
* @param {Function} callback - 确认回调(可选)
*/
emit(event, data, callback) {
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot emit: socket not connected', { event, data });
return;
}
if (callback) {
this.socket.emit(event, data, callback);
} else {
this.socket.emit(event, data);
}
logger.info('socketService', `Event emitted: ${event}`, data);
}
/**
* 加入房间
* @param {string} room - 房间名称
*/
joinRoom(room) {
this.emit('join_room', { room });
}
/**
* 离开房间
* @param {string} room - 房间名称
*/
leaveRoom(room) {
this.emit('leave_room', { room });
}
/**
* 获取连接状态
*/
isConnected() {
return this.connected;
}
/**
* 获取 Socket ID
*/
getSocketId() {
return this.socket?.id || null;
}
/**
* 手动重连
* @returns {boolean} 是否触发重连
*/
reconnect() {
if (!this.socket) {
logger.warn('socketService', 'Cannot reconnect: socket not initialized');
return false;
}
if (this.connected) {
logger.info('socketService', 'Already connected, no need to reconnect');
return false;
}
logger.info('socketService', 'Manually triggering reconnection...');
// 清除自动重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
// 重置重连计数
this.reconnectAttempts = 0;
// 立即触发重连
this.socket.connect();
return true;
}
/**
* 获取当前重连尝试次数
*/
getReconnectAttempts() {
return this.reconnectAttempts;
}
/**
* 获取最大重连次数
*/
getMaxReconnectAttempts() {
return this.maxReconnectAttempts;
}
// ==================== 事件推送专用方法 ====================
/**
* 订阅事件推送
* @param {object} options - 订阅选项
* @param {string} options.eventType - 事件类型 ('all' | 'policy' | 'market' | 'tech' | ...)
* @param {string} options.importance - 重要性 ('all' | 'S' | 'A' | 'B' | 'C')
* @param {Function} options.onNewEvent - 收到新事件时的回调函数
* @param {Function} options.onSubscribed - 订阅成功的回调函数(可选)
*/
subscribeToEvents(options = {}) {
const {
eventType = 'all',
importance = 'all',
onNewEvent,
onSubscribed,
} = options;
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot subscribe: socket not connected');
// 自动连接
this.connect();
// 等待连接成功后再订阅
this.socket.once('connect', () => {
this._doSubscribe(eventType, importance, onNewEvent, onSubscribed);
});
return;
}
this._doSubscribe(eventType, importance, onNewEvent, onSubscribed);
}
/**
* 执行订阅操作(内部方法)
*/
_doSubscribe(eventType, importance, onNewEvent, onSubscribed) {
console.log('\n========== [SocketService DEBUG] 开始订阅 ==========');
console.log('[SocketService DEBUG] 事件类型:', eventType);
console.log('[SocketService DEBUG] 重要性:', importance);
console.log('[SocketService DEBUG] Socket 连接状态:', this.connected);
console.log('[SocketService DEBUG] Socket ID:', this.socket?.id);
// 发送订阅请求
const subscribeData = {
event_type: eventType,
importance: importance,
};
console.log('[SocketService DEBUG] 准备发送 subscribe_events:', subscribeData);
this.emit('subscribe_events', subscribeData);
console.log('[SocketService DEBUG] ✓ 已发送 subscribe_events');
// 监听订阅确认
this.socket.once('subscription_confirmed', (data) => {
console.log('\n[SocketService DEBUG] ========== 收到订阅确认 ==========');
console.log('[SocketService DEBUG] 订阅确认数据:', data);
logger.info('socketService', 'Subscription confirmed', data);
if (onSubscribed) {
console.log('[SocketService DEBUG] 调用 onSubscribed 回调');
onSubscribed(data);
}
console.log('[SocketService DEBUG] ========== 订阅确认处理完成 ==========\n');
});
// 监听订阅错误
this.socket.once('subscription_error', (error) => {
console.error('\n[SocketService ERROR] ========== 订阅错误 ==========');
console.error('[SocketService ERROR] 错误信息:', error);
logger.error('socketService', 'Subscription error', error);
console.error('[SocketService ERROR] ========== 订阅错误处理完成 ==========\n');
});
// 监听新事件推送
if (onNewEvent) {
console.log('[SocketService DEBUG] 设置 new_event 监听器');
// 先移除之前的监听器(避免重复)
this.socket.off('new_event');
console.log('[SocketService DEBUG] ✓ 已移除旧的 new_event 监听器');
// 添加新的监听器
this.socket.on('new_event', (eventData) => {
console.log('\n[SocketService DEBUG] ========== 收到新事件推送 ==========');
console.log('[SocketService DEBUG] 事件数据:', eventData);
console.log('[SocketService DEBUG] 事件 ID:', eventData?.id);
console.log('[SocketService DEBUG] 事件标题:', eventData?.title);
logger.info('socketService', 'New event received', eventData);
console.log('[SocketService DEBUG] 准备调用 onNewEvent 回调');
onNewEvent(eventData);
console.log('[SocketService DEBUG] ✓ onNewEvent 回调已调用');
console.log('[SocketService DEBUG] ========== 新事件处理完成 ==========\n');
});
console.log('[SocketService DEBUG] ✓ new_event 监听器已设置');
}
console.log('[SocketService DEBUG] ========== 订阅完成 ==========\n');
}
/**
* 取消订阅事件推送
* @param {object} options - 取消订阅选项
* @param {string} options.eventType - 事件类型
* @param {Function} options.onUnsubscribed - 取消订阅成功的回调函数(可选)
*/
unsubscribeFromEvents(options = {}) {
const {
eventType = 'all',
onUnsubscribed,
} = options;
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot unsubscribe: socket not connected');
return;
}
// 发送取消订阅请求
this.emit('unsubscribe_events', {
event_type: eventType,
});
// 监听取消订阅确认
this.socket.once('unsubscription_confirmed', (data) => {
logger.info('socketService', 'Unsubscription confirmed', data);
// 移除新事件监听器
this.socket.off('new_event');
if (onUnsubscribed) {
onUnsubscribed(data);
}
});
// 监听取消订阅错误
this.socket.once('unsubscription_error', (error) => {
logger.error('socketService', 'Unsubscription error', error);
});
}
/**
* 快捷方法:订阅所有类型的事件
* @param {Function} onNewEvent - 收到新事件时的回调函数
* @returns {Function} 取消订阅的函数
*/
subscribeToAllEvents(onNewEvent) {
this.subscribeToEvents({
eventType: 'all',
importance: 'all',
onNewEvent,
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType: 'all' });
};
}
/**
* 快捷方法:订阅特定重要性的事件
* @param {string} importance - 重要性级别 ('S' | 'A' | 'B' | 'C')
* @param {Function} onNewEvent - 收到新事件时的回调函数
* @returns {Function} 取消订阅的函数
*/
subscribeToImportantEvents(importance, onNewEvent) {
this.subscribeToEvents({
eventType: 'all',
importance,
onNewEvent,
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType: 'all' });
};
}
/**
* 快捷方法:订阅特定类型的事件
* @param {string} eventType - 事件类型
* @param {Function} onNewEvent - 收到新事件时的回调函数
* @returns {Function} 取消订阅的函数
*/
subscribeToEventType(eventType, onNewEvent) {
this.subscribeToEvents({
eventType,
importance: 'all',
onNewEvent,
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType });
};
}
}
// 导出单例
export const socketService = new SocketService();
export default socketService;