""" ClickHouse 查询优化方案 - 针对 /api/event//related-stocks-detail 问题分析: 1. N+1 查询问题:每只股票执行 3 次独立查询(共 30+ 次) 2. 重复扫描:first_price 和 last_price 需要扫描表两次 3. 缺少批量查询优化 优化方案对比: ┌─────────────┬──────────────┬──────────────┬────────────┐ │ 方案 │ 查询次数 │ 性能提升 │ 实现难度 │ ├─────────────┼──────────────┼──────────────┼────────────┤ │ 当前代码 │ N * 3 │ 基准 │ - │ │ 方案1 批量 │ 1 │ 80-90% │ 中等 │ │ 方案2 并行 │ N * 3 (并行)│ 40-60% │ 简单 │ │ 方案3 缓存 │ 减少重复 │ 20-40% │ 简单 │ └─────────────┴──────────────┴──────────────┴────────────┘ """ # ============================================================================ # 方案 1: 批量查询(推荐)- 将所有股票的查询合并为一次 # ============================================================================ def get_batch_stock_prices_optimized(client, stock_codes, start_datetime, end_datetime): """ 批量获取多只股票的价格数据(一次查询) 性能对比: - 旧方案:10 只股票 = 20 次查询(first + last) - 新方案:10 只股票 = 1 次查询 - 性能提升:约 20 倍 Args: client: ClickHouse 客户端 stock_codes: 股票代码列表 ['600519.SH', '601088.SH', ...] start_datetime: 开始时间 end_datetime: 结束时间 Returns: dict: { '600519.SH': { 'first_price': 1850.0, 'last_price': 1860.0, 'change_pct': 0.54, 'open': 1850.0, 'high': 1865.0, 'low': 1848.0, 'volume': 1234567, 'amount': 2345678900.0 }, ... } """ if not stock_codes: return {} # 构建批量查询 SQL(使用 IN 子句) query = """ SELECT code, -- 第一个价格(事件发生时) anyIf(close, rownum_asc = 1) as first_price, -- 最后一个价格(当前时间) anyIf(close, rownum_desc = 1) as last_price, -- 涨跌幅 (last_price - first_price) / first_price * 100 as change_pct, -- 涨跌额 last_price - first_price as change_amount, -- 其他价格信息(取最后一条记录) anyIf(open, rownum_desc = 1) as open_price, anyIf(high, rownum_desc = 1) as high_price, anyIf(low, rownum_desc = 1) as low_price, anyIf(volume, rownum_desc = 1) as volume, anyIf(amt, rownum_desc = 1) as amount FROM ( SELECT code, timestamp, close, open, high, low, volume, amt, -- 正序排名(用于获取第一个价格) ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp ASC) as rownum_asc, -- 倒序排名(用于获取最后一个价格) ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp DESC) as rownum_desc FROM stock_minute WHERE code IN %(codes)s AND timestamp >= %(start)s AND timestamp <= %(end)s ) GROUP BY code """ try: # 执行批量查询 data = client.execute(query, { 'codes': tuple(stock_codes), # ClickHouse IN 需要 tuple 'start': start_datetime, 'end': end_datetime }) # 格式化结果为字典 result = {} for row in data: code = row[0] result[code] = { 'first_price': float(row[1]) if row[1] else None, 'last_price': float(row[2]) if row[2] else None, 'change_pct': float(row[3]) if row[3] else None, 'change_amount': float(row[4]) if row[4] else None, 'open_price': float(row[5]) if row[5] else None, 'high_price': float(row[6]) if row[6] else None, 'low_price': float(row[7]) if row[7] else None, 'volume': int(row[8]) if row[8] else None, 'amount': float(row[9]) if row[9] else None, } print(f"批量查询完成,获取了 {len(result)} 只股票的数据") return result except Exception as e: print(f"批量查询失败: {e}") return {} def get_batch_minute_chart_data(client, stock_codes, start_datetime, end_datetime): """ 批量获取多只股票的分时图数据 Args: client: ClickHouse 客户端 stock_codes: 股票代码列表 start_datetime: 开始时间 end_datetime: 结束时间 Returns: dict: { '600519.SH': [ {'time': '09:30', 'close': 1850.0, 'volume': 12345, ...}, {'time': '09:31', 'close': 1851.0, 'volume': 12346, ...}, ... ], ... 
} """ if not stock_codes: return {} query = """ SELECT code, timestamp, open, high, low, close, volume, amt FROM stock_minute WHERE code IN %(codes)s AND timestamp >= %(start)s AND timestamp <= %(end)s ORDER BY code, timestamp """ try: data = client.execute(query, { 'codes': tuple(stock_codes), 'start': start_datetime, 'end': end_datetime }) # 按股票代码分组 result = {} for row in data: code = row[0] if code not in result: result[code] = [] result[code].append({ 'time': row[1].strftime('%H:%M'), 'open': float(row[2]) if row[2] else None, 'high': float(row[3]) if row[3] else None, 'low': float(row[4]) if row[4] else None, 'close': float(row[5]) if row[5] else None, 'volume': float(row[6]) if row[6] else None, 'amount': float(row[7]) if row[7] else None }) print(f"批量获取分时数据完成,获取了 {len(result)} 只股票的数据") return result except Exception as e: print(f"批量获取分时数据失败: {e}") return {} # ============================================================================ # 使用示例:替换原来的 for 循环 # ============================================================================ def api_event_related_stocks_optimized(event_id): """优化后的端点实现""" try: from datetime import datetime event = Event.query.get_or_404(event_id) related_stocks = event.related_stocks.order_by(RelatedStock.correlation.desc()).all() if not related_stocks: return jsonify({'code': 200, 'data': {'related_stocks': []}}) # 获取 ClickHouse 客户端 client = get_clickhouse_client() # 计算时间范围(省略交易日计算逻辑,与原代码相同) event_time = event.start_time if event.start_time else event.created_at trading_day, start_time, end_time = get_trading_day_and_times(event_time) start_datetime = datetime.combine(trading_day, start_time) end_datetime = datetime.combine(trading_day, end_time) # ✅ 批量查询所有股票的价格数据(只查询 1 次) stock_codes = [stock.stock_code for stock in related_stocks] prices_data = get_batch_stock_prices_optimized( client, stock_codes, start_datetime, end_datetime ) # ✅ 批量查询所有股票的分时图数据(只查询 1 次) minute_data = get_batch_minute_chart_data( client, stock_codes, start_datetime, end_datetime ) # 组装返回数据 stocks_data = [] for stock in related_stocks: # 从批量查询结果中获取数据(无需再次查询) price_info = prices_data.get(stock.stock_code, {}) chart_data = minute_data.get(stock.stock_code, []) # 获取股票基本信息(这里可以考虑也批量查询) stock_info = StockBasicInfo.query.filter_by(SECCODE=stock.stock_code).first() if not stock_info: base_code = stock.stock_code.split('.')[0] stock_info = StockBasicInfo.query.filter_by(SECCODE=base_code).first() stock_data = { 'id': stock.id, 'stock_code': stock.stock_code, 'stock_name': stock.stock_name, 'sector': stock.sector, 'relation_desc': stock.relation_desc, 'correlation': stock.correlation, 'momentum': stock.momentum, 'listing_date': stock_info.F006D.isoformat() if stock_info and stock_info.F006D else None, 'market': stock_info.F005V if stock_info else None, # 交易数据(从批量查询结果获取) 'trade_data': { 'latest_price': price_info.get('last_price'), 'first_price': price_info.get('first_price'), 'open_price': price_info.get('open_price'), 'high_price': price_info.get('high_price'), 'low_price': price_info.get('low_price'), 'change_amount': round(price_info['change_amount'], 2) if price_info.get('change_amount') else None, 'change_pct': round(price_info['change_pct'], 2) if price_info.get('change_pct') else None, 'volume': price_info.get('volume'), 'amount': price_info.get('amount'), 'trade_date': trading_day.isoformat(), }, # 分时图数据 'minute_chart': chart_data } stocks_data.append(stock_data) return jsonify({ 'code': 200, 'message': 'success', 'data': { 'event_id': event_id, 'event_title': event.title, 
# ============================================================================
# Option 2: Parallel queries via a thread pool (when batching is not possible)
# ============================================================================

from concurrent.futures import ThreadPoolExecutor


def get_stock_price_async(client, stock_code, start_datetime, end_datetime):
    """Per-stock query function (must be thread-safe)."""
    # Same query logic as the original code
    try:
        data = client.execute("""
            WITH first_price AS (
                SELECT close FROM stock_minute WHERE code = %(code)s ...
            )
            ...
        """, {'code': stock_code, 'start': start_datetime, 'end': end_datetime})
        return stock_code, data
    except Exception as e:
        print(f"Query failed for {stock_code}: {e}")
        return stock_code, None


def get_all_stocks_parallel(client, stock_codes, start_datetime, end_datetime):
    """
    Query multiple stocks in parallel (thread pool).

    Performance comparison:
    - Serial:   10 stocks * 0.1 s = 1 s
    - Parallel: max(0.1 s) = 0.1 s (up to 10x, bounded by pool size)

    Caveat: many ClickHouse Python clients are not safe to share across
    threads; in practice use one client per worker or a connection pool.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Submit all query tasks
        futures = [
            executor.submit(get_stock_price_async, client, code, start_datetime, end_datetime)
            for code in stock_codes
        ]

        # Wait for every task to finish
        results = {}
        for future in futures:
            stock_code, data = future.result()
            results[stock_code] = data

    return results


# ============================================================================
# Option 3: Add a cache layer (Redis)
# ============================================================================

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)


def get_stock_price_with_cache(client, stock_code, start_datetime, end_datetime):
    """
    Cached query (suited to historical data).

    Cache policy:
    - Historical data (not today): cache for 24 hours
    - Today's data: cache for 1 minute
    """
    from datetime import datetime

    # Build the cache key
    cache_key = f"stock_price:{stock_code}:{start_datetime.date()}:{end_datetime.date()}"

    # Try the cache first
    cached_data = redis_client.get(cache_key)
    if cached_data:
        print(f"Cache hit for {stock_code}")
        return json.loads(cached_data)

    # Cache miss: query the database
    print(f"Querying ClickHouse for {stock_code}")
    data = client.execute("""...""", {
        'code': stock_code,
        'start': start_datetime,
        'end': end_datetime
    })

    # Shape the data
    result = {
        'first_price': float(data[0][2]) if data else None,
        'last_price': float(data[0][0]) if data else None,
        # ...
    }

    # Write to the cache
    is_today = start_datetime.date() == datetime.now().date()
    ttl = 60 if is_today else 86400  # today's data: 1 minute; historical: 24 hours
    redis_client.setex(cache_key, ttl, json.dumps(result))

    return result


# ============================================================================
# Option 4: ClickHouse query tuning (index hints)
# ============================================================================

def get_stock_price_with_hints(client, stock_code, start_datetime, end_datetime):
    """
    Use ClickHouse features to tune the query.

    Tuning points:
    1. PREWHERE clause (filters earlier, scans less data)
    2. FINAL modifier (if the table is a ReplacingMergeTree)
    3. Partition pruning (if the table is partitioned by date)
    """
    # Note: any()/anyLast() return arbitrary rows unless the storage order
    # guarantees time order; with ORDER BY (code, timestamp) they usually map
    # to first/last by time, but argMin/argMax are the order-safe choice.
    query = """
        SELECT
            code,
            anyLast(close) as last_price,
            any(close) as first_price,
            (last_price - first_price) / first_price * 100 as change_pct
        FROM stock_minute
        PREWHERE code = %(code)s  -- PREWHERE filters before reading other columns
        WHERE timestamp >= %(start)s
          AND timestamp <= %(end)s
        GROUP BY code
        SETTINGS max_threads = 2  -- cap threads to limit resource contention
    """

    data = client.execute(query, {
        'code': stock_code,
        'start': start_datetime,
        'end': end_datetime
    })

    return data


# ============================================================================
# Database-level optimization suggestions
# ============================================================================

"""
1. Make sure the stock_minute table has these indexes:
   - PRIMARY KEY (code, timestamp)                             -- primary index
   - INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 3   -- time index

2. Partitioning strategy (for large tables):
   CREATE TABLE stock_minute (
       code String,
       timestamp DateTime,
       ...
   ) ENGINE = MergeTree()
   PARTITION BY toYYYYMM(timestamp)  -- monthly partitions
   ORDER BY (code, timestamp)
   SETTINGS index_granularity = 8192;

3. Pre-aggregate with a materialized view (for fixed query patterns):
   CREATE MATERIALIZED VIEW stock_minute_summary
   ENGINE = AggregatingMergeTree()
   PARTITION BY toYYYYMMDD(minute)
   ORDER BY (code, minute)
   AS SELECT
       code,
       toStartOfMinute(timestamp) as minute,
       anyLast(close) as last_close,
       any(close) as first_close,
       ...
   FROM stock_minute
   GROUP BY code, minute;

4. Inspect table statistics (see the sketch below):
   SELECT table, partition, rows, bytes_on_disk
   FROM system.parts
   WHERE table = 'stock_minute';
"""
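
# ----------------------------------------------------------------------------
# Sketch: the statistics check in item 4 above, scripted through the same
# client interface used elsewhere in this file (an assumption, not verified
# against the deployment).
# ----------------------------------------------------------------------------

def print_stock_minute_part_stats(client):
    """Print per-partition row counts and on-disk size for stock_minute."""
    rows = client.execute("""
        SELECT partition, sum(rows) as n_rows, sum(bytes_on_disk) as n_bytes
        FROM system.parts
        WHERE table = 'stock_minute' AND active
        GROUP BY partition
        ORDER BY partition
    """)
    for partition, n_rows, n_bytes in rows:
        print(f"partition={partition} rows={n_rows} size={n_bytes / 1024 / 1024:.1f} MiB")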
# ============================================================================
# Performance benchmark
# ============================================================================

def benchmark_query_methods(client, start_dt, end_dt):
    """
    Side-by-side timing test.

    Scenario: fetch price data for 10 stocks.

    Expected results:
    - Original (serial N+1):  ~1000ms
    - Option 1 (batch query): ~50ms  (20x)
    - Option 2 (parallel):    ~200ms (5x)
    - Option 3 (with cache):  ~10ms  (100x, on the second request)
    """
    import time

    stock_codes = ['600519.SH', '601088.SH', '600276.SH', '000001.SZ', ...]

    # Option 1: batch query
    start = time.time()
    result1 = get_batch_stock_prices_optimized(client, stock_codes, start_dt, end_dt)
    print(f"Batch query took: {(time.time() - start) * 1000:.2f}ms")

    # Option 2: parallel queries
    start = time.time()
    result2 = get_all_stocks_parallel(client, stock_codes, start_dt, end_dt)
    print(f"Parallel queries took: {(time.time() - start) * 1000:.2f}ms")

    # Original approach (serial; get_stock_price_original is the
    # pre-optimization per-stock query from the original module)
    start = time.time()
    result3 = {}
    for code in stock_codes:
        result3[code] = get_stock_price_original(client, code, start_dt, end_dt)
    print(f"Serial queries took: {(time.time() - start) * 1000:.2f}ms")


# ============================================================================
# Summary and recommendations
# ============================================================================

"""
Recommended rollout order:

Step 1 (do now): Option 1 - batch query
- Difficulty: medium
- Speedup: 80-90%
- Risk: low
- Effort: 1-2 hours

Step 2 (optional): Option 3 - add caching
- Difficulty: easy
- Speedup: an extra 20-40%
- Risk: low
- Effort: 30 minutes

Step 3 (long term): Option 4 - database tuning
- Difficulty: medium
- Speedup: 20-30%
- Risk: medium (needs testing)
- Effort: 2-4 hours

Metrics to watch:
- Query latency: target < 200ms (currently > 1000ms)
- ClickHouse queries per request: target 1-2 (currently 30+)
- Cache hit rate: target > 80% (if caching is used)
"""
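
# ----------------------------------------------------------------------------
# Sketch (an assumption): a minimal timing decorator for the "metrics to
# watch" above. It only prints; wire it to a real metrics backend as needed.
# ----------------------------------------------------------------------------

import functools
import time


def timed(label):
    """Decorator: log wall-clock latency of a query function in milliseconds."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed_ms = (time.perf_counter() - start) * 1000
                print(f"[latency] {label}: {elapsed_ms:.2f}ms")
        return wrapper
    return decorator


# Usage: wrap the batch fetchers so every call reports its latency, e.g.
# get_batch_stock_prices_optimized = timed("batch_prices")(get_batch_stock_prices_optimized)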