Files
vf_react/src/services/socketService.js
zdl 926ffa1b8f fix(socket): 保留暂存监听器,修复重连后事件监听器丢失问题
## 问题
生产环境 Socket 已连接且订阅成功,但收到事件时不触发通知:
- Socket 连接正常:`connected: true`
- 订阅成功:`已订阅 all 类型的事件推送`
- **但是 `new_event` 监听器未注册**:`_callbacks.$new_event: undefined`
- Network 面板显示后端推送的消息已到达

## 根本原因
`socketService.js` 的监听器注册机制有缺陷:

### 原始逻辑(有问题):
```javascript
// connect() 方法中
if (this.pendingListeners.length > 0) {
    this.pendingListeners.forEach(({ event, callback }) => {
        this.on(event, callback);  // 注册监听器
    });
    this.pendingListeners = [];  //  清空暂存队列
}
```

### 问题:
1. **首次连接**:监听器从 `pendingListeners` 注册到 Socket,然后清空队列
2. **Socket 重连**:`pendingListeners` 已被清空,无法重新注册监听器
3. **结果**:重连后 `new_event` 监听器丢失,事件无法触发

### 为什么会重连?
- 用户网络波动
- 服务器重启
- 浏览器从休眠恢复
- Socket.IO 底层重连机制

## 解决方案

### 修改 1:保留 `pendingListeners`(不清空)

**文件**:`src/services/socketService.js:54-69`

```javascript
// 注册所有暂存的事件监听器(保留 pendingListeners,不清空)
if (this.pendingListeners.length > 0) {
    console.log(`[socketService] 📦 注册 ${this.pendingListeners.length} 个暂存的事件监听器`);
    this.pendingListeners.forEach(({ event, callback }) => {
        // 直接在 Socket.IO 实例上注册(避免递归调用 this.on())
        const wrappedCallback = (...args) => {
            console.log(`%c[socketService] 🔔 收到原始事件: ${event}`, ...);
            callback(...args);
        };

        this.socket.on(event, wrappedCallback);
        console.log(`[socketService] ✓ 已注册事件监听器: ${event}`);
    });
    // ⚠️ 重要:不清空 pendingListeners,保留用于重连
}
```

**变更**:
-  删除:`this.pendingListeners = [];`
-  新增:直接在 `this.socket.on()` 上注册(避免递归)
-  保留:`pendingListeners` 数组,用于重连时重新注册

### 修改 2:避免重复注册

**文件**:`src/services/socketService.js:166-181`

```javascript
on(event, callback) {
    if (!this.socket) {
        // Socket 未初始化,暂存监听器(检查是否已存在,避免重复)
        const exists = this.pendingListeners.some(
            (listener) => listener.event === event && listener.callback === callback
        );

        if (!exists) {
            this.pendingListeners.push({ event, callback });
        } else {
            console.log(`[socketService] ⚠️ 监听器已存在,跳过: ${event}`);
        }
        return;
    }
    // ...
}
```

**变更**:
-  新增:检查监听器是否已存在(`event` 和 `callback` 都匹配)
-  避免:重复添加相同监听器到 `pendingListeners`

## 效果

### 修复前:
```
首次连接:  new_event 监听器注册
重连后:    new_event 监听器丢失
事件推送:  不触发通知
```

### 修复后:
```
首次连接:  new_event 监听器注册
重连后:    new_event 监听器自动重新注册
事件推送:  正常触发通知
```

## 验证步骤

部署后在浏览器 Console 执行:

```javascript
// 1. 检查监听器
window.socket.socket._callbacks.$new_event  // 应该有 1-2 个监听器

// 2. 手动断开重连
window.socket.disconnect();
setTimeout(() => window.socket.connect(), 1000);

// 3. 重连后再次检查
window.socket.socket._callbacks.$new_event  // 应该仍然有监听器

// 4. 等待后端推送事件,验证通知显示
```

## 影响范围
- 修改文件: `src/services/socketService.js`(1 个文件,2 处修改)
- 影响功能: Socket 事件监听器注册机制
- 风险等级: 低(只修改监听器管理逻辑,不改变业务代码)

## 相关 Issue
- 修复生产环境 Socket 事件不触发通知问题
- 解决 Socket 重连后监听器丢失问题

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 14:16:00 +08:00

521 lines
18 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/services/socketService.js
/**
* 真实 Socket.IO 服务 - 用于生产环境连接真实后端
*/
import { io } from 'socket.io-client';
import { logger } from '../utils/logger';
import { getApiBase } from '../utils/apiConfig';
const API_BASE_URL = getApiBase();
class SocketService {
constructor() {
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = Infinity; // 无限重试
this.customReconnectTimer = null; // 自定义重连定时器
this.pendingListeners = []; // 暂存等待注册的事件监听器
}
/**
* 计算指数退避延迟
* 第1次: 60秒, 第2次: 120秒, 第3次: 240秒, 第4次及以后: 240秒
*/
getReconnectionDelay(attempt) {
const delays = [60000, 120000, 240000]; // 1min, 2min, 4min
const index = Math.min(attempt - 1, delays.length - 1);
return delays[index];
}
/**
* 连接到 Socket.IO 服务器
* @param {object} options - 连接选项
*/
connect(options = {}) {
if (this.socket && this.connected) {
logger.warn('socketService', 'Already connected');
return;
}
logger.info('socketService', 'Connecting to Socket.IO server...', { url: API_BASE_URL });
// 创建 socket 连接 - 禁用 Socket.IO 自带的重连机制,使用自定义指数退避
this.socket = io(API_BASE_URL, {
transports: ['websocket', 'polling'],
reconnection: false, // 禁用自动重连,改用自定义策略
timeout: 20000,
autoConnect: true,
withCredentials: true, // 允许携带认证信息
...options,
});
// 注册所有暂存的事件监听器(保留 pendingListeners不清空
if (this.pendingListeners.length > 0) {
console.log(`[socketService] 📦 注册 ${this.pendingListeners.length} 个暂存的事件监听器`);
this.pendingListeners.forEach(({ event, callback }) => {
// 直接在 Socket.IO 实例上注册(避免递归调用 this.on()
const wrappedCallback = (...args) => {
console.log(`%c[socketService] 🔔 收到原始事件: ${event}`, 'color: #2196F3; font-weight: bold;');
console.log(`[socketService] 事件数据 (${event}):`, ...args);
callback(...args);
};
this.socket.on(event, wrappedCallback);
console.log(`[socketService] ✓ 已注册事件监听器: ${event}`);
});
// ⚠️ 重要:不清空 pendingListeners保留用于重连
}
// 监听连接成功
this.socket.on('connect', () => {
this.connected = true;
this.reconnectAttempts = 0;
// 清除自定义重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
logger.info('socketService', 'Socket.IO connected successfully', {
socketId: this.socket.id,
});
console.log(`%c[socketService] ✅ WebSocket 已连接`, 'color: #4CAF50; font-weight: bold;');
console.log('[socketService] Socket ID:', this.socket.id);
// ⚠️ 已移除自动订阅,让 NotificationContext 负责订阅
// this.subscribeToAllEvents();
});
// 监听断开连接
this.socket.on('disconnect', (reason) => {
const wasConnected = this.connected;
this.connected = false;
logger.warn('socketService', 'Socket.IO disconnected', { reason });
// 如果是意外断开(非主动断开),触发自定义重连
if (wasConnected && reason !== 'io client disconnect') {
this.scheduleReconnection();
}
});
// 监听连接错误
this.socket.on('connect_error', (error) => {
this.reconnectAttempts++;
logger.error('socketService', 'connect_error', error, {
attempts: this.reconnectAttempts,
});
// 使用指数退避策略安排下次重连
this.scheduleReconnection();
});
}
/**
* 使用指数退避策略安排重连
*/
scheduleReconnection() {
// 清除之前的定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
}
const delay = this.getReconnectionDelay(this.reconnectAttempts);
logger.info('socketService', `Scheduling reconnection in ${delay / 1000}s (attempt ${this.reconnectAttempts})`);
this.customReconnectTimer = setTimeout(() => {
if (!this.connected && this.socket) {
logger.info('socketService', 'Attempting reconnection...', {
attempt: this.reconnectAttempts,
});
this.socket.connect();
}
}, delay);
}
/**
* 断开连接
*/
disconnect() {
if (!this.socket) {
return;
}
logger.info('socketService', 'Disconnecting from Socket.IO server...');
// 清除自定义重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
this.socket.disconnect();
this.socket = null;
this.connected = false;
this.reconnectAttempts = 0;
}
/**
* 监听事件
* @param {string} event - 事件名称
* @param {Function} callback - 回调函数
*/
on(event, callback) {
if (!this.socket) {
// Socket 未初始化,暂存监听器(检查是否已存在,避免重复)
const exists = this.pendingListeners.some(
(listener) => listener.event === event && listener.callback === callback
);
if (!exists) {
logger.info('socketService', 'Socket not ready, queuing listener', { event });
console.log(`[socketService] 📦 Socket 未初始化,暂存事件监听器: ${event}`);
this.pendingListeners.push({ event, callback });
} else {
console.log(`[socketService] ⚠️ 监听器已存在,跳过: ${event}`);
}
return;
}
// 包装回调函数,添加日志
const wrappedCallback = (...args) => {
console.log(`%c[socketService] 🔔 收到原始事件: ${event}`, 'color: #2196F3; font-weight: bold;');
console.log(`[socketService] 事件数据 (${event}):`, ...args);
callback(...args);
};
this.socket.on(event, wrappedCallback);
logger.info('socketService', `Event listener added: ${event}`);
console.log(`[socketService] ✓ 已注册事件监听器: ${event}`);
}
/**
* 移除事件监听
* @param {string} event - 事件名称
* @param {Function} callback - 回调函数(可选)
*/
off(event, callback) {
if (!this.socket) {
return;
}
if (callback) {
this.socket.off(event, callback);
} else {
this.socket.off(event);
}
logger.info('socketService', `Event listener removed: ${event}`);
}
/**
* 发送消息到服务器
* @param {string} event - 事件名称
* @param {*} data - 发送的数据
* @param {Function} callback - 确认回调(可选)
*/
emit(event, data, callback) {
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot emit: socket not connected', { event, data });
return;
}
if (callback) {
this.socket.emit(event, data, callback);
} else {
this.socket.emit(event, data);
}
logger.info('socketService', `Event emitted: ${event}`, data);
}
/**
* 加入房间
* @param {string} room - 房间名称
*/
joinRoom(room) {
this.emit('join_room', { room });
}
/**
* 离开房间
* @param {string} room - 房间名称
*/
leaveRoom(room) {
this.emit('leave_room', { room });
}
/**
* 获取连接状态
*/
isConnected() {
return this.connected;
}
/**
* 获取 Socket ID
*/
getSocketId() {
return this.socket?.id || null;
}
/**
* 手动重连
* @returns {boolean} 是否触发重连
*/
reconnect() {
if (!this.socket) {
logger.warn('socketService', 'Cannot reconnect: socket not initialized');
return false;
}
if (this.connected) {
logger.info('socketService', 'Already connected, no need to reconnect');
return false;
}
logger.info('socketService', 'Manually triggering reconnection...');
// 清除自动重连定时器
if (this.customReconnectTimer) {
clearTimeout(this.customReconnectTimer);
this.customReconnectTimer = null;
}
// 重置重连计数
this.reconnectAttempts = 0;
// 立即触发重连
this.socket.connect();
return true;
}
/**
* 获取当前重连尝试次数
*/
getReconnectAttempts() {
return this.reconnectAttempts;
}
/**
* 获取最大重连次数
*/
getMaxReconnectAttempts() {
return this.maxReconnectAttempts;
}
// ==================== 事件推送专用方法 ====================
/**
* 订阅事件推送
* @param {object} options - 订阅选项
* @param {string} options.eventType - 事件类型 ('all' | 'policy' | 'market' | 'tech' | ...)
* @param {string} options.importance - 重要性 ('all' | 'S' | 'A' | 'B' | 'C')
* @param {Function} options.onNewEvent - 收到新事件时的回调函数
* @param {Function} options.onSubscribed - 订阅成功的回调函数(可选)
*/
subscribeToEvents(options = {}) {
const {
eventType = 'all',
importance = 'all',
onNewEvent,
onSubscribed,
} = options;
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot subscribe: socket not connected');
// 自动连接
this.connect();
// 等待连接成功后再订阅
this.socket.once('connect', () => {
this._doSubscribe(eventType, importance, onNewEvent, onSubscribed);
});
return;
}
this._doSubscribe(eventType, importance, onNewEvent, onSubscribed);
}
/**
* 执行订阅操作(内部方法)
*/
_doSubscribe(eventType, importance, onNewEvent, onSubscribed) {
console.log('\n========== [SocketService DEBUG] 开始订阅 ==========');
console.log('[SocketService DEBUG] 事件类型:', eventType);
console.log('[SocketService DEBUG] 重要性:', importance);
console.log('[SocketService DEBUG] Socket 连接状态:', this.connected);
console.log('[SocketService DEBUG] Socket ID:', this.socket?.id);
// 发送订阅请求
const subscribeData = {
event_type: eventType,
importance: importance,
};
console.log('[SocketService DEBUG] 准备发送 subscribe_events:', subscribeData);
this.emit('subscribe_events', subscribeData);
console.log('[SocketService DEBUG] ✓ 已发送 subscribe_events');
// 监听订阅确认
this.socket.once('subscription_confirmed', (data) => {
console.log('\n[SocketService DEBUG] ========== 收到订阅确认 ==========');
console.log('[SocketService DEBUG] 订阅确认数据:', data);
logger.info('socketService', 'Subscription confirmed', data);
if (onSubscribed) {
console.log('[SocketService DEBUG] 调用 onSubscribed 回调');
onSubscribed(data);
}
console.log('[SocketService DEBUG] ========== 订阅确认处理完成 ==========\n');
});
// 监听订阅错误
this.socket.once('subscription_error', (error) => {
console.error('\n[SocketService ERROR] ========== 订阅错误 ==========');
console.error('[SocketService ERROR] 错误信息:', error);
logger.error('socketService', 'Subscription error', error);
console.error('[SocketService ERROR] ========== 订阅错误处理完成 ==========\n');
});
// 监听新事件推送
// ⚠️ 注意:不要移除其他地方注册的 new_event 监听器(如 NotificationContext
// 多个监听器可以共存,都会被触发
if (onNewEvent) {
console.log('[SocketService DEBUG] 设置 new_event 监听器');
// ⚠️ 已移除 this.socket.off('new_event'),允许多个监听器共存
// 添加新的监听器(与其他监听器共存)
this.socket.on('new_event', (eventData) => {
console.log('\n[SocketService DEBUG] ========== 收到新事件推送 ==========');
console.log('[SocketService DEBUG] 事件数据:', eventData);
console.log('[SocketService DEBUG] 事件 ID:', eventData?.id);
console.log('[SocketService DEBUG] 事件标题:', eventData?.title);
logger.info('socketService', 'New event received', eventData);
console.log('[SocketService DEBUG] 准备调用 onNewEvent 回调');
onNewEvent(eventData);
console.log('[SocketService DEBUG] ✓ onNewEvent 回调已调用');
console.log('[SocketService DEBUG] ========== 新事件处理完成 ==========\n');
});
console.log('[SocketService DEBUG] ✓ new_event 监听器已设置(与其他监听器共存)');
}
console.log('[SocketService DEBUG] ========== 订阅完成 ==========\n');
}
/**
* 取消订阅事件推送
* @param {object} options - 取消订阅选项
* @param {string} options.eventType - 事件类型
* @param {Function} options.onUnsubscribed - 取消订阅成功的回调函数(可选)
*/
unsubscribeFromEvents(options = {}) {
const {
eventType = 'all',
onUnsubscribed,
} = options;
if (!this.socket || !this.connected) {
logger.warn('socketService', 'Cannot unsubscribe: socket not connected');
return;
}
// 发送取消订阅请求
this.emit('unsubscribe_events', {
event_type: eventType,
});
// 监听取消订阅确认
this.socket.once('unsubscription_confirmed', (data) => {
logger.info('socketService', 'Unsubscription confirmed', data);
// 移除新事件监听器
this.socket.off('new_event');
if (onUnsubscribed) {
onUnsubscribed(data);
}
});
// 监听取消订阅错误
this.socket.once('unsubscription_error', (error) => {
logger.error('socketService', 'Unsubscription error', error);
});
}
/**
* 快捷方法:订阅所有类型的事件
* @param {Function} onNewEvent - 收到新事件时的回调函数(可选)
* @returns {Function} 取消订阅的函数
*/
subscribeToAllEvents(onNewEvent) {
console.log('%c[socketService] 🔔 自动订阅所有事件...', 'color: #FF9800; font-weight: bold;');
// 如果没有提供回调,添加一个默认的日志回调
const defaultCallback = (event) => {
console.log('%c[socketService] 📨 收到新事件(默认回调)', 'color: #4CAF50; font-weight: bold;');
console.log('[socketService] 事件数据:', event);
};
this.subscribeToEvents({
eventType: 'all',
importance: 'all',
onNewEvent: onNewEvent || defaultCallback,
onSubscribed: (data) => {
console.log('%c[socketService] ✅ 订阅成功!', 'color: #4CAF50; font-weight: bold;');
console.log('[socketService] 订阅确认:', data);
},
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType: 'all' });
};
}
/**
* 快捷方法:订阅特定重要性的事件
* @param {string} importance - 重要性级别 ('S' | 'A' | 'B' | 'C')
* @param {Function} onNewEvent - 收到新事件时的回调函数
* @returns {Function} 取消订阅的函数
*/
subscribeToImportantEvents(importance, onNewEvent) {
this.subscribeToEvents({
eventType: 'all',
importance,
onNewEvent,
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType: 'all' });
};
}
/**
* 快捷方法:订阅特定类型的事件
* @param {string} eventType - 事件类型
* @param {Function} onNewEvent - 收到新事件时的回调函数
* @returns {Function} 取消订阅的函数
*/
subscribeToEventType(eventType, onNewEvent) {
this.subscribeToEvents({
eventType,
importance: 'all',
onNewEvent,
});
// 返回取消订阅的清理函数
return () => {
this.unsubscribeFromEvents({ eventType });
};
}
}
// 导出单例
export const socketService = new SocketService();
export default socketService;