From 7d9e1174e4c9ef69406b548724aac99df626cf72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=B7=E5=B0=8F=E5=89=8D?= Date: Sun, 18 Jan 2026 16:37:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0ios?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../screens/Agent/components/MessageBubble.js | 198 +++++----- .../screens/StockDetail/StockDetailScreen.js | 34 +- app.py | 342 +++++++++++++----- requirements.txt | 2 +- 4 files changed, 360 insertions(+), 216 deletions(-) diff --git a/MeAgent/src/screens/Agent/components/MessageBubble.js b/MeAgent/src/screens/Agent/components/MessageBubble.js index b5897800..7259a44f 100644 --- a/MeAgent/src/screens/Agent/components/MessageBubble.js +++ b/MeAgent/src/screens/Agent/components/MessageBubble.js @@ -268,7 +268,7 @@ const PlanBubble = memo(({ content, plan }) => ( )); /** - * 执行中气泡 - 带实时步骤进度显示 + * 执行中气泡 - 简洁的步骤进度显示 */ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, currentStepIndex }) => { const spinAnim = useRef(new Animated.Value(0)).current; @@ -317,6 +317,7 @@ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, cu return ( + {/* 头部 */} ⚙️ @@ -324,12 +325,9 @@ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, cu 正在执行 - {completedCount}{totalSteps > 0 ? ` / ${totalSteps}` : ''} + {completedCount} / {totalSteps} - {currentStep && ( - → {currentStep.tool} - )} {/* 进度条 */} @@ -344,7 +342,7 @@ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, cu )} - {/* 步骤列表 */} + {/* 步骤列表 - 简洁版 */} {displaySteps.length > 0 && ( {displaySteps.map((step, index) => { @@ -353,59 +351,45 @@ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, cu const isRunning = currentStepIndex !== null && currentStepIndex !== undefined ? index === currentStepIndex : index === stepResults.length && currentStep; - const isPending = !isCompleted && !isRunning; return ( - - {/* 状态图标 */} - + + {/* 状态指示器 */} + {isCompleted ? ( - + {result?.status === 'success' ? '✓' : '✗'} ) : isRunning ? ( - + ) : ( - + {index + 1} )} - {/* 步骤序号 */} - - {index + 1} - - {/* 工具名称 */} - + {step.tool} {/* 执行时间 */} {result?.execution_time && ( - - {result.execution_time.toFixed(2)}s + + {result.execution_time.toFixed(1)}s )} @@ -418,12 +402,37 @@ const ExecutingBubble = memo(({ content, plan, stepResults = [], currentStep, cu ); }); +/** + * 过滤 AI 响应中的特殊标签 + * @param {string} content - 原始内容 + * @returns {string} - 过滤后的内容 + */ +const filterResponseContent = (content) => { + if (!content) return ''; + + // 过滤 minimax:tool_call 标签及其内容 + // 格式: ... + let filtered = content.replace(/[\s\S]*?<\/minimax:tool_call>/g, ''); + + // 过滤可能的其他特殊标签 + // 格式: ... + filtered = filtered.replace(/[\s\S]*?<\/tool_call>/g, ''); + + // 清理多余的空行 + filtered = filtered.replace(/\n{3,}/g, '\n\n'); + + return filtered.trim(); +}; + /** * AI 响应气泡 - 支持 Markdown 和图表 */ const ResponseBubble = memo(({ content, isStreaming }) => { const cursorAnim = useRef(new Animated.Value(0)).current; + // 过滤特殊标签 + const filteredContent = filterResponseContent(content); + useEffect(() => { if (isStreaming) { const blink = Animated.loop( @@ -448,7 +457,7 @@ const ResponseBubble = memo(({ content, isStreaming }) => { return ( - + {isStreaming && ( { ...realtimeQuote, }), [currentStock, realtimeQuote]); + // 获取显示用的股票名称(优先使用 API 返回的名称) + const displayStockName = useMemo(() => { + // 1. 优先使用 API 返回的股票名称 + if (currentStock?.stock_name) { + return currentStock.stock_name; + } + // 2. 如果路由参数的 stockName 与 stockCode 不同,使用它 + if (stockName && stockName !== stockCode) { + return stockName; + } + // 3. 降级显示股票代码 + return stockCode; + }, [currentStock?.stock_name, stockName, stockCode]); + // 加载涨幅分析数据 const loadRiseAnalysis = useCallback(async () => { if (!stockCode) return; @@ -121,6 +136,16 @@ const StockDetailScreen = () => { loadRiseAnalysis(); }, [dispatch, stockCode, stockName, chartType, eventTime, loadRiseAnalysis]); + // 股票代码变化时,清空之前的数据并重置图表类型 + useEffect(() => { + // 清空之前股票的数据 + dispatch(clearCurrentStock()); + // 重置为分时图 + dispatch(setChartType('minute')); + // 清空本地状态(涨幅分析数据) + setRiseAnalysisData([]); + }, [dispatch, stockCode]); + // 初始加载 useEffect(() => { loadStockData(); @@ -133,11 +158,10 @@ const StockDetailScreen = () => { if (type === 'minute') { dispatch(fetchMinuteData(stockCode)); } else { - if (!klineData[type] || klineData[type].length === 0) { - dispatch(fetchKlineData({ stockCode, type, eventTime })); - } + // 每次切换都重新加载数据,确保是当前股票的数据 + dispatch(fetchKlineData({ stockCode, type, eventTime })); } - }, [dispatch, stockCode, klineData, eventTime]); + }, [dispatch, stockCode, eventTime]); // 返回 const handleBack = useCallback(() => { @@ -200,7 +224,7 @@ const StockDetailScreen = () => { {/* 价格头部 - Wind 风格 */} = %(start)s ) - SELECT code, close, timestamp, high, low, volume, amt + SELECT code, close, timestamp, high, low, volume, amt, change_pct FROM latest WHERE rn = 1 """ @@ -6092,14 +6093,15 @@ def get_watchlist_realtime(): # 构建最新价格映射 latest_data_map = {} for row in result: - code, close, ts, high, low, volume, amt = row + code, close, ts, high, low, volume, amt, change_pct = row latest_data_map[code] = { 'close': float(close), 'timestamp': ts, 'high': float(high), 'low': float(low), 'volume': int(volume), - 'amount': float(amt) + 'amount': float(amt), + 'change_pct': float(change_pct) if change_pct else 0 } # 批量查询前收盘价(使用 ea_trade 表,更准确) @@ -8145,12 +8147,13 @@ def get_stock_quotes(): prev_close_map[norm_code] = base_close_map[base_code] # 批量查询当前价格数据(从 ClickHouse) - # 使用 argMax 函数获取最新价格,比窗口函数效率高很多 + # 使用 argMax 函数获取最新价格和涨跌幅 batch_price_query = """ SELECT code, - argMax(close, timestamp) as last_price - FROM stock_minute + argMax(close, timestamp) as last_price, + argMax(change_pct, timestamp) as last_change_pct + FROM stock.stock_minute WHERE code IN %(codes)s AND timestamp >= %(start)s AND timestamp <= %(end)s @@ -8170,12 +8173,13 @@ def get_stock_quotes(): for row in batch_data: code = row[0] last_price = float(row[1]) if row[1] is not None else None - prev_close = prev_close_map.get(code) + change_pct = float(row[2]) if row[2] is not None else None - # 计算涨跌幅 - change_pct = None - if last_price is not None and prev_close is not None and prev_close > 0: - change_pct = (last_price - prev_close) / prev_close * 100 + # 如果数据库中没有涨跌幅,使用前收盘价计算 + if change_pct is None and last_price is not None: + prev_close = prev_close_map.get(code) + if prev_close is not None and prev_close > 0: + change_pct = (last_price - prev_close) / prev_close * 100 price_data_map[code] = { 'price': last_price, @@ -8205,20 +8209,19 @@ def get_stock_quotes(): for orig_code in original_codes: norm_code = code_mapping[orig_code] try: - # 查询当前价格 + # 查询当前价格和涨跌幅 current_data = client.execute(""" - SELECT close FROM stock_minute + SELECT close, change_pct FROM stock.stock_minute WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s ORDER BY timestamp DESC LIMIT 1 """, {'code': norm_code, 'start': start_datetime, 'end': end_datetime}) last_price = float(current_data[0][0]) if current_data and current_data[0] and current_data[0][0] else None + change_pct = float(current_data[0][1]) if current_data and current_data[0] and len(current_data[0]) > 1 and current_data[0][1] else None - # 查询前一交易日收盘价 - prev_close = None - if prev_trading_day and last_price is not None: + # 如果数据库中没有涨跌幅,使用前收盘价计算 + if change_pct is None and prev_trading_day and last_price is not None: base_code = orig_code.split('.')[0] - # ea_trade 表的 TRADEDATE 格式是 YYYYMMDD(无连字符) prev_day_str = prev_trading_day.strftime('%Y%m%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day).replace('-', '') with engine.connect() as conn: prev_result = conn.execute(text(""" @@ -8227,11 +8230,8 @@ def get_stock_quotes(): WHERE SECCODE = :code AND TRADEDATE = :trade_date """), {'code': base_code, 'trade_date': prev_day_str}).fetchone() prev_close = float(prev_result[0]) if prev_result and prev_result[0] else None - - # 计算涨跌幅 - change_pct = None - if last_price is not None and prev_close is not None and prev_close > 0: - change_pct = (last_price - prev_close) / prev_close * 100 + if prev_close is not None and prev_close > 0: + change_pct = (last_price - prev_close) / prev_close * 100 results[orig_code] = { 'price': last_price, @@ -8865,6 +8865,10 @@ def get_stock_kline(stock_code): if chart_type == 'daily': return get_daily_kline(stock_code, event_datetime, stock_name) + elif chart_type == 'weekly': + return get_weekly_kline(stock_code, event_datetime, stock_name) + elif chart_type == 'monthly': + return get_monthly_kline(stock_code, event_datetime, stock_name) elif chart_type == 'minute': return get_minute_kline(stock_code, event_datetime, stock_name, skip_next_day=skip_next_day) elif chart_type == 'timeline': @@ -8958,8 +8962,8 @@ def get_batch_kline_data(): # 批量查询分时数据(使用标准化代码查询 ClickHouse) batch_data = client.execute(""" - SELECT code, timestamp, close, volume - FROM stock_minute + SELECT code, timestamp, close, volume, amt, change_pct + FROM stock.stock_minute WHERE code IN %(codes)s AND timestamp BETWEEN %(start)s AND %(end)s ORDER BY code, timestamp @@ -8969,7 +8973,7 @@ def get_batch_kline_data(): 'end': end_time }) - # 按股票代码分组,同时计算均价和涨跌幅 + # 按股票代码分组,同时计算均价 stock_data = {} stock_accum = {} # 用于计算均价的累计值 for row in batch_data: @@ -8977,27 +8981,25 @@ def get_batch_kline_data(): base_code = norm_code.split('.')[0] price = float(row[2]) volume = float(row[3]) + amount = float(row[4]) if row[4] else price * volume + change_pct = float(row[5]) if row[5] else 0 if norm_code not in stock_data: stock_data[norm_code] = [] stock_accum[norm_code] = {'total_amount': 0, 'total_volume': 0} - # 累计计算均价 - stock_accum[norm_code]['total_amount'] += price * volume + # 累计计算均价(使用真实成交额) + stock_accum[norm_code]['total_amount'] += amount stock_accum[norm_code]['total_volume'] += volume total_vol = stock_accum[norm_code]['total_volume'] avg_price = stock_accum[norm_code]['total_amount'] / total_vol if total_vol > 0 else price - # 计算涨跌幅 - prev_close = prev_close_map.get(base_code) - change_percent = ((price - prev_close) / prev_close * 100) if prev_close and prev_close > 0 else 0 - stock_data[norm_code].append({ 'time': row[1].strftime('%H:%M'), 'price': price, 'avg_price': round(avg_price, 2), 'volume': volume, - 'change_percent': round(change_percent, 2) + 'change_percent': round(change_pct, 2) # 直接使用数据库中的涨跌幅 }) # 组装结果(使用原始代码作为 key 返回) @@ -9147,7 +9149,7 @@ def get_latest_minute_data(stock_code): # 检查这个交易日是否有分钟数据 test_data = client.execute(""" SELECT COUNT(*) - FROM stock_minute + FROM stock.stock_minute WHERE code = %(code)s AND timestamp BETWEEN %(start)s AND %(end)s LIMIT 1 @@ -9180,8 +9182,9 @@ def get_latest_minute_data(stock_code): low, close, volume, - amt - FROM stock_minute + amt, + change_pct + FROM stock.stock_minute WHERE code = %(code)s AND timestamp BETWEEN %(start)s AND %(end)s ORDER BY timestamp @@ -9198,7 +9201,8 @@ def get_latest_minute_data(stock_code): 'low': float(row[3]), 'close': float(row[4]), 'volume': float(row[5]), - 'amount': float(row[6]) + 'amount': float(row[6]), + 'change_pct': float(row[7]) if row[7] else 0 } for row in data] return jsonify({ @@ -10167,6 +10171,158 @@ def get_daily_kline(stock_code, event_datetime, stock_name): }) +def get_weekly_kline(stock_code, event_datetime, stock_name): + """处理周K线数据 - 从日K数据聚合计算""" + stock_code = stock_code.split('.')[0] + + with engine.connect() as conn: + # 获取3年的日K数据,然后在 Python 端聚合为周K + kline_sql = """ + SELECT + t.TRADEDATE, + CAST(t.F003N AS FLOAT) as open, + CAST(t.F007N AS FLOAT) as close, + CAST(t.F005N AS FLOAT) as high, + CAST(t.F006N AS FLOAT) as low, + CAST(t.F004N AS FLOAT) as volume + FROM ea_trade t + WHERE t.SECCODE = :stock_code + AND t.TRADEDATE BETWEEN DATE_SUB(:trade_date, INTERVAL 3 YEAR) + AND DATE_ADD(:trade_date, INTERVAL 30 DAY) + ORDER BY t.TRADEDATE + """ + + result = conn.execute(text(kline_sql), { + "stock_code": stock_code, + "trade_date": event_datetime.date() + }).fetchall() + + if not result: + return jsonify({ + 'error': 'No data available', + 'code': stock_code, + 'name': stock_name, + 'data': [], + 'trade_date': event_datetime.date().strftime('%Y-%m-%d'), + 'type': 'weekly' + }) + + # 按周聚合日K数据 + from collections import defaultdict + weekly_data = defaultdict(list) + + for row in result: + # 使用 ISO 周:(年, 周数) + week_key = row.TRADEDATE.isocalendar()[:2] + weekly_data[week_key].append({ + 'date': row.TRADEDATE, + 'open': float(row.open) if row.open else 0, + 'high': float(row.high) if row.high else 0, + 'low': float(row.low) if row.low else 0, + 'close': float(row.close) if row.close else 0, + 'volume': float(row.volume) if row.volume else 0 + }) + + # 聚合为周K + kline_data = [] + for week_key in sorted(weekly_data.keys()): + days = weekly_data[week_key] + days.sort(key=lambda x: x['date']) + kline_data.append({ + 'time': days[0]['date'].strftime('%Y-%m-%d'), # 周一日期 + 'open': days[0]['open'], # 周一开盘价 + 'high': max(d['high'] for d in days), # 周内最高 + 'low': min(d['low'] for d in days), # 周内最低 + 'close': days[-1]['close'], # 周五收盘价 + 'volume': sum(d['volume'] for d in days) # 周成交量 + }) + + return jsonify({ + 'code': stock_code, + 'name': stock_name, + 'data': kline_data, + 'trade_date': event_datetime.date().strftime('%Y-%m-%d'), + 'type': 'weekly', + 'is_history': True + }) + + +def get_monthly_kline(stock_code, event_datetime, stock_name): + """处理月K线数据 - 从日K数据聚合计算""" + stock_code = stock_code.split('.')[0] + + with engine.connect() as conn: + # 获取5年的日K数据,然后在 Python 端聚合为月K + kline_sql = """ + SELECT + t.TRADEDATE, + CAST(t.F003N AS FLOAT) as open, + CAST(t.F007N AS FLOAT) as close, + CAST(t.F005N AS FLOAT) as high, + CAST(t.F006N AS FLOAT) as low, + CAST(t.F004N AS FLOAT) as volume + FROM ea_trade t + WHERE t.SECCODE = :stock_code + AND t.TRADEDATE BETWEEN DATE_SUB(:trade_date, INTERVAL 5 YEAR) + AND DATE_ADD(:trade_date, INTERVAL 30 DAY) + ORDER BY t.TRADEDATE + """ + + result = conn.execute(text(kline_sql), { + "stock_code": stock_code, + "trade_date": event_datetime.date() + }).fetchall() + + if not result: + return jsonify({ + 'error': 'No data available', + 'code': stock_code, + 'name': stock_name, + 'data': [], + 'trade_date': event_datetime.date().strftime('%Y-%m-%d'), + 'type': 'monthly' + }) + + # 按月聚合日K数据 + from collections import defaultdict + monthly_data = defaultdict(list) + + for row in result: + # 月份键:(年, 月) + month_key = (row.TRADEDATE.year, row.TRADEDATE.month) + monthly_data[month_key].append({ + 'date': row.TRADEDATE, + 'open': float(row.open) if row.open else 0, + 'high': float(row.high) if row.high else 0, + 'low': float(row.low) if row.low else 0, + 'close': float(row.close) if row.close else 0, + 'volume': float(row.volume) if row.volume else 0 + }) + + # 聚合为月K + kline_data = [] + for month_key in sorted(monthly_data.keys()): + days = monthly_data[month_key] + days.sort(key=lambda x: x['date']) + kline_data.append({ + 'time': days[0]['date'].strftime('%Y-%m-%d'), # 月初日期 + 'open': days[0]['open'], # 月初开盘价 + 'high': max(d['high'] for d in days), # 月内最高 + 'low': min(d['low'] for d in days), # 月内最低 + 'close': days[-1]['close'], # 月末收盘价 + 'volume': sum(d['volume'] for d in days) # 月成交量 + }) + + return jsonify({ + 'code': stock_code, + 'name': stock_name, + 'data': kline_data, + 'trade_date': event_datetime.date().strftime('%Y-%m-%d'), + 'type': 'monthly', + 'is_history': True + }) + + def get_minute_kline(stock_code, event_datetime, stock_name, skip_next_day=False): """处理分钟K线数据 @@ -10202,8 +10358,8 @@ def get_minute_kline(stock_code, event_datetime, stock_name, skip_next_day=False # 获取目标日期的完整交易时段数据 data = client.execute(""" SELECT - timestamp, open, high, low, close, volume, amt - FROM stock_minute + timestamp, open, high, low, close, volume, amt, change_pct + FROM stock.stock_minute WHERE code = %(code)s AND timestamp BETWEEN %(start)s AND %(end)s @@ -10221,7 +10377,8 @@ def get_minute_kline(stock_code, event_datetime, stock_name, skip_next_day=False 'low': float(row[3]), 'close': float(row[4]), 'volume': float(row[5]), - 'amount': float(row[6]) + 'amount': float(row[6]), + 'change_pct': float(row[7]) if row[7] else 0 } for row in data] return jsonify({ @@ -10282,7 +10439,7 @@ def get_timeline_data(stock_code, event_datetime, stock_name): # 如果 MySQL 没有数据,回退到 ClickHouse if prev_close is None: prev_close_query = """ - SELECT close FROM stock_minute + SELECT close FROM stock.stock_minute WHERE code = %(code)s AND timestamp < %(start)s ORDER BY timestamp DESC LIMIT 1 """ @@ -10293,11 +10450,12 @@ def get_timeline_data(stock_code, event_datetime, stock_name): if prev_close_result: prev_close = float(prev_close_result[0][0]) + # 查询分时数据,包含 change_pct 和 amt 用于计算均价 data = client.execute( """ SELECT - timestamp, close, volume - FROM stock_minute + timestamp, close, volume, amt, change_pct + FROM stock.stock_minute WHERE code = %(code)s AND timestamp BETWEEN %(start)s AND %(end)s @@ -10316,19 +10474,19 @@ def get_timeline_data(stock_code, event_datetime, stock_name): for row in data: price = float(row[1]) volume = float(row[2]) - total_amount += price * volume + amount = float(row[3]) if row[3] else price * volume + change_pct = float(row[4]) if row[4] else 0 + + total_amount += amount total_volume += volume avg_price = total_amount / total_volume if total_volume > 0 else price - # 计算涨跌幅 - change_percent = ((price - prev_close) / prev_close * 100) if prev_close else 0 - timeline_data.append({ 'time': row[0].strftime('%H:%M'), 'price': price, 'avg_price': avg_price, 'volume': volume, - 'change_percent': change_percent, + 'change_percent': change_pct, # 直接使用数据库中的涨跌幅 }) return jsonify({ @@ -11414,12 +11572,13 @@ def get_events_effectiveness_stats(): start_datetime = datetime.combine(target_date, dt_time(9, 30)) end_datetime = datetime.combine(target_date, dt_time(15, 0)) - # 1. 从 ClickHouse 批量查询最新价格 + # 1. 从 ClickHouse 批量查询最新价格和涨跌幅 batch_price_query = """ SELECT code, - argMax(close, timestamp) as last_price - FROM stock_minute + argMax(close, timestamp) as last_price, + argMax(change_pct, timestamp) as last_change_pct + FROM stock.stock_minute WHERE code IN %(codes)s AND timestamp >= %(start)s AND timestamp <= %(end)s @@ -11431,35 +11590,17 @@ def get_events_effectiveness_stats(): 'end': end_datetime }) - # 构建价格映射 + # 构建价格和涨跌幅映射(直接使用数据库中的 change_pct) price_map = {row[0]: float(row[1]) if row[1] else None for row in batch_data} + change_pct_map = {row[0]: float(row[2]) if row[2] is not None else None for row in batch_data} - # 2. 批量获取前收盘价(使用 Redis 缓存) - prev_date_str = None - try: - target_idx = trading_days.index(target_date) - if target_idx > 0: - prev_trading_day = trading_days[target_idx - 1] - prev_date_str = prev_trading_day.strftime('%Y%m%d') - except (ValueError, IndexError): - pass - - prev_close_map = {} - if prev_date_str: - base_codes = list(set([code.split('.')[0] for code in unique_stocks.keys() if code])) - prev_close_map = get_cached_prev_close(base_codes, prev_date_str) - - # 3. 计算涨跌幅并更新 + # 直接使用数据库返回的涨跌幅更新 for orig_code, stock_info in unique_stocks.items(): norm_code = code_mapping.get(orig_code) - base_code = orig_code.split('.')[0] if orig_code else '' + db_change_pct = change_pct_map.get(norm_code) if norm_code else None - last_price = price_map.get(norm_code) if norm_code else None - prev_close = prev_close_map.get(base_code) - - if last_price is not None and prev_close is not None and prev_close > 0: - change_pct = (last_price - prev_close) / prev_close * 100 - stock_info['maxChg'] = round(change_pct, 2) + if db_change_pct is not None: + stock_info['maxChg'] = round(db_change_pct, 2) else: stock_info['maxChg'] = 0 @@ -11525,8 +11666,9 @@ def get_events_effectiveness_stats(): market_price_query = """ SELECT code, - argMax(close, timestamp) as last_price - FROM stock_minute + argMax(close, timestamp) as last_price, + argMax(change_pct, timestamp) as last_change_pct + FROM stock.stock_minute WHERE timestamp >= %(start)s AND timestamp <= %(end)s AND ( @@ -11542,22 +11684,16 @@ def get_events_effectiveness_stats(): }) if market_data: - # 提取股票代码并获取前收盘价 - all_base_codes = [row[0].split('.')[0] for row in market_data] - all_prev_close = get_cached_prev_close(all_base_codes, prev_date_str) - + # 直接使用数据库返回的涨跌幅统计 rising = 0 falling = 0 flat = 0 for row in market_data: - code = row[0] - last_price = float(row[1]) if row[1] else None - base_code = code.split('.')[0] - prev_close = all_prev_close.get(base_code) + # row: (code, last_price, last_change_pct) + change_pct = float(row[2]) if row[2] is not None else None - if last_price and prev_close and prev_close > 0: - change_pct = (last_price - prev_close) / prev_close * 100 + if change_pct is not None: if change_pct > 0.01: # 上涨 rising += 1 elif change_pct < -0.01: # 下跌 @@ -17350,10 +17486,10 @@ def get_concept_stocks(concept_id): ch_codes_str = "','".join(ch_codes) - # 查询当天最新价格 + # 查询当天最新价格和涨跌幅 query = f""" - SELECT code, close - FROM stock_minute + SELECT code, close, change_pct + FROM stock.stock_minute WHERE code IN ('{ch_codes_str}') AND toDate(timestamp) = today() ORDER BY timestamp DESC @@ -17361,25 +17497,31 @@ def get_concept_stocks(concept_id): """ result = ch_client.execute(query) + # 存储价格和涨跌幅 + change_pct_map = {} for row in result: - ch_code, close_price = row + ch_code, close_price, db_change_pct = row if ch_code in code_mapping and close_price: original_code = code_mapping[ch_code] current_price_map[original_code] = float(close_price) + if db_change_pct is not None: + change_pct_map[original_code] = float(db_change_pct) except Exception as ch_err: app.logger.warning(f"ClickHouse 获取价格失败: {ch_err}") + change_pct_map = {} - # 5. 计算涨跌幅并合并数据 + # 5. 合并数据(直接使用数据库的涨跌幅) result_stocks = [] for stock in stocks_info: code = stock['code'] prev_close = prev_close_map.get(code) current_price = current_price_map.get(code) - change_pct = None - if prev_close and current_price and prev_close > 0: - change_pct = round((current_price - prev_close) / prev_close * 100, 2) + # 优先使用数据库返回的涨跌幅 + change_pct = change_pct_map.get(code) + if change_pct is not None: + change_pct = round(change_pct, 2) result_stocks.append({ 'code': code, @@ -18333,7 +18475,7 @@ def get_latest_price_from_clickhouse(stock_code): # 1. 首先尝试获取最新的分钟数据(近30天) minute_query = """ SELECT close, timestamp - FROM stock_minute + FROM stock.stock_minute WHERE code = %(code)s AND timestamp >= today() - 30 ORDER BY timestamp DESC @@ -18401,7 +18543,7 @@ def get_next_minute_price(stock_code, order_time): # 获取下单后一分钟内的数据 query = """ SELECT close, timestamp - FROM stock_minute + FROM stock.stock_minute WHERE code = %(code)s AND timestamp \ > %(order_time)s @@ -18422,9 +18564,9 @@ def get_next_minute_price(stock_code, order_time): return float(result[0][0]), result[0][1] # 如果一分钟内没有数据,获取最近的数据 - query = """ + fallback_query = """ SELECT close, timestamp - FROM stock_minute + FROM stock.stock_minute WHERE code = %(code)s AND timestamp \ > %(order_time)s @@ -18432,7 +18574,7 @@ def get_next_minute_price(stock_code, order_time): LIMIT 1 \ """ - result = client.execute(query, { + result = client.execute(fallback_query, { 'code': stock_code, 'order_time': order_time }) diff --git a/requirements.txt b/requirements.txt index 9268672d..1461aa1c 100755 --- a/requirements.txt +++ b/requirements.txt @@ -26,4 +26,4 @@ itsdangerous==2.1.2 APScheduler==3.10.4 elasticsearch==8.15.0 PyJWT==2.8.0 -PyAPNs2==0.7.2 \ No newline at end of file +PyAPNs2==2.0.0 \ No newline at end of file