update pay ui

This commit is contained in:
2025-12-10 11:19:02 +08:00
parent e501ac3819
commit da44dcd522
12 changed files with 1229 additions and 820 deletions

View File

@@ -0,0 +1,594 @@
/**
* 实时行情 Hook
* 管理上交所和深交所 WebSocket 连接,获取实时行情数据
*
* 上交所 (SSE): ws://49.232.185.254:8765 - 需主动订阅,提供五档行情
* 深交所 (SZSE): ws://222.128.1.157:8765 - 自动推送,提供十档行情
*/
import { useState, useEffect, useRef, useCallback } from 'react';
import { logger } from '@utils/logger';
import { WS_CONFIG, HEARTBEAT_INTERVAL, RECONNECT_INTERVAL } from './constants';
import { getExchange, normalizeCode, extractOrderBook, calcChangePct } from './utils';
import type {
Exchange,
ConnectionStatus,
QuotesMap,
QuoteData,
SSEMessage,
SSEQuoteItem,
SZSEMessage,
SZSERealtimeMessage,
SZSESnapshotMessage,
SZSEStockData,
SZSEIndexData,
SZSEBondData,
SZSEHKStockData,
SZSEAfterhoursData,
UseRealtimeQuoteReturn,
} from '../types';
/**
* 处理上交所消息
*/
const handleSSEMessage = (
msg: SSEMessage,
subscribedCodes: Set<string>,
prevQuotes: QuotesMap
): QuotesMap | null => {
if (msg.type !== 'stock' && msg.type !== 'index') {
return null;
}
const data = msg.data || {};
const updated: QuotesMap = { ...prevQuotes };
let hasUpdate = false;
Object.entries(data).forEach(([code, quote]: [string, SSEQuoteItem]) => {
if (subscribedCodes.has(code)) {
hasUpdate = true;
updated[code] = {
code: quote.security_id,
name: quote.security_name,
price: quote.last_price,
prevClose: quote.prev_close,
open: quote.open_price,
high: quote.high_price,
low: quote.low_price,
volume: quote.volume,
amount: quote.amount,
change: quote.last_price - quote.prev_close,
changePct: calcChangePct(quote.last_price, quote.prev_close),
bidPrices: quote.bid_prices || [],
bidVolumes: quote.bid_volumes || [],
askPrices: quote.ask_prices || [],
askVolumes: quote.ask_volumes || [],
updateTime: quote.trade_time,
exchange: 'SSE',
} as QuoteData;
}
});
return hasUpdate ? updated : null;
};
/**
* 处理深交所实时消息
*/
const handleSZSERealtimeMessage = (
msg: SZSERealtimeMessage,
subscribedCodes: Set<string>,
prevQuotes: QuotesMap
): QuotesMap | null => {
const { category, data, timestamp } = msg;
const code = data.security_id;
if (!subscribedCodes.has(code)) {
return null;
}
const updated: QuotesMap = { ...prevQuotes };
switch (category) {
case 'stock': {
const stockData = data as SZSEStockData;
const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(stockData.bids);
const { prices: askPrices, volumes: askVolumes } = extractOrderBook(stockData.asks);
updated[code] = {
code,
name: prevQuotes[code]?.name || '',
price: stockData.last_px,
prevClose: stockData.prev_close,
open: stockData.open_px,
high: stockData.high_px,
low: stockData.low_px,
volume: stockData.volume,
amount: stockData.amount,
numTrades: stockData.num_trades,
upperLimit: stockData.upper_limit,
lowerLimit: stockData.lower_limit,
change: stockData.last_px - stockData.prev_close,
changePct: calcChangePct(stockData.last_px, stockData.prev_close),
bidPrices,
bidVolumes,
askPrices,
askVolumes,
tradingPhase: stockData.trading_phase,
updateTime: timestamp,
exchange: 'SZSE',
} as QuoteData;
break;
}
case 'index': {
const indexData = data as SZSEIndexData;
updated[code] = {
code,
name: prevQuotes[code]?.name || '',
price: indexData.current_index,
prevClose: indexData.prev_close,
open: indexData.open_index,
high: indexData.high_index,
low: indexData.low_index,
close: indexData.close_index,
volume: indexData.volume,
amount: indexData.amount,
numTrades: indexData.num_trades,
change: indexData.current_index - indexData.prev_close,
changePct: calcChangePct(indexData.current_index, indexData.prev_close),
bidPrices: [],
bidVolumes: [],
askPrices: [],
askVolumes: [],
tradingPhase: indexData.trading_phase,
updateTime: timestamp,
exchange: 'SZSE',
} as QuoteData;
break;
}
case 'bond': {
const bondData = data as SZSEBondData;
updated[code] = {
code,
name: prevQuotes[code]?.name || '',
price: bondData.last_px,
prevClose: bondData.prev_close,
open: bondData.open_px,
high: bondData.high_px,
low: bondData.low_px,
volume: bondData.volume,
amount: bondData.amount,
numTrades: bondData.num_trades,
weightedAvgPx: bondData.weighted_avg_px,
change: bondData.last_px - bondData.prev_close,
changePct: calcChangePct(bondData.last_px, bondData.prev_close),
bidPrices: [],
bidVolumes: [],
askPrices: [],
askVolumes: [],
tradingPhase: bondData.trading_phase,
updateTime: timestamp,
exchange: 'SZSE',
isBond: true,
} as QuoteData;
break;
}
case 'hk_stock': {
const hkData = data as SZSEHKStockData;
const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(hkData.bids);
const { prices: askPrices, volumes: askVolumes } = extractOrderBook(hkData.asks);
updated[code] = {
code,
name: prevQuotes[code]?.name || '',
price: hkData.last_px,
prevClose: hkData.prev_close,
open: hkData.open_px,
high: hkData.high_px,
low: hkData.low_px,
volume: hkData.volume,
amount: hkData.amount,
numTrades: hkData.num_trades,
nominalPx: hkData.nominal_px,
referencePx: hkData.reference_px,
change: hkData.last_px - hkData.prev_close,
changePct: calcChangePct(hkData.last_px, hkData.prev_close),
bidPrices,
bidVolumes,
askPrices,
askVolumes,
tradingPhase: hkData.trading_phase,
updateTime: timestamp,
exchange: 'SZSE',
isHK: true,
} as QuoteData;
break;
}
case 'afterhours_block':
case 'afterhours_trading': {
const afterhoursData = data as SZSEAfterhoursData;
const existing = prevQuotes[code];
if (existing) {
updated[code] = {
...existing,
afterhours: {
bidPx: afterhoursData.bid_px,
bidSize: afterhoursData.bid_size,
offerPx: afterhoursData.offer_px,
offerSize: afterhoursData.offer_size,
volume: afterhoursData.volume,
amount: afterhoursData.amount,
numTrades: afterhoursData.num_trades || 0,
},
updateTime: timestamp,
} as QuoteData;
}
break;
}
default:
return null;
}
return updated;
};
/**
* 处理深交所快照消息
*/
const handleSZSESnapshotMessage = (
msg: SZSESnapshotMessage,
subscribedCodes: Set<string>,
prevQuotes: QuotesMap
): QuotesMap | null => {
const { stocks = [], indexes = [], bonds = [] } = msg.data || {};
const updated: QuotesMap = { ...prevQuotes };
let hasUpdate = false;
stocks.forEach((s: SZSEStockData) => {
if (subscribedCodes.has(s.security_id)) {
hasUpdate = true;
const { prices: bidPrices, volumes: bidVolumes } = extractOrderBook(s.bids);
const { prices: askPrices, volumes: askVolumes } = extractOrderBook(s.asks);
updated[s.security_id] = {
code: s.security_id,
name: '',
price: s.last_px,
prevClose: s.prev_close,
open: s.open_px,
high: s.high_px,
low: s.low_px,
volume: s.volume,
amount: s.amount,
numTrades: s.num_trades,
upperLimit: s.upper_limit,
lowerLimit: s.lower_limit,
change: s.last_px - s.prev_close,
changePct: calcChangePct(s.last_px, s.prev_close),
bidPrices,
bidVolumes,
askPrices,
askVolumes,
exchange: 'SZSE',
} as QuoteData;
}
});
indexes.forEach((i: SZSEIndexData) => {
if (subscribedCodes.has(i.security_id)) {
hasUpdate = true;
updated[i.security_id] = {
code: i.security_id,
name: '',
price: i.current_index,
prevClose: i.prev_close,
open: i.open_index,
high: i.high_index,
low: i.low_index,
volume: i.volume,
amount: i.amount,
numTrades: i.num_trades,
change: i.current_index - i.prev_close,
changePct: calcChangePct(i.current_index, i.prev_close),
bidPrices: [],
bidVolumes: [],
askPrices: [],
askVolumes: [],
exchange: 'SZSE',
} as QuoteData;
}
});
bonds.forEach((b: SZSEBondData) => {
if (subscribedCodes.has(b.security_id)) {
hasUpdate = true;
updated[b.security_id] = {
code: b.security_id,
name: '',
price: b.last_px,
prevClose: b.prev_close,
open: b.open_px,
high: b.high_px,
low: b.low_px,
volume: b.volume,
amount: b.amount,
change: b.last_px - b.prev_close,
changePct: calcChangePct(b.last_px, b.prev_close),
bidPrices: [],
bidVolumes: [],
askPrices: [],
askVolumes: [],
exchange: 'SZSE',
isBond: true,
} as QuoteData;
}
});
return hasUpdate ? updated : null;
};
/**
* 实时行情 Hook
* @param codes - 订阅的证券代码列表
*/
export const useRealtimeQuote = (codes: string[] = []): UseRealtimeQuoteReturn => {
const [quotes, setQuotes] = useState<QuotesMap>({});
const [connected, setConnected] = useState<ConnectionStatus>({ SSE: false, SZSE: false });
const wsRefs = useRef<Record<Exchange, WebSocket | null>>({ SSE: null, SZSE: null });
const heartbeatRefs = useRef<Record<Exchange, NodeJS.Timeout | null>>({ SSE: null, SZSE: null });
const reconnectRefs = useRef<Record<Exchange, NodeJS.Timeout | null>>({ SSE: null, SZSE: null });
const subscribedCodes = useRef<Record<Exchange, Set<string>>>({
SSE: new Set(),
SZSE: new Set(),
});
const stopHeartbeat = useCallback((exchange: Exchange) => {
if (heartbeatRefs.current[exchange]) {
clearInterval(heartbeatRefs.current[exchange]!);
heartbeatRefs.current[exchange] = null;
}
}, []);
const startHeartbeat = useCallback((exchange: Exchange) => {
stopHeartbeat(exchange);
heartbeatRefs.current[exchange] = setInterval(() => {
const ws = wsRefs.current[exchange];
if (ws && ws.readyState === WebSocket.OPEN) {
const msg = exchange === 'SSE' ? { action: 'ping' } : { type: 'ping' };
ws.send(JSON.stringify(msg));
}
}, HEARTBEAT_INTERVAL);
}, [stopHeartbeat]);
const handleMessage = useCallback((exchange: Exchange, msg: SSEMessage | SZSEMessage) => {
if (msg.type === 'pong') return;
if (exchange === 'SSE') {
const result = handleSSEMessage(
msg as SSEMessage,
subscribedCodes.current.SSE,
{} // Will be merged with current state
);
if (result) {
setQuotes(prev => ({ ...prev, ...result }));
}
} else {
if (msg.type === 'realtime') {
setQuotes(prev => {
const result = handleSZSERealtimeMessage(
msg as SZSERealtimeMessage,
subscribedCodes.current.SZSE,
prev
);
return result || prev;
});
} else if (msg.type === 'snapshot') {
setQuotes(prev => {
const result = handleSZSESnapshotMessage(
msg as SZSESnapshotMessage,
subscribedCodes.current.SZSE,
prev
);
return result || prev;
});
}
}
}, []);
const createConnection = useCallback((exchange: Exchange) => {
if (wsRefs.current[exchange]) {
wsRefs.current[exchange]!.close();
}
const ws = new WebSocket(WS_CONFIG[exchange]);
wsRefs.current[exchange] = ws;
ws.onopen = () => {
logger.info('FlexScreen', `${exchange} WebSocket 已连接`);
setConnected(prev => ({ ...prev, [exchange]: true }));
if (exchange === 'SSE') {
const codes = Array.from(subscribedCodes.current.SSE);
if (codes.length > 0) {
ws.send(JSON.stringify({
action: 'subscribe',
channels: ['stock', 'index'],
codes,
}));
}
}
startHeartbeat(exchange);
};
ws.onmessage = (event: MessageEvent) => {
try {
const msg = JSON.parse(event.data);
handleMessage(exchange, msg);
} catch (e) {
logger.warn('FlexScreen', `${exchange} 消息解析失败`, e);
}
};
ws.onerror = (error: Event) => {
logger.error('FlexScreen', `${exchange} WebSocket 错误`, error);
};
ws.onclose = () => {
logger.info('FlexScreen', `${exchange} WebSocket 断开`);
setConnected(prev => ({ ...prev, [exchange]: false }));
stopHeartbeat(exchange);
// 自动重连
if (!reconnectRefs.current[exchange] && subscribedCodes.current[exchange].size > 0) {
reconnectRefs.current[exchange] = setTimeout(() => {
reconnectRefs.current[exchange] = null;
if (subscribedCodes.current[exchange].size > 0) {
createConnection(exchange);
}
}, RECONNECT_INTERVAL);
}
};
}, [startHeartbeat, stopHeartbeat, handleMessage]);
const subscribe = useCallback((code: string) => {
const baseCode = normalizeCode(code);
const exchange = getExchange(code);
subscribedCodes.current[exchange].add(baseCode);
const ws = wsRefs.current[exchange];
if (exchange === 'SSE' && ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
action: 'subscribe',
channels: ['stock', 'index'],
codes: [baseCode],
}));
}
if (!ws || ws.readyState !== WebSocket.OPEN) {
createConnection(exchange);
}
}, [createConnection]);
const unsubscribe = useCallback((code: string) => {
const baseCode = normalizeCode(code);
const exchange = getExchange(code);
subscribedCodes.current[exchange].delete(baseCode);
setQuotes(prev => {
const updated = { ...prev };
delete updated[baseCode];
return updated;
});
if (subscribedCodes.current[exchange].size === 0) {
const ws = wsRefs.current[exchange];
if (ws) {
ws.close();
wsRefs.current[exchange] = null;
}
}
}, []);
// 初始化和 codes 变化处理
useEffect(() => {
if (!codes || codes.length === 0) return;
const newSseCodes = new Set<string>();
const newSzseCodes = new Set<string>();
codes.forEach(code => {
const baseCode = normalizeCode(code);
const exchange = getExchange(code);
if (exchange === 'SSE') {
newSseCodes.add(baseCode);
} else {
newSzseCodes.add(baseCode);
}
});
// 更新上交所订阅
const oldSseCodes = subscribedCodes.current.SSE;
const sseToAdd = [...newSseCodes].filter(c => !oldSseCodes.has(c));
if (sseToAdd.length > 0 || newSseCodes.size !== oldSseCodes.size) {
subscribedCodes.current.SSE = newSseCodes;
const ws = wsRefs.current.SSE;
if (ws && ws.readyState === WebSocket.OPEN && sseToAdd.length > 0) {
ws.send(JSON.stringify({
action: 'subscribe',
channels: ['stock', 'index'],
codes: sseToAdd,
}));
}
if (sseToAdd.length > 0 && (!ws || ws.readyState !== WebSocket.OPEN)) {
createConnection('SSE');
}
if (newSseCodes.size === 0 && ws) {
ws.close();
wsRefs.current.SSE = null;
}
}
// 更新深交所订阅
const oldSzseCodes = subscribedCodes.current.SZSE;
const szseToAdd = [...newSzseCodes].filter(c => !oldSzseCodes.has(c));
if (szseToAdd.length > 0 || newSzseCodes.size !== oldSzseCodes.size) {
subscribedCodes.current.SZSE = newSzseCodes;
const ws = wsRefs.current.SZSE;
if (szseToAdd.length > 0 && (!ws || ws.readyState !== WebSocket.OPEN)) {
createConnection('SZSE');
}
if (newSzseCodes.size === 0 && ws) {
ws.close();
wsRefs.current.SZSE = null;
}
}
// 清理已取消订阅的 quotes
const allNewCodes = new Set([...newSseCodes, ...newSzseCodes]);
setQuotes(prev => {
const updated: QuotesMap = {};
Object.keys(prev).forEach(code => {
if (allNewCodes.has(code)) {
updated[code] = prev[code];
}
});
return updated;
});
}, [codes, createConnection]);
// 清理
useEffect(() => {
return () => {
(['SSE', 'SZSE'] as Exchange[]).forEach(exchange => {
stopHeartbeat(exchange);
if (reconnectRefs.current[exchange]) {
clearTimeout(reconnectRefs.current[exchange]!);
}
const ws = wsRefs.current[exchange];
if (ws) {
ws.close();
}
});
};
}, [stopHeartbeat]);
return { quotes, connected, subscribe, unsubscribe };
};
export default useRealtimeQuote;