/**
 * Realtime quote hook.
 *
 * Manages the WebSocket connections to the Shanghai (SSE) and Shenzhen (SZSE)
 * quote servers and exposes the latest quote per security.
 *
 * SSE  (ws://49.232.185.254:8765): requires an explicit subscribe message,
 *                                  provides a 5-level order book.
 * SZSE (ws://222.128.1.157:8765):  pushes automatically,
 *                                  provides a 10-level order book.
 *
 * Data categories sent by SZSE (`category`):
 * - stock (300111): stock snapshot with 10 bid/ask levels
 * - bond (300211): bond snapshot
 * - afterhours_block (300611): after-hours fixed-price block trades
 * - afterhours_trading (303711): after-hours fixed-price trading
 * - hk_stock (306311): Hong Kong stock snapshot (Shenzhen-HK Connect)
 * - index (309011): index snapshot
 * - volume_stats (309111): volume statistics
 * - fund_nav (309211): fund net asset value
 */
import { useState, useEffect, useRef, useCallback } from 'react';
import { logger } from '@utils/logger';

// WebSocket endpoints.
const WS_CONFIG = {
  SSE: 'ws://49.232.185.254:8765', // Shanghai Stock Exchange
  SZSE: 'ws://222.128.1.157:8765', // Shenzhen Stock Exchange
};

// Exchange identifiers handled by this hook.
const EXCHANGES = ['SSE', 'SZSE'];

// Heartbeat interval (ms).
const HEARTBEAT_INTERVAL = 30000;

// Delay before a reconnect attempt (ms).
const RECONNECT_INTERVAL = 3000;

// Explicit code suffixes (upper-cased) that unambiguously name an exchange.
const SUFFIX_TO_EXCHANGE = new Map([
  ['SH', 'SSE'],
  ['SS', 'SSE'],
  ['SSE', 'SSE'],
  ['SZ', 'SZSE'],
  ['SZSE', 'SZSE'],
]);

/**
 * Decide which exchange a security code belongs to.
 *
 * A recognised suffix (e.g. `000001.SH`, `000001.SZ`) always wins, because the
 * numeric prefix alone is ambiguous: `000001` is both the SSE composite index
 * and a Shenzhen stock. Without a usable suffix the prefix heuristic applies.
 *
 * @param {string} code - Security code, with or without a suffix.
 * @returns {'SSE'|'SZSE'} Exchange identifier.
 */
const getExchange = (code) => {
  const [baseCode, suffix = ''] = String(code ?? '').split('.');

  const exchangeFromSuffix = SUFFIX_TO_EXCHANGE.get(suffix.toUpperCase());
  if (exchangeFromSuffix) {
    return exchangeFromSuffix;
  }

  // 6xxxxx: Shanghai stocks; 5xxxxx: Shanghai ETFs.
  if (baseCode.startsWith('6') || baseCode.startsWith('5')) {
    return 'SSE';
  }
  // 0xxxxx / 3xxxxx: Shenzhen stocks and 399xxx Shenzhen indexes;
  // 1xxxxx: Shenzhen ETFs / bonds. Bare 000xxx codes are treated as Shenzhen.
  if (baseCode.startsWith('0') || baseCode.startsWith('3') || baseCode.startsWith('1')) {
    return 'SZSE';
  }
  // Anything else defaults to Shanghai.
  return 'SSE';
};

/**
 * Strip the exchange suffix from a security code.
 *
 * @param {string} code - Security code, with or without a suffix.
 * @returns {string} The part of the code before the first dot.
 */
const normalizeCode = (code) => String(code ?? '').split('.')[0];

/**
 * Split an SZSE bids/asks array into parallel price and volume arrays.
 *
 * @param {Array} orderBook - [{ price, volume }, ...]
 * @returns {{ prices: number[], volumes: number[] }} Empty arrays when the
 *   input is missing or not an array; missing fields become 0.
 */
const extractOrderBook = (orderBook) => {
  if (!Array.isArray(orderBook)) {
    return { prices: [], volumes: [] };
  }
  return {
    prices: orderBook.map((level) => level?.price || 0),
    volumes: orderBook.map((level) => level?.volume || 0),
  };
};

/** Absolute and percentage change of `price` against `prevClose`. */
const computeChange = (price, prevClose) => ({
  change: price - prevClose,
  changePct: prevClose ? ((price - prevClose) / prevClose) * 100 : 0,
});

/** Order-book fields taken from an SZSE payload's `bids` / `asks`. */
const orderBookFields = (data) => {
  const bids = extractOrderBook(data.bids);
  const asks = extractOrderBook(data.asks);
  return {
    bidPrices: bids.prices,
    bidVolumes: bids.volumes,
    askPrices: asks.prices,
    askVolumes: asks.volumes,
  };
};

/** Order-book fields for instruments that carry no book (indexes, bonds). */
const emptyOrderBookFields = () => ({
  bidPrices: [],
  bidVolumes: [],
  askPrices: [],
  askVolumes: [],
});

/** Map one SSE quote payload onto the hook's quote shape. */
const buildSseQuote = (quote) => ({
  code: quote.security_id,
  name: quote.security_name,
  price: quote.last_price,
  prevClose: quote.prev_close,
  open: quote.open_price,
  high: quote.high_price,
  low: quote.low_price,
  volume: quote.volume,
  amount: quote.amount,
  ...computeChange(quote.last_price, quote.prev_close),
  bidPrices: quote.bid_prices || [],
  bidVolumes: quote.bid_volumes || [],
  askPrices: quote.ask_prices || [],
  askVolumes: quote.ask_volumes || [],
  updateTime: quote.trade_time,
  exchange: 'SSE',
});

/** Price/volume fields shared by SZSE stock, bond and HK-stock payloads. */
const szseTradeFields = (data) => ({
  price: data.last_px,
  prevClose: data.prev_close,
  open: data.open_px,
  high: data.high_px,
  low: data.low_px,
  volume: data.volume,
  amount: data.amount,
  ...computeChange(data.last_px, data.prev_close),
});

/** Fields of an SZSE stock payload (realtime and snapshot share them). */
const szseStockFields = (data) => ({
  ...szseTradeFields(data),
  numTrades: data.num_trades,
  upperLimit: data.upper_limit, // limit-up price
  lowerLimit: data.lower_limit, // limit-down price
  ...orderBookFields(data),
});

/** Fields of an SZSE index payload (realtime and snapshot share them). */
const szseIndexFields = (data) => ({
  price: data.current_index,
  prevClose: data.prev_close,
  open: data.open_index,
  high: data.high_index,
  low: data.low_index,
  volume: data.volume,
  amount: data.amount,
  numTrades: data.num_trades,
  ...computeChange(data.current_index, data.prev_close),
  ...emptyOrderBookFields(),
});

// Per-category field builders for SZSE `realtime` messages. A Map is used so
// an unexpected category string can never resolve to an Object.prototype member.
const SZSE_REALTIME_BUILDERS = new Map([
  ['stock', szseStockFields],
  ['index', (data) => ({ ...szseIndexFields(data), close: data.close_index })],
  [
    'bond',
    (data) => ({
      ...szseTradeFields(data),
      numTrades: data.num_trades,
      weightedAvgPx: data.weighted_avg_px,
      ...emptyOrderBookFields(),
      isBond: true,
    }),
  ],
  [
    'hk_stock',
    (data) => ({
      ...szseTradeFields(data),
      numTrades: data.num_trades,
      nominalPx: data.nominal_px, // nominal price
      referencePx: data.reference_px, // reference price
      ...orderBookFields(data),
      isHK: true,
    }),
  ],
]);

// [list key in the snapshot payload, field builder] for SZSE `snapshot` messages.
const SZSE_SNAPSHOT_BUILDERS = [
  ['stocks', szseStockFields],
  ['indexes', szseIndexFields],
  [
    'bonds',
    (data) => ({
      ...szseTradeFields(data),
      ...emptyOrderBookFields(),
      isBond: true,
    }),
  ],
];

/**
 * Build the quote for an SZSE `realtime` message.
 *
 * @param {string} category - Message category.
 * @param {Object} data - Message payload.
 * @param {*} timestamp - Message timestamp, stored as `updateTime`.
 * @param {string} name - Security name to carry over (realtime messages are
 *   not read for a name here).
 * @returns {Object|null} The quote, or null for categories that are not mapped.
 */
const buildSzseRealtimeQuote = (category, data, timestamp, name) => {
  const buildFields = SZSE_REALTIME_BUILDERS.get(category);
  if (!buildFields) {
    return null;
  }
  return {
    code: data.security_id,
    name,
    ...buildFields(data),
    tradingPhase: data.trading_phase,
    updateTime: timestamp,
    exchange: 'SZSE',
  };
};

/** Send an SSE subscribe request for `codeList` (no-op for an empty list). */
const sendSseSubscribe = (ws, codeList) => {
  if (codeList.length === 0) {
    return;
  }
  ws.send(
    JSON.stringify({
      action: 'subscribe',
      channels: ['stock', 'index'],
      codes: codeList,
    }),
  );
};

/** Return `quotes` without the given codes (same object if nothing changes). */
const omitQuotes = (quotes, codesToRemove) => {
  if (!codesToRemove.some((code) => code in quotes)) {
    return quotes;
  }
  const remaining = { ...quotes };
  codesToRemove.forEach((code) => {
    delete remaining[code];
  });
  return remaining;
};

/**
 * Realtime quote hook.
 *
 * @param {string[]} codes - Security codes to keep subscribed. Codes added
 *   through `subscribe()` are tracked separately and are not dropped when
 *   this list changes.
 * @returns {Object} { quotes, connected, subscribe, unsubscribe }
 *   - quotes: { [baseCode]: quote }
 *   - connected: { SSE: boolean, SZSE: boolean }
 */
export const useRealtimeQuote = (codes = []) => {
  // Quote data: { [code]: QuoteData }
  const [quotes, setQuotes] = useState({});
  // Connection state: { SSE: boolean, SZSE: boolean }
  const [connected, setConnected] = useState({ SSE: false, SZSE: false });

  // WebSocket instances per exchange.
  const wsRefs = useRef({ SSE: null, SZSE: null });
  // Heartbeat interval handles per exchange.
  const heartbeatRefs = useRef({ SSE: null, SZSE: null });
  // Reconnect timeout handles per exchange.
  const reconnectRefs = useRef({ SSE: null, SZSE: null });
  // Every currently subscribed code (from `codes` and from `subscribe()`).
  const subscribedCodes = useRef({ SSE: new Set(), SZSE: new Set() });
  // Codes that came from the `codes` argument on the previous effect run.
  const propCodes = useRef({ SSE: new Set(), SZSE: new Set() });
  // Latest createConnection, so the reconnect timer can call it without a
  // circular useCallback dependency.
  const createConnectionRef = useRef(null);

  /** Update one exchange's connection flag, skipping no-op state updates. */
  const markConnected = useCallback((exchange, isConnected) => {
    setConnected((prev) =>
      prev[exchange] === isConnected ? prev : { ...prev, [exchange]: isConnected },
    );
  }, []);

  /** Stop the heartbeat timer of an exchange. */
  const stopHeartbeat = useCallback((exchange) => {
    if (heartbeatRefs.current[exchange]) {
      clearInterval(heartbeatRefs.current[exchange]);
      heartbeatRefs.current[exchange] = null;
    }
  }, []);

  /** (Re)start the heartbeat timer of an exchange. */
  const startHeartbeat = useCallback(
    (exchange) => {
      stopHeartbeat(exchange);
      heartbeatRefs.current[exchange] = setInterval(() => {
        const ws = wsRefs.current[exchange];
        if (ws && ws.readyState === WebSocket.OPEN) {
          // The two servers use different ping envelopes.
          const ping = exchange === 'SSE' ? { action: 'ping' } : { type: 'ping' };
          ws.send(JSON.stringify(ping));
        }
      }, HEARTBEAT_INTERVAL);
    },
    [stopHeartbeat],
  );

  /** Cancel a pending reconnect attempt, if any. */
  const cancelReconnect = useCallback((exchange) => {
    if (reconnectRefs.current[exchange]) {
      clearTimeout(reconnectRefs.current[exchange]);
      reconnectRefs.current[exchange] = null;
    }
  }, []);

  /**
   * Drop the exchange's socket without touching React state.
   *
   * Handlers are detached before closing so the discarded socket's `close`
   * event can neither stop a newer socket's heartbeat nor schedule a
   * reconnect.
   *
   * @returns {boolean} Whether a socket was actually closed.
   */
  const detachSocket = useCallback(
    (exchange) => {
      stopHeartbeat(exchange);
      cancelReconnect(exchange);

      const ws = wsRefs.current[exchange];
      wsRefs.current[exchange] = null;
      if (!ws) {
        return false;
      }
      ws.onopen = null;
      ws.onmessage = null;
      ws.onerror = null;
      ws.onclose = null;
      ws.close();
      return true;
    },
    [stopHeartbeat, cancelReconnect],
  );

  /** Deliberately close an exchange's connection (no reconnect follows). */
  const closeConnection = useCallback(
    (exchange) => {
      if (detachSocket(exchange)) {
        logger.info('FlexScreen', `${exchange} WebSocket 断开`);
      }
      markConnected(exchange, false);
    },
    [detachSocket, markConnected],
  );

  /** Schedule one reconnect attempt unless one is already pending. */
  const scheduleReconnect = useCallback((exchange) => {
    if (reconnectRefs.current[exchange]) {
      return; // a reconnect is already planned
    }
    reconnectRefs.current[exchange] = setTimeout(() => {
      reconnectRefs.current[exchange] = null;
      // Only reconnect while something is still subscribed.
      if (subscribedCodes.current[exchange].size > 0) {
        createConnectionRef.current?.(exchange);
      }
    }, RECONNECT_INTERVAL);
  }, []);

  /** Handle one parsed WebSocket message. */
  const handleMessage = useCallback((exchange, msg) => {
    // Ignore empty messages and heartbeat replies.
    if (!msg || msg.type === 'pong') {
      return;
    }

    if (exchange === 'SSE') {
      // SSE message: { type: 'stock' | 'index', data: { [code]: quote } }
      if (msg.type !== 'stock' && msg.type !== 'index') {
        return;
      }
      const entries = Object.entries(msg.data || {});
      setQuotes((prev) => {
        const updated = { ...prev };
        entries.forEach(([code, quote]) => {
          // Only keep codes that are still subscribed.
          if (quote && subscribedCodes.current.SSE.has(code)) {
            updated[code] = buildSseQuote(quote);
          }
        });
        return updated;
      });
      return;
    }

    if (exchange !== 'SZSE') {
      return;
    }

    if (msg.type === 'realtime') {
      const { category, data } = msg;
      if (!data) {
        return;
      }
      const code = data.security_id;
      // Only keep codes that are subscribed.
      if (!subscribedCodes.current.SZSE.has(code)) {
        return;
      }

      if (category === 'afterhours_block' || category === 'afterhours_trading') {
        // After-hours data is merged into the existing quote.
        setQuotes((prev) => ({
          ...prev,
          [code]: {
            ...prev[code],
            afterhours: {
              bidPx: data.bid_px,
              bidSize: data.bid_size,
              offerPx: data.offer_px,
              offerSize: data.offer_size,
              volume: data.volume,
              amount: data.amount,
              numTrades: data.num_trades,
            },
            updateTime: msg.timestamp,
          },
        }));
        return;
      }

      // fund_nav and volume_stats are not handled yet.
      if (!SZSE_REALTIME_BUILDERS.has(category)) {
        return;
      }
      setQuotes((prev) => ({
        ...prev,
        // The name is carried over from the previously stored quote.
        [code]: buildSzseRealtimeQuote(category, data, msg.timestamp, prev[code]?.name || ''),
      }));
      return;
    }

    if (msg.type === 'snapshot') {
      // Initial SZSE snapshot: { stocks: [], indexes: [], bonds: [] }
      const snapshot = msg.data || {};
      setQuotes((prev) => {
        const updated = { ...prev };
        SZSE_SNAPSHOT_BUILDERS.forEach(([listKey, buildFields]) => {
          const items = Array.isArray(snapshot[listKey]) ? snapshot[listKey] : [];
          items.forEach((item) => {
            if (!item || !subscribedCodes.current.SZSE.has(item.security_id)) {
              return;
            }
            updated[item.security_id] = {
              code: item.security_id,
              name: item.security_name || '',
              ...buildFields(item),
              exchange: 'SZSE',
            };
          });
        });
        return updated;
      });
    }
  }, []);

  /** Open a fresh WebSocket connection, replacing any existing one. */
  const createConnection = useCallback(
    (exchange) => {
      // Discard the current socket (also cancels a pending reconnect so the
      // timer cannot tear the new socket down later).
      detachSocket(exchange);
      markConnected(exchange, false);

      let ws;
      try {
        ws = new WebSocket(WS_CONFIG[exchange]);
      } catch (error) {
        // The constructor can throw synchronously (e.g. SecurityError);
        // retrying would fail the same way, so only log.
        logger.error('FlexScreen', `${exchange} WebSocket 创建失败`, error);
        return;
      }
      wsRefs.current[exchange] = ws;

      // Events from a socket that is no longer current are ignored.
      const isCurrent = () => wsRefs.current[exchange] === ws;

      ws.onopen = () => {
        if (!isCurrent()) {
          return;
        }
        logger.info('FlexScreen', `${exchange} WebSocket 已连接`);
        markConnected(exchange, true);

        // SSE needs an explicit subscription; send everything queued so far.
        if (exchange === 'SSE') {
          sendSseSubscribe(ws, Array.from(subscribedCodes.current.SSE));
        }

        startHeartbeat(exchange);
      };

      ws.onmessage = (event) => {
        if (!isCurrent()) {
          return;
        }
        try {
          handleMessage(exchange, JSON.parse(event.data));
        } catch (e) {
          logger.warn('FlexScreen', `${exchange} 消息解析失败`, e);
        }
      };

      ws.onerror = (error) => {
        logger.error('FlexScreen', `${exchange} WebSocket 错误`, error);
      };

      ws.onclose = () => {
        if (!isCurrent()) {
          return;
        }
        logger.info('FlexScreen', `${exchange} WebSocket 断开`);
        markConnected(exchange, false);
        stopHeartbeat(exchange);
        // Unexpected close: try again later.
        scheduleReconnect(exchange);
      };
    },
    [detachSocket, markConnected, startHeartbeat, stopHeartbeat, handleMessage, scheduleReconnect],
  );
  createConnectionRef.current = createConnection;

  /**
   * Make sure newly added codes reach the server.
   *
   * - OPEN socket: SSE gets a subscribe message; SZSE pushes automatically.
   * - CONNECTING socket: nothing to do, `onopen` subscribes every queued code.
   * - Otherwise: open a new connection.
   */
  const ensureSubscribed = useCallback(
    (exchange, addedCodes) => {
      const ws = wsRefs.current[exchange];
      if (ws && ws.readyState === WebSocket.OPEN) {
        if (exchange === 'SSE') {
          sendSseSubscribe(ws, addedCodes);
        }
        return;
      }
      if (ws && ws.readyState === WebSocket.CONNECTING) {
        return;
      }
      createConnection(exchange);
    },
    [createConnection],
  );

  /** Subscribe to one security. */
  const subscribe = useCallback(
    (code) => {
      const baseCode = normalizeCode(code);
      const exchange = getExchange(code);

      subscribedCodes.current[exchange].add(baseCode);
      ensureSubscribed(exchange, [baseCode]);
    },
    [ensureSubscribed],
  );

  /**
   * Unsubscribe from one security.
   *
   * NOTE(review): no unsubscribe message is sent to the SSE server; updates
   * for removed codes are filtered out on the client - confirm whether the
   * server protocol offers an unsubscribe action.
   */
  const unsubscribe = useCallback(
    (code) => {
      const baseCode = normalizeCode(code);
      const exchange = getExchange(code);

      subscribedCodes.current[exchange].delete(baseCode);
      setQuotes((prev) => omitQuotes(prev, [baseCode]));

      // Nothing left on this exchange: close its connection.
      if (subscribedCodes.current[exchange].size === 0) {
        closeConnection(exchange);
      }
    },
    [closeConnection],
  );

  /**
   * Tear everything down on unmount. Registered unconditionally so sockets
   * opened after an initially empty `codes` list are cleaned up as well.
   */
  useEffect(
    () => () => {
      EXCHANGES.forEach((exchange) => {
        // No state updates here: the component is going away.
        detachSocket(exchange);
        // Reset so a re-run of the effects (e.g. React StrictMode) starts clean.
        subscribedCodes.current[exchange] = new Set();
        propCodes.current[exchange] = new Set();
      });
    },
    [detachSocket],
  );

  /**
   * Sync subscriptions with `codes` (also performs the initial subscription).
   */
  useEffect(() => {
    if (!codes) {
      return;
    }

    // Group the requested codes by exchange.
    const nextCodes = { SSE: new Set(), SZSE: new Set() };
    codes.forEach((code) => {
      nextCodes[getExchange(code)].add(normalizeCode(code));
    });

    const removedCodes = [];
    EXCHANGES.forEach((exchange) => {
      const active = subscribedCodes.current[exchange];
      const previous = propCodes.current[exchange];
      const next = nextCodes[exchange];

      // Diff against the previous `codes` only, so codes added through
      // subscribe() are not mistaken for removed ones.
      const added = [...next].filter((c) => !active.has(c));
      const removed = [...previous].filter((c) => !next.has(c));
      propCodes.current[exchange] = next;

      if (added.length === 0 && removed.length === 0) {
        return;
      }

      removed.forEach((c) => active.delete(c));
      added.forEach((c) => active.add(c));
      removedCodes.push(...removed);

      if (active.size === 0) {
        // Nothing left on this exchange: close its connection.
        closeConnection(exchange);
      } else if (added.length > 0) {
        ensureSubscribed(exchange, added);
      }
    });

    // Drop quotes of codes that are no longer subscribed.
    if (removedCodes.length > 0) {
      setQuotes((prev) => omitQuotes(prev, removedCodes));
    }
  }, [codes, closeConnection, ensureSubscribed]);

  return {
    quotes,
    connected,
    subscribe,
    unsubscribe,
  };
};

export default useRealtimeQuote;