diff --git a/app.py b/app.py index d7d67ffb..a38d0504 100755 --- a/app.py +++ b/app.py @@ -6485,6 +6485,360 @@ def delete_account_calendar_event(event_id): return jsonify({'success': False, 'error': str(e)}), 500 +# ==================== 灵活屏实时行情 API ==================== +# 从 ClickHouse 实时行情表获取最新数据(用于盘后/WebSocket 无数据时的回退) +@app.route('/api/flex-screen/quotes', methods=['POST']) +def get_flex_screen_quotes(): + """ + 获取灵活屏行情数据 + 优先从实时行情表查询,如果没有则从分钟线表查询 + + 请求体: + { + "codes": ["000001.SZ", "399001.SZ", "600519.SH"], + "include_order_book": false // 是否包含五档盘口 + } + + 返回: + { + "success": true, + "data": { + "000001.SZ": { + "security_id": "000001", + "name": "平安银行", + "last_px": 10.50, + "prev_close_px": 10.20, + "open_px": 10.30, + "high_px": 10.55, + "low_px": 10.15, + "total_volume_trade": 1000000, + "total_value_trade": 10500000, + "change": 0.30, + "change_pct": 2.94, + "bid_prices": [10.49, 10.48, ...], + "bid_volumes": [1000, 2000, ...], + "ask_prices": [10.50, 10.51, ...], + "ask_volumes": [800, 1200, ...], + "update_time": "2024-12-11 15:00:00" + }, + ... + }, + "source": "realtime" | "minute" // 数据来源 + } + """ + try: + data = request.json or {} + codes = data.get('codes', []) + include_order_book = data.get('include_order_book', False) + + if not codes: + return jsonify({'success': False, 'error': '请提供股票代码'}), 400 + + client = get_clickhouse_client() + results = {} + source = 'realtime' + + # 分离上交所和深交所代码 + sse_codes = [] # 上交所 + szse_stock_codes = [] # 深交所股票 + szse_index_codes = [] # 深交所指数 + + for code in codes: + base_code = code.split('.')[0] + if code.endswith('.SH'): + sse_codes.append(base_code) + elif code.endswith('.SZ'): + # 399 开头是指数 + if base_code.startswith('399'): + szse_index_codes.append(base_code) + else: + szse_stock_codes.append(base_code) + + # 获取股票名称 + stock_names = {} + with engine.connect() as conn: + base_codes = list(set([code.split('.')[0] for code in codes])) + if base_codes: + placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) + params = {f'code{i}': code for i, code in enumerate(base_codes)} + result = conn.execute(text( + f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" + ), params).fetchall() + stock_names = {row[0]: row[1] for row in result} + + # 查询深交所股票实时行情 + if szse_stock_codes: + try: + order_book_cols = "" + if include_order_book: + order_book_cols = """, + bid_price1, bid_volume1, bid_price2, bid_volume2, bid_price3, bid_volume3, + bid_price4, bid_volume4, bid_price5, bid_volume5, + ask_price1, ask_volume1, ask_price2, ask_volume2, ask_price3, ask_volume3, + ask_price4, ask_volume4, ask_price5, ask_volume5""" + + szse_stock_query = f""" + SELECT + security_id, + last_price, + prev_close, + open_price, + high_price, + low_price, + volume, + amount, + num_trades, + upper_limit_price, + lower_limit_price, + trading_phase_code, + trade_time + {order_book_cols} + FROM stock.szse_stock_realtime + WHERE trade_date = today() + AND security_id IN %(codes)s + ORDER BY security_id, trade_time DESC + LIMIT 1 BY security_id + """ + szse_stock_data = client.execute(szse_stock_query, {'codes': szse_stock_codes}) + + for row in szse_stock_data: + security_id = row[0] + full_code = f"{security_id}.SZ" + last_px = float(row[1]) if row[1] else 0 + prev_close = float(row[2]) if row[2] else 0 + change = last_px - prev_close if last_px and prev_close else 0 + change_pct = (change / prev_close * 100) if prev_close else 0 + + quote = { + 'security_id': security_id, + 'name': stock_names.get(security_id, ''), + 'last_px': last_px, + 'prev_close_px': prev_close, + 'open_px': float(row[3]) if row[3] else 0, + 'high_px': float(row[4]) if row[4] else 0, + 'low_px': float(row[5]) if row[5] else 0, + 'total_volume_trade': float(row[6]) if row[6] else 0, + 'total_value_trade': float(row[7]) if row[7] else 0, + 'num_trades': int(row[8]) if row[8] else 0, + 'upper_limit_px': float(row[9]) if row[9] else None, + 'lower_limit_px': float(row[10]) if row[10] else None, + 'trading_phase_code': row[11], + 'change': change, + 'change_pct': change_pct, + 'update_time': str(row[12]) if row[12] else None, + } + + if include_order_book and len(row) > 13: + quote['bid_prices'] = [float(row[i]) if row[i] else 0 for i in range(13, 23, 2)] + quote['bid_volumes'] = [float(row[i]) if row[i] else 0 for i in range(14, 24, 2)] + quote['ask_prices'] = [float(row[i]) if row[i] else 0 for i in range(23, 33, 2)] + quote['ask_volumes'] = [float(row[i]) if row[i] else 0 for i in range(24, 34, 2)] + + results[full_code] = quote + + except Exception as e: + print(f"查询深交所实时行情失败: {e}") + + # 查询深交所指数实时行情 + if szse_index_codes: + try: + szse_index_query = """ + SELECT + security_id, + current_index, + prev_close, + open_index, + high_index, + low_index, + close_index, + volume, + amount, + num_trades, + trade_time + FROM stock.szse_index_realtime + WHERE trade_date = today() + AND security_id IN %(codes)s + ORDER BY security_id, trade_time DESC + LIMIT 1 BY security_id + """ + szse_index_data = client.execute(szse_index_query, {'codes': szse_index_codes}) + + for row in szse_index_data: + security_id = row[0] + full_code = f"{security_id}.SZ" + current_index = float(row[1]) if row[1] else 0 + prev_close = float(row[2]) if row[2] else 0 + change = current_index - prev_close if current_index and prev_close else 0 + change_pct = (change / prev_close * 100) if prev_close else 0 + + results[full_code] = { + 'security_id': security_id, + 'name': stock_names.get(security_id, ''), + 'last_px': current_index, + 'prev_close_px': prev_close, + 'open_px': float(row[3]) if row[3] else 0, + 'high_px': float(row[4]) if row[4] else 0, + 'low_px': float(row[5]) if row[5] else 0, + 'close_px': float(row[6]) if row[6] else None, + 'total_volume_trade': float(row[7]) if row[7] else 0, + 'total_value_trade': float(row[8]) if row[8] else 0, + 'num_trades': int(row[9]) if row[9] else 0, + 'change': change, + 'change_pct': change_pct, + 'update_time': str(row[10]) if row[10] else None, + 'bid_prices': [], + 'bid_volumes': [], + 'ask_prices': [], + 'ask_volumes': [], + } + + except Exception as e: + print(f"查询深交所指数实时行情失败: {e}") + + # 查询上交所实时行情(如果有 sse_stock_realtime 表) + if sse_codes: + try: + sse_query = """ + SELECT + security_id, + last_price, + prev_close, + open_price, + high_price, + low_price, + volume, + amount, + trade_time + FROM stock.sse_stock_realtime + WHERE trade_date = today() + AND security_id IN %(codes)s + ORDER BY security_id, trade_time DESC + LIMIT 1 BY security_id + """ + sse_data = client.execute(sse_query, {'codes': sse_codes}) + + for row in sse_data: + security_id = row[0] + full_code = f"{security_id}.SH" + last_px = float(row[1]) if row[1] else 0 + prev_close = float(row[2]) if row[2] else 0 + change = last_px - prev_close if last_px and prev_close else 0 + change_pct = (change / prev_close * 100) if prev_close else 0 + + results[full_code] = { + 'security_id': security_id, + 'name': stock_names.get(security_id, ''), + 'last_px': last_px, + 'prev_close_px': prev_close, + 'open_px': float(row[3]) if row[3] else 0, + 'high_px': float(row[4]) if row[4] else 0, + 'low_px': float(row[5]) if row[5] else 0, + 'total_volume_trade': float(row[6]) if row[6] else 0, + 'total_value_trade': float(row[7]) if row[7] else 0, + 'change': change, + 'change_pct': change_pct, + 'update_time': str(row[8]) if row[8] else None, + 'bid_prices': [], + 'bid_volumes': [], + 'ask_prices': [], + 'ask_volumes': [], + } + + except Exception as e: + print(f"查询上交所实时行情失败: {e},尝试从分钟线表查询") + + # 对于实时表中没有数据的股票,从分钟线表查询 + missing_codes = [code for code in codes if code not in results] + if missing_codes: + source = 'minute' if not results else 'mixed' + try: + # 从分钟线表查询最新数据 + minute_query = """ + SELECT + code, + close, + open, + high, + low, + volume, + amount, + timestamp + FROM stock.stock_minute + WHERE toDate(timestamp) = today() + AND code IN %(codes)s + ORDER BY code, timestamp DESC + LIMIT 1 BY code + """ + minute_data = client.execute(minute_query, {'codes': missing_codes}) + + # 获取昨收价 + prev_close_map = {} + with engine.connect() as conn: + base_codes = list(set([code.split('.')[0] for code in missing_codes])) + if base_codes: + # 获取上一交易日 + prev_day_result = conn.execute(text(""" + SELECT EXCHANGE_DATE FROM trading_days + WHERE EXCHANGE_DATE < CURDATE() + ORDER BY EXCHANGE_DATE DESC LIMIT 1 + """)).fetchone() + + if prev_day_result: + prev_day = prev_day_result[0] + placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) + params = {f'code{i}': code for i, code in enumerate(base_codes)} + params['trade_date'] = prev_day + + prev_result = conn.execute(text(f""" + SELECT SECCODE, F007N as close_price FROM ea_trade + WHERE SECCODE IN ({placeholders}) AND TRADEDATE = :trade_date + """), params).fetchall() + + prev_close_map = {row[0]: float(row[1]) if row[1] else 0 for row in prev_result} + + for row in minute_data: + code = row[0] + base_code = code.split('.')[0] + last_px = float(row[1]) if row[1] else 0 + prev_close = prev_close_map.get(base_code, 0) + change = last_px - prev_close if last_px and prev_close else 0 + change_pct = (change / prev_close * 100) if prev_close else 0 + + results[code] = { + 'security_id': base_code, + 'name': stock_names.get(base_code, ''), + 'last_px': last_px, + 'prev_close_px': prev_close, + 'open_px': float(row[2]) if row[2] else 0, + 'high_px': float(row[3]) if row[3] else 0, + 'low_px': float(row[4]) if row[4] else 0, + 'total_volume_trade': float(row[5]) if row[5] else 0, + 'total_value_trade': float(row[6]) if row[6] else 0, + 'change': change, + 'change_pct': change_pct, + 'update_time': str(row[7]) if row[7] else None, + 'bid_prices': [], + 'bid_volumes': [], + 'ask_prices': [], + 'ask_volumes': [], + } + + except Exception as e: + print(f"查询分钟线数据失败: {e}") + + return jsonify({ + 'success': True, + 'data': results, + 'source': source + }) + + except Exception as e: + print(f"灵活屏行情查询失败: {e}") + import traceback + traceback.print_exc() + return jsonify({'success': False, 'error': str(e)}), 500 + + @app.route('/api/stock//kline') def get_stock_kline(stock_code): chart_type = request.args.get('type', 'minute') diff --git a/src/views/StockOverview/components/FlexScreen/hooks/useRealtimeQuote.ts b/src/views/StockOverview/components/FlexScreen/hooks/useRealtimeQuote.ts index df943ba5..ff380efe 100644 --- a/src/views/StockOverview/components/FlexScreen/hooks/useRealtimeQuote.ts +++ b/src/views/StockOverview/components/FlexScreen/hooks/useRealtimeQuote.ts @@ -293,6 +293,102 @@ const handleSZSESnapshotMessage = ( return updated; }; +/** API 响应中的行情数据 */ +interface ApiQuoteData { + security_id: string; + name: string; + last_px: number; + prev_close_px: number; + open_px: number; + high_px: number; + low_px: number; + total_volume_trade: number; + total_value_trade: number; + num_trades?: number; + upper_limit_px?: number; + lower_limit_px?: number; + trading_phase_code?: string; + change: number; + change_pct: number; + bid_prices?: number[]; + bid_volumes?: number[]; + ask_prices?: number[]; + ask_volumes?: number[]; + update_time?: string; +} + +/** API 响应结构 */ +interface FlexScreenQuotesResponse { + success: boolean; + data: Record; + source: 'realtime' | 'minute' | 'mixed'; + error?: string; +} + +/** + * 从后端 API 获取初始行情数据 + * 用于盘后或 WebSocket 无数据时的回退 + */ +const fetchInitialQuotes = async ( + codes: string[], + includeOrderBook = true +): Promise => { + if (codes.length === 0) return {}; + + try { + const response = await fetch('/api/flex-screen/quotes', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ codes, include_order_book: includeOrderBook }), + }); + + const result: FlexScreenQuotesResponse = await response.json(); + + if (!result.success || !result.data) { + logger.warn('FlexScreen', '获取初始行情失败', { error: result.error }); + return {}; + } + + logger.info('FlexScreen', `获取初始行情成功`, { source: result.source, count: Object.keys(result.data).length }); + + // 转换 API 响应为 QuotesMap 格式 + const quotesMap: QuotesMap = {}; + + Object.entries(result.data).forEach(([code, apiQuote]) => { + const exchange: Exchange = code.endsWith('.SH') ? 'SSE' : 'SZSE'; + + quotesMap[code] = { + code, + name: apiQuote.name || '', + price: apiQuote.last_px, + prevClose: apiQuote.prev_close_px, + open: apiQuote.open_px, + high: apiQuote.high_px, + low: apiQuote.low_px, + volume: apiQuote.total_volume_trade, + amount: apiQuote.total_value_trade, + numTrades: apiQuote.num_trades, + upperLimit: apiQuote.upper_limit_px, + lowerLimit: apiQuote.lower_limit_px, + change: apiQuote.change, + changePct: apiQuote.change_pct, + bidPrices: apiQuote.bid_prices || [], + bidVolumes: apiQuote.bid_volumes || [], + askPrices: apiQuote.ask_prices || [], + askVolumes: apiQuote.ask_volumes || [], + tradingPhase: apiQuote.trading_phase_code, + updateTime: apiQuote.update_time, + exchange, + } as QuoteData; + }); + + return quotesMap; + } catch (e) { + logger.error('FlexScreen', '获取初始行情失败', e); + return {}; + } +}; + /** * 实时行情 Hook * @param codes - 订阅的证券代码列表(带后缀,如 000001.SZ) @@ -305,6 +401,8 @@ export const useRealtimeQuote = (codes: string[] = []): UseRealtimeQuoteReturn = const heartbeatRefs = useRef>({ SSE: null, SZSE: null }); const reconnectRefs = useRef>({ SSE: null, SZSE: null }); const reconnectCountRef = useRef>({ SSE: 0, SZSE: 0 }); + // 是否已加载过初始数据 + const initialLoadedRef = useRef>(new Set()); const subscribedCodes = useRef>>({ SSE: new Set(), @@ -634,6 +732,32 @@ export const useRealtimeQuote = (codes: string[] = []): UseRealtimeQuoteReturn = } }); + const allNewCodes = [...newSseCodes, ...newSzseCodes]; + + // 检查是否有新增的代码需要加载初始数据 + const codesToLoad = allNewCodes.filter(c => !initialLoadedRef.current.has(c)); + + if (codesToLoad.length > 0) { + // 标记为已加载(避免重复请求) + codesToLoad.forEach(c => initialLoadedRef.current.add(c)); + + // 从后端获取初始行情数据(异步,不阻塞 WebSocket 连接) + fetchInitialQuotes(codesToLoad, true).then(initialQuotes => { + if (Object.keys(initialQuotes).length > 0) { + setQuotes(prev => { + // 只设置当前没有数据的股票(避免覆盖 WebSocket 实时数据) + const merged = { ...prev }; + Object.entries(initialQuotes).forEach(([code, quote]) => { + if (!merged[code] || !merged[code].price) { + merged[code] = quote; + } + }); + return merged; + }); + } + }); + } + // 更新上交所订阅 const oldSseCodes = subscribedCodes.current.SSE; const sseToAdd = [...newSseCodes].filter(c => !oldSseCodes.has(c)); @@ -684,13 +808,16 @@ export const useRealtimeQuote = (codes: string[] = []): UseRealtimeQuoteReturn = } } - // 清理已取消订阅的 quotes - const allNewCodes = new Set([...newSseCodes, ...newSzseCodes]); + // 清理已取消订阅的 quotes 和初始加载记录 + const allNewCodesSet = new Set(allNewCodes); setQuotes(prev => { const updated: QuotesMap = {}; Object.keys(prev).forEach(code => { - if (allNewCodes.has(code)) { + if (allNewCodesSet.has(code)) { updated[code] = prev[code]; + } else { + // 清理初始加载记录 + initialLoadedRef.current.delete(code); } }); return updated;