/**
 * Real-time quote hook.
 *
 * Manages one WebSocket connection per exchange (SSE = Shanghai, SZSE = Shenzhen)
 * and exposes the latest quotes for the subscribed securities.
 *
 * Connection modes:
 * - Production (HTTPS): wss:// through the Nginx proxy
 * - Development (HTTP): direct ws://
 *
 * SSE:  requires an explicit subscribe; provides a 5-level order book.
 * SZSE: v4.0 API; requires an explicit subscribe; provides a 10-level order book.
 */
import { useState, useEffect, useRef, useCallback } from 'react';
import { logger } from '@utils/logger';
import { WS_CONFIG, HEARTBEAT_INTERVAL, RECONNECT_INTERVAL } from './constants';
import { getExchange, normalizeCode, extractOrderBook, calcChangePct } from './utils';
import type {
  Exchange,
  ConnectionStatus,
  QuotesMap,
  QuoteData,
  SSEMessage,
  SSEQuoteItem,
  SZSEMessage,
  SZSERealtimeMessage,
  SZSESnapshotMessage,
  SZSEStockData,
  SZSEIndexData,
  SZSEBondData,
  SZSEHKStockData,
  SZSEAfterhoursData,
  UseRealtimeQuoteReturn,
} from '../types';

/** Upper bound on automatic reconnect attempts per exchange (avoids endless retry noise). */
const MAX_RECONNECT_ATTEMPTS = 5;

/** Exchanges managed by the hook. */
const EXCHANGES: readonly Exchange[] = ['SSE', 'SZSE'];

/** Appends `.SH` / `.SZ` to a bare code; codes that already contain a dot are returned unchanged. */
const withSuffix = (code: string, suffix: 'SH' | 'SZ'): string =>
  code.includes('.') ? code : `${code}.${suffix}`;

/** Resolves the exchange of a code and its suffixed ("full") form. */
const resolveCode = (code: string): { exchange: Exchange; fullCode: string } => {
  const exchange = getExchange(code);
  return { exchange, fullCode: withSuffix(code, exchange === 'SSE' ? 'SH' : 'SZ') };
};

/** True when either the raw or the suffixed form of a code is in the subscription set. */
const isTracked = (subscribed: Set<string>, rawCode: string, fullCode: string): boolean =>
  subscribed.has(rawCode) || subscribed.has(fullCode);

/**
 * Converts an SSE (Shanghai) push message into quote updates.
 *
 * @param msg - message received from the SSE socket
 * @param subscribedCodes - codes currently subscribed on SSE
 * @param prevQuotes - quotes to merge the update into
 * @returns a new map containing `prevQuotes` plus the updated quotes, or `null`
 *          when the message is not a `stock`/`index` message or touches no subscribed code
 */
const handleSSEMessage = (
  msg: SSEMessage,
  subscribedCodes: Set<string>,
  prevQuotes: QuotesMap
): QuotesMap | null => {
  if (msg.type !== 'stock' && msg.type !== 'index') {
    return null;
  }

  const data = msg.data || {};
  const updated: QuotesMap = { ...prevQuotes };
  let hasUpdate = false;

  Object.entries(data).forEach(([code, quote]: [string, SSEQuoteItem]) => {
    const fullCode = withSuffix(code, 'SH');
    if (!isTracked(subscribedCodes, code, fullCode)) {
      return;
    }

    hasUpdate = true;
    updated[fullCode] = {
      code: fullCode,
      name: quote.security_name,
      price: quote.last_price,
      prevClose: quote.prev_close,
      open: quote.open_price,
      high: quote.high_price,
      low: quote.low_price,
      volume: quote.volume,
      amount: quote.amount,
      change: quote.last_price - quote.prev_close,
      changePct: calcChangePct(quote.last_price, quote.prev_close),
      bidPrices: quote.bid_prices || [],
      bidVolumes: quote.bid_volumes || [],
      askPrices: quote.ask_prices || [],
      askVolumes: quote.ask_volumes || [],
      updateTime: quote.trade_time,
      exchange: 'SSE',
    } as QuoteData;
  });

  return hasUpdate ? updated : null;
};

/**
 * Converts an SZSE (Shenzhen) `realtime` message into a quote update.
 *
 * @param msg - realtime message; `category` selects how `data` is interpreted
 * @param subscribedCodes - codes currently subscribed on SZSE
 * @param prevQuotes - current quotes; the existing `name` is carried over, and
 *                     after-hours data is merged into an existing quote
 * @returns a new quotes map, or `null` when the payload is malformed, the
 *          security is not subscribed, or the category is unknown
 */
const handleSZSERealtimeMessage = (
  msg: SZSERealtimeMessage,
  subscribedCodes: Set<string>,
  prevQuotes: QuotesMap
): QuotesMap | null => {
  const { category, data, timestamp } = msg;

  // This function runs inside a state updater; throwing on a malformed
  // payload would surface as a render error, so bail out instead.
  if (!data || typeof data.security_id !== 'string') {
    return null;
  }

  const rawCode = data.security_id;
  const fullCode = withSuffix(rawCode, 'SZ');
  if (!isTracked(subscribedCodes, rawCode, fullCode)) {
    return null;
  }

  const updated: QuotesMap = { ...prevQuotes };
  // Realtime payloads are mapped without a name here; keep the one already known, if any.
  const name = prevQuotes[fullCode]?.name || '';

  switch (category) {
    case 'stock': {
      const stockData = data as SZSEStockData;
      const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(stockData.bids);
      const { prices: askPrices, volumes: askVolumes } = extractOrderBook(stockData.asks);
      updated[fullCode] = {
        code: fullCode,
        name,
        price: stockData.last_px,
        prevClose: stockData.prev_close,
        open: stockData.open_px,
        high: stockData.high_px,
        low: stockData.low_px,
        volume: stockData.volume,
        amount: stockData.amount,
        numTrades: stockData.num_trades,
        upperLimit: stockData.upper_limit,
        lowerLimit: stockData.lower_limit,
        change: stockData.last_px - stockData.prev_close,
        changePct: calcChangePct(stockData.last_px, stockData.prev_close),
        bidPrices,
        bidVolumes,
        askPrices,
        askVolumes,
        tradingPhase: stockData.trading_phase,
        updateTime: timestamp,
        exchange: 'SZSE',
      } as QuoteData;
      break;
    }

    case 'index': {
      const indexData = data as SZSEIndexData;
      updated[fullCode] = {
        code: fullCode,
        name,
        price: indexData.current_index,
        prevClose: indexData.prev_close,
        open: indexData.open_index,
        high: indexData.high_index,
        low: indexData.low_index,
        close: indexData.close_index,
        volume: indexData.volume,
        amount: indexData.amount,
        numTrades: indexData.num_trades,
        change: indexData.current_index - indexData.prev_close,
        changePct: calcChangePct(indexData.current_index, indexData.prev_close),
        bidPrices: [],
        bidVolumes: [],
        askPrices: [],
        askVolumes: [],
        tradingPhase: indexData.trading_phase,
        updateTime: timestamp,
        exchange: 'SZSE',
      } as QuoteData;
      break;
    }

    case 'bond': {
      const bondData = data as SZSEBondData;
      updated[fullCode] = {
        code: fullCode,
        name,
        price: bondData.last_px,
        prevClose: bondData.prev_close,
        open: bondData.open_px,
        high: bondData.high_px,
        low: bondData.low_px,
        volume: bondData.volume,
        amount: bondData.amount,
        numTrades: bondData.num_trades,
        weightedAvgPx: bondData.weighted_avg_px,
        change: bondData.last_px - bondData.prev_close,
        changePct: calcChangePct(bondData.last_px, bondData.prev_close),
        bidPrices: [],
        bidVolumes: [],
        askPrices: [],
        askVolumes: [],
        tradingPhase: bondData.trading_phase,
        updateTime: timestamp,
        exchange: 'SZSE',
        isBond: true,
      } as QuoteData;
      break;
    }

    case 'hk_stock': {
      const hkData = data as SZSEHKStockData;
      const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(hkData.bids);
      const { prices: askPrices, volumes: askVolumes } = extractOrderBook(hkData.asks);
      updated[fullCode] = {
        code: fullCode,
        name,
        price: hkData.last_px,
        prevClose: hkData.prev_close,
        open: hkData.open_px,
        high: hkData.high_px,
        low: hkData.low_px,
        volume: hkData.volume,
        amount: hkData.amount,
        numTrades: hkData.num_trades,
        nominalPx: hkData.nominal_px,
        referencePx: hkData.reference_px,
        change: hkData.last_px - hkData.prev_close,
        changePct: calcChangePct(hkData.last_px, hkData.prev_close),
        bidPrices,
        bidVolumes,
        askPrices,
        askVolumes,
        tradingPhase: hkData.trading_phase,
        updateTime: timestamp,
        exchange: 'SZSE',
        isHK: true,
      } as QuoteData;
      break;
    }

    case 'afterhours_block':
    case 'afterhours_trading': {
      const afterhoursData = data as SZSEAfterhoursData;
      const existing = prevQuotes[fullCode];
      const afterhoursInfo = {
        bidPx: afterhoursData.bid_px,
        bidSize: afterhoursData.bid_size,
        offerPx: afterhoursData.offer_px,
        offerSize: afterhoursData.offer_size,
        volume: afterhoursData.volume,
        amount: afterhoursData.amount,
        numTrades: afterhoursData.num_trades || 0,
      };

      if (existing) {
        // Merge after-hours data into the quote we already have.
        updated[fullCode] = {
          ...existing,
          afterhours: afterhoursInfo,
          updateTime: timestamp,
        } as QuoteData;
      } else {
        // First data received after hours (e.g. after a page refresh):
        // build a minimal quote skeleton to hang the after-hours data on.
        updated[fullCode] = {
          code: fullCode,
          name: '',
          price: 0,
          prevClose: afterhoursData.prev_close,
          open: 0,
          high: 0,
          low: 0,
          volume: 0,
          amount: 0,
          change: 0,
          changePct: 0,
          bidPrices: [],
          bidVolumes: [],
          askPrices: [],
          askVolumes: [],
          tradingPhase: afterhoursData.trading_phase,
          afterhours: afterhoursInfo,
          updateTime: timestamp,
          exchange: 'SZSE',
        } as QuoteData;
      }
      break;
    }

    default:
      return null;
  }

  return updated;
};

/**
 * Converts an SZSE `snapshot` message (the batch returned right after a
 * subscribe) into quote updates.
 *
 * @param msg - snapshot message containing optional `stocks`, `indexes` and `bonds` lists
 * @param subscribedCodes - codes currently subscribed on SZSE
 * @param prevQuotes - current quotes; an already-known `name` is carried over
 * @returns a new quotes map, or `null` when no subscribed security is present
 */
const handleSZSESnapshotMessage = (
  msg: SZSESnapshotMessage,
  subscribedCodes: Set<string>,
  prevQuotes: QuotesMap
): QuotesMap | null => {
  const { stocks = [], indexes = [], bonds = [] } = msg.data || {};
  const updated: QuotesMap = { ...prevQuotes };
  let hasUpdate = false;

  // Returns the full code of a subscribed security, or null when the entry
  // should be skipped (malformed id or not subscribed).
  const resolveTracked = (rawCode: unknown): string | null => {
    if (typeof rawCode !== 'string') {
      return null;
    }
    const fullCode = withSuffix(rawCode, 'SZ');
    return isTracked(subscribedCodes, rawCode, fullCode) ? fullCode : null;
  };

  stocks.forEach((s: SZSEStockData) => {
    const fullCode = resolveTracked(s.security_id);
    if (fullCode === null) {
      return;
    }
    hasUpdate = true;
    const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(s.bids);
    const { prices: askPrices, volumes: askVolumes } = extractOrderBook(s.asks);
    updated[fullCode] = {
      code: fullCode,
      name: prevQuotes[fullCode]?.name || '',
      price: s.last_px,
      prevClose: s.prev_close,
      open: s.open_px,
      high: s.high_px,
      low: s.low_px,
      volume: s.volume,
      amount: s.amount,
      numTrades: s.num_trades,
      upperLimit: s.upper_limit,
      lowerLimit: s.lower_limit,
      change: s.last_px - s.prev_close,
      changePct: calcChangePct(s.last_px, s.prev_close),
      bidPrices,
      bidVolumes,
      askPrices,
      askVolumes,
      exchange: 'SZSE',
    } as QuoteData;
  });

  indexes.forEach((i: SZSEIndexData) => {
    const fullCode = resolveTracked(i.security_id);
    if (fullCode === null) {
      return;
    }
    hasUpdate = true;
    updated[fullCode] = {
      code: fullCode,
      name: prevQuotes[fullCode]?.name || '',
      price: i.current_index,
      prevClose: i.prev_close,
      open: i.open_index,
      high: i.high_index,
      low: i.low_index,
      volume: i.volume,
      amount: i.amount,
      numTrades: i.num_trades,
      change: i.current_index - i.prev_close,
      changePct: calcChangePct(i.current_index, i.prev_close),
      bidPrices: [],
      bidVolumes: [],
      askPrices: [],
      askVolumes: [],
      exchange: 'SZSE',
    } as QuoteData;
  });

  bonds.forEach((b: SZSEBondData) => {
    const fullCode = resolveTracked(b.security_id);
    if (fullCode === null) {
      return;
    }
    hasUpdate = true;
    updated[fullCode] = {
      code: fullCode,
      name: prevQuotes[fullCode]?.name || '',
      price: b.last_px,
      prevClose: b.prev_close,
      open: b.open_px,
      high: b.high_px,
      low: b.low_px,
      volume: b.volume,
      amount: b.amount,
      change: b.last_px - b.prev_close,
      changePct: calcChangePct(b.last_px, b.prev_close),
      bidPrices: [],
      bidVolumes: [],
      askPrices: [],
      askVolumes: [],
      exchange: 'SZSE',
      isBond: true,
    } as QuoteData;
  });

  return hasUpdate ? updated : null;
};

/**
 * Real-time quote hook.
 *
 * @param codes - security codes to subscribe to (with suffix, e.g. `000001.SZ`).
 *                An empty list leaves the current subscriptions untouched, so
 *                the imperative `subscribe` / `unsubscribe` API keeps working
 *                when the hook is called without codes.
 * @returns the quotes map, per-exchange connection flags, and imperative
 *          `subscribe` / `unsubscribe` functions
 */
export const useRealtimeQuote = (codes: string[] = []): UseRealtimeQuoteReturn => {
  const [quotes, setQuotes] = useState<QuotesMap>({});
  const [connected, setConnected] = useState<ConnectionStatus>({ SSE: false, SZSE: false });

  const wsRefs = useRef<Record<Exchange, WebSocket | null>>({ SSE: null, SZSE: null });
  const heartbeatRefs = useRef<Record<Exchange, ReturnType<typeof setInterval> | null>>({
    SSE: null,
    SZSE: null,
  });
  const reconnectRefs = useRef<Record<Exchange, ReturnType<typeof setTimeout> | null>>({
    SSE: null,
    SZSE: null,
  });
  // Reconnect attempts since the last successful open.
  const reconnectCountRef = useRef<Record<Exchange, number>>({ SSE: 0, SZSE: 0 });
  // SZSE only accepts subscriptions after its `welcome` message has arrived.
  const szseReadyRef = useRef(false);
  // Full (suffixed) codes currently subscribed, per exchange.
  const subscribedCodes = useRef<Record<Exchange, Set<string>>>({
    SSE: new Set<string>(),
    SZSE: new Set<string>(),
  });

  const stopHeartbeat = useCallback((exchange: Exchange) => {
    const timer = heartbeatRefs.current[exchange];
    if (timer) {
      clearInterval(timer);
      heartbeatRefs.current[exchange] = null;
    }
  }, []);

  const startHeartbeat = useCallback((exchange: Exchange) => {
    stopHeartbeat(exchange);
    heartbeatRefs.current[exchange] = setInterval(() => {
      const ws = wsRefs.current[exchange];
      if (ws && ws.readyState === WebSocket.OPEN) {
        // SSE expects `action: 'ping'`, SZSE expects `type: 'ping'`.
        const pingMsg = exchange === 'SSE' ? { action: 'ping' } : { type: 'ping' };
        ws.send(JSON.stringify(pingMsg));
      }
    }, HEARTBEAT_INTERVAL);
  }, [stopHeartbeat]);

  /**
   * Sends an SSE subscribe request for the given base codes.
   * No-op when the socket is not open or the list is empty.
   */
  const sendSSESubscribe = useCallback((baseCodes: string[]) => {
    const ws = wsRefs.current.SSE;
    if (ws && ws.readyState === WebSocket.OPEN && baseCodes.length > 0) {
      ws.send(JSON.stringify({
        action: 'subscribe',
        channels: ['stock', 'index'],
        codes: baseCodes,
      }));
    }
  }, []);

  /**
   * Sends an SZSE subscribe request.
   * Format: { type: 'subscribe', securities: ['000001', '000002'] }
   * No-op when the socket is not open or the list is empty.
   */
  const sendSZSESubscribe = useCallback((baseCodes: string[]) => {
    const ws = wsRefs.current.SZSE;
    if (ws && ws.readyState === WebSocket.OPEN && baseCodes.length > 0) {
      ws.send(JSON.stringify({
        type: 'subscribe',
        securities: baseCodes,
      }));
      logger.info('FlexScreen', `SZSE 发送订阅请求`, { securities: baseCodes });
    }
  }, []);

  /**
   * Dispatches one parsed message from either exchange.
   */
  const handleMessage = useCallback((exchange: Exchange, msg: SSEMessage | SZSEMessage) => {
    // Control messages carry fields that are not part of the typed quote payloads.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const anyMsg = msg as any;

    // Heartbeat reply (the two exchanges may use different field names).
    if (msg.type === 'pong' || anyMsg.action === 'pong') return;

    if (exchange === 'SSE') {
      if (msg.type === 'subscribed') {
        logger.info('FlexScreen', 'SSE 订阅成功', { channels: anyMsg.channels });
        return;
      }
      if (msg.type === 'error') {
        logger.error('FlexScreen', 'SSE WebSocket 错误', { message: anyMsg.message });
        return;
      }

      // Quote data: compute the delta against an empty map, then merge it
      // into the latest state.
      const result = handleSSEMessage(msg as SSEMessage, subscribedCodes.current.SSE, {});
      if (result) {
        setQuotes(prev => ({ ...prev, ...result }));
      }
      return;
    }

    switch (msg.type) {
      case 'welcome': {
        logger.info('FlexScreen', 'SZSE WebSocket 就绪,可以订阅');
        szseReadyRef.current = true;
        // Always (re)subscribe the complete current set. Requests made
        // between `open` and `welcome` are already part of it, and after a
        // reconnect the previously subscribed codes must be sent again.
        const currentCodes = Array.from(subscribedCodes.current.SZSE).map(c => normalizeCode(c));
        sendSZSESubscribe(currentCodes);
        break;
      }

      case 'subscribed':
        logger.info('FlexScreen', 'SZSE 订阅成功', {
          securities: anyMsg.securities,
          categories: anyMsg.categories,
        });
        break;

      case 'unsubscribed':
        logger.info('FlexScreen', 'SZSE 取消订阅成功');
        break;

      case 'snapshot':
        // Batch data returned right after subscribing.
        setQuotes(prev => {
          const result = handleSZSESnapshotMessage(
            msg as SZSESnapshotMessage,
            subscribedCodes.current.SZSE,
            prev
          );
          return result || prev;
        });
        break;

      case 'realtime':
        setQuotes(prev => {
          const result = handleSZSERealtimeMessage(
            msg as SZSERealtimeMessage,
            subscribedCodes.current.SZSE,
            prev
          );
          return result || prev;
        });
        break;

      case 'query_result':
      case 'query_batch_result':
        // Query results are currently ignored.
        break;

      case 'error':
        logger.error('FlexScreen', 'SZSE WebSocket 错误', { message: anyMsg.message });
        break;

      default:
        // Unknown message type.
        break;
    }
  }, [sendSZSESubscribe]);

  /**
   * Opens (or re-opens) the WebSocket for an exchange.
   * No-op while a handshake for that exchange is already in flight.
   */
  const createConnection = useCallback((exchange: Exchange) => {
    const isHttps = typeof window !== 'undefined' && window.location.protocol === 'https:';
    const wsUrl = WS_CONFIG[exchange];
    const isInsecureWs = wsUrl.startsWith('ws://');

    if (isHttps && isInsecureWs) {
      logger.warn(
        'FlexScreen',
        `${exchange} WebSocket 配置错误:HTTPS 页面尝试连接 ws:// 端点,请检查 Nginx 代理配置`
      );
      return;
    }

    const existing = wsRefs.current[exchange];
    if (existing && existing.readyState === WebSocket.CONNECTING) {
      // A connection attempt is already running; once it opens it subscribes
      // the full current set, so tearing it down would only cause churn.
      return;
    }

    // We are connecting now, so a scheduled reconnect would be redundant.
    const pendingReconnect = reconnectRefs.current[exchange];
    if (pendingReconnect) {
      clearTimeout(pendingReconnect);
      reconnectRefs.current[exchange] = null;
    }

    if (existing) {
      existing.close();
    }

    if (exchange === 'SZSE') {
      szseReadyRef.current = false;
    }

    try {
      const ws = new WebSocket(wsUrl);
      wsRefs.current[exchange] = ws;

      ws.onopen = () => {
        if (wsRefs.current[exchange] !== ws) return; // superseded socket

        logger.info('FlexScreen', `${exchange} WebSocket 已连接`, { url: wsUrl });
        setConnected(prev => ({ ...prev, [exchange]: true }));
        reconnectCountRef.current[exchange] = 0;

        if (exchange === 'SSE') {
          // SSE: subscribe immediately after the connection opens.
          sendSSESubscribe(Array.from(subscribedCodes.current.SSE).map(c => normalizeCode(c)));
        }
        // SZSE: wait for the `welcome` message before subscribing.

        startHeartbeat(exchange);
      };

      ws.onmessage = (event: MessageEvent) => {
        if (wsRefs.current[exchange] !== ws) return; // superseded socket
        try {
          const msg = JSON.parse(event.data);
          handleMessage(exchange, msg);
        } catch (e) {
          logger.warn('FlexScreen', `${exchange} 消息解析失败`, e);
        }
      };

      ws.onerror = () => {
        logger.error('FlexScreen', `${exchange} WebSocket 连接失败`, {
          url: wsUrl,
          readyState: ws.readyState,
          hint: '请检查:1) 后端服务是否启动 2) Nginx 代理是否配置正确',
        });
      };

      ws.onclose = () => {
        // A socket that has been replaced must not touch the state of its
        // successor (connected flag, heartbeat, ready flag, reconnect timer).
        const current = wsRefs.current[exchange];
        if (current !== null && current !== ws) return;

        logger.info('FlexScreen', `${exchange} WebSocket 断开`);
        setConnected(prev => ({ ...prev, [exchange]: false }));
        stopHeartbeat(exchange);

        if (exchange === 'SZSE') {
          szseReadyRef.current = false;
        }

        // Automatic reconnect, bounded to avoid flooding the log.
        const currentAttempts = reconnectCountRef.current[exchange];
        if (
          !reconnectRefs.current[exchange] &&
          subscribedCodes.current[exchange].size > 0 &&
          currentAttempts < MAX_RECONNECT_ATTEMPTS
        ) {
          reconnectCountRef.current[exchange] = currentAttempts + 1;
          // Exponential backoff: the delay doubles with every attempt.
          const delay = RECONNECT_INTERVAL * Math.pow(2, currentAttempts);
          logger.info('FlexScreen', `${exchange} 将在 ${delay / 1000} 秒后重连 (${currentAttempts + 1}/${MAX_RECONNECT_ATTEMPTS})`);

          reconnectRefs.current[exchange] = setTimeout(() => {
            reconnectRefs.current[exchange] = null;
            if (subscribedCodes.current[exchange].size > 0) {
              createConnection(exchange);
            }
          }, delay);
        } else if (currentAttempts >= MAX_RECONNECT_ATTEMPTS) {
          logger.warn('FlexScreen', `${exchange} 达到最大重连次数,停止重连。请检查 WebSocket 服务是否正常。`, {
            url: wsUrl,
          });
        }
      };
    } catch (e) {
      logger.error('FlexScreen', `${exchange} WebSocket 连接失败`, e);
      setConnected(prev => ({ ...prev, [exchange]: false }));
    }
  }, [startHeartbeat, stopHeartbeat, handleMessage, sendSSESubscribe]);

  /**
   * Subscribes to a single security.
   */
  const subscribe = useCallback((code: string) => {
    const { exchange, fullCode } = resolveCode(code);
    const baseCode = normalizeCode(code);

    subscribedCodes.current[exchange].add(fullCode);

    const ws = wsRefs.current[exchange];
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      // The code is registered above and is sent once the socket opens
      // (SSE) or the welcome message arrives (SZSE).
      createConnection(exchange);
      return;
    }

    if (exchange === 'SSE') {
      sendSSESubscribe([baseCode]);
    } else if (szseReadyRef.current) {
      sendSZSESubscribe([baseCode]);
    }
    // SZSE open but not ready yet: the welcome handler subscribes the full set.
  }, [createConnection, sendSSESubscribe, sendSZSESubscribe]);

  /**
   * Unsubscribes from a single security and drops its quote.
   * Closes the exchange connection when no subscription remains.
   */
  const unsubscribe = useCallback((code: string) => {
    const { exchange, fullCode } = resolveCode(code);
    const baseCode = normalizeCode(code);

    subscribedCodes.current[exchange].delete(fullCode);

    // Only SZSE gets an explicit unsubscribe request.
    const ws = wsRefs.current[exchange];
    if (exchange === 'SZSE' && ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({
        type: 'unsubscribe',
        securities: [baseCode],
      }));
    }

    setQuotes(prev => {
      const updated = { ...prev };
      delete updated[fullCode];
      return updated;
    });

    if (subscribedCodes.current[exchange].size === 0 && ws) {
      ws.close();
      wsRefs.current[exchange] = null;
    }
  }, []);

  // Synchronise the subscriptions with `codes`.
  useEffect(() => {
    // An empty list means "not managed declaratively": leave imperative
    // subscriptions alone.
    if (!codes || codes.length === 0) return;

    const next: Record<Exchange, Set<string>> = {
      SSE: new Set<string>(),
      SZSE: new Set<string>(),
    };
    codes.forEach(code => {
      const { exchange, fullCode } = resolveCode(code);
      next[exchange].add(fullCode);
    });

    // --- SSE ---
    const oldSseCodes = subscribedCodes.current.SSE;
    const sseToAdd = [...next.SSE].filter(c => !oldSseCodes.has(c));

    if (sseToAdd.length > 0 || next.SSE.size !== oldSseCodes.size) {
      subscribedCodes.current.SSE = next.SSE;
      const ws = wsRefs.current.SSE;

      if (sseToAdd.length > 0) {
        if (ws && ws.readyState === WebSocket.OPEN) {
          sendSSESubscribe(sseToAdd.map(c => normalizeCode(c)));
        } else {
          createConnection('SSE');
        }
      }

      if (next.SSE.size === 0 && ws) {
        ws.close();
        wsRefs.current.SSE = null;
      }
    }

    // --- SZSE ---
    const oldSzseCodes = subscribedCodes.current.SZSE;
    const szseToAdd = [...next.SZSE].filter(c => !oldSzseCodes.has(c));
    const szseToRemove = [...oldSzseCodes].filter(c => !next.SZSE.has(c));

    if (szseToAdd.length > 0 || next.SZSE.size !== oldSzseCodes.size) {
      subscribedCodes.current.SZSE = next.SZSE;
      const ws = wsRefs.current.SZSE;

      if (next.SZSE.size === 0) {
        if (ws) {
          ws.close();
          wsRefs.current.SZSE = null;
        }
      } else if (ws && ws.readyState === WebSocket.OPEN) {
        if (szseReadyRef.current) {
          if (szseToRemove.length > 0) {
            // Same request `unsubscribe` sends, so the server stops pushing
            // data for codes that were dropped from the list.
            ws.send(JSON.stringify({
              type: 'unsubscribe',
              securities: szseToRemove.map(c => normalizeCode(c)),
            }));
          }
          sendSZSESubscribe(szseToAdd.map(c => normalizeCode(c)));
        }
        // Open but not ready: the welcome handler subscribes the full set.
      } else if (szseToAdd.length > 0) {
        createConnection('SZSE');
      }
    }

    // Drop quotes of codes that are no longer requested. Returning `prev`
    // when nothing changes lets React bail out, so a caller passing a fresh
    // array on every render does not trigger an endless render loop.
    const allNewCodes = new Set<string>([...next.SSE, ...next.SZSE]);
    setQuotes(prev => {
      const keys = Object.keys(prev);
      const kept = keys.filter(code => allNewCodes.has(code));
      if (kept.length === keys.length) {
        return prev;
      }
      const updated: QuotesMap = {};
      kept.forEach(code => {
        updated[code] = prev[code];
      });
      return updated;
    });
  }, [codes, createConnection, sendSSESubscribe, sendSZSESubscribe]);

  // Tear everything down on unmount.
  useEffect(() => {
    return () => {
      EXCHANGES.forEach(exchange => {
        stopHeartbeat(exchange);

        const timer = reconnectRefs.current[exchange];
        if (timer) {
          clearTimeout(timer);
          reconnectRefs.current[exchange] = null;
        }

        // Forget the subscriptions before closing: otherwise the close
        // handler would schedule a reconnect that outlives the component.
        // It also lets a re-run of the effects (React StrictMode) start clean.
        subscribedCodes.current[exchange] = new Set<string>();

        const ws = wsRefs.current[exchange];
        wsRefs.current[exchange] = null;
        if (ws) {
          ws.close();
        }
      });
    };
  }, [stopHeartbeat]);

  return { quotes, connected, subscribe, unsubscribe };
};

export default useRealtimeQuote;