diff --git a/__pycache__/app.cpython-310.pyc b/__pycache__/app.cpython-310.pyc index 88dd3f9a..4f57ded3 100644 Binary files a/__pycache__/app.cpython-310.pyc and b/__pycache__/app.cpython-310.pyc differ diff --git a/app.py b/app.py index f1c91b38..954b5c66 100755 --- a/app.py +++ b/app.py @@ -5813,18 +5813,29 @@ def get_stock_quotes(): current_time = datetime.now() client = get_clickhouse_client() - # Get stock names from MySQL + # Get stock names from MySQL(批量查询优化) stock_names = {} with engine.connect() as conn: - for code in codes: - codez = code.split('.')[0] + # 提取不带后缀的股票代码 + base_codes = list(set([code.split('.')[0] for code in codes])) + if base_codes: + # 批量查询所有股票名称 + placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) + params = {f'code{i}': code for i, code in enumerate(base_codes)} result = conn.execute(text( - "SELECT SECNAME FROM ea_stocklist WHERE SECCODE = :code" - ), {"code": codez}).fetchone() - if result: - stock_names[code] = result[0] - else: - stock_names[code] = f"股票{codez}" + f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" + ), params).fetchall() + + # 构建代码到名称的映射 + base_name_map = {row[0]: row[1] for row in result} + + # 为每个完整代码(带后缀)分配名称 + for code in codes: + base_code = code.split('.')[0] + if base_code in base_name_map: + stock_names[code] = base_name_map[base_code] + else: + stock_names[code] = f"股票{base_code}" def get_trading_day_and_times(event_datetime): event_date = event_datetime.date() @@ -5896,65 +5907,111 @@ def get_stock_quotes(): }) results = {} - print(f"处理股票代码: {codes}, 交易日: {trading_day}, 时间范围: {start_datetime} - {end_datetime}") + print(f"批量处理 {len(codes)} 只股票: {codes[:5]}{'...' if len(codes) > 5 else ''}, 交易日: {trading_day}, 时间范围: {start_datetime} - {end_datetime}") - for code in codes: - try: - print(f"正在查询股票 {code} 的价格数据...") - # Get the first price and last price for the trading period - data = client.execute(""" - WITH first_price AS (SELECT close - FROM stock_minute - WHERE code = %(code)s - AND timestamp >= %(start)s - AND timestamp <= %(end)s - ORDER BY timestamp - LIMIT 1 - ), - last_price AS ( - SELECT close - FROM stock_minute - WHERE code = %(code)s - AND timestamp >= %(start)s - AND timestamp <= %(end)s - ORDER BY timestamp DESC - LIMIT 1 - ) - SELECT last_price.close as last_price, - (last_price.close - first_price.close) / first_price.close * 100 as change - FROM last_price - CROSS JOIN first_price - WHERE EXISTS (SELECT 1 FROM first_price) - AND EXISTS (SELECT 1 FROM last_price) - """, { - 'code': code, - 'start': start_datetime, - 'end': end_datetime - }) + # ==================== 性能优化:批量查询所有股票数据 ==================== + # 使用 IN 子句一次查询所有股票,避免逐只循环查询 + try: + # 批量查询价格和涨跌幅数据(使用窗口函数) + batch_price_query = """ + WITH first_prices AS ( + SELECT + code, + close as first_price, + ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp ASC) as rn + FROM stock_minute + WHERE code IN %(codes)s + AND timestamp >= %(start)s + AND timestamp <= %(end)s + ), + last_prices AS ( + SELECT + code, + close as last_price, + ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp DESC) as rn + FROM stock_minute + WHERE code IN %(codes)s + AND timestamp >= %(start)s + AND timestamp <= %(end)s + ) + SELECT + fp.code, + lp.last_price, + (lp.last_price - fp.first_price) / fp.first_price * 100 as change_pct + FROM first_prices fp + INNER JOIN last_prices lp ON fp.code = lp.code + WHERE fp.rn = 1 AND lp.rn = 1 + """ - print(f"股票 {code} 查询结果: {data}") - if data and data[0] and data[0][0] is not None: - price = float(data[0][0]) if data[0][0] is not None else None - change = float(data[0][1]) if data[0][1] is not None else None + batch_data = client.execute(batch_price_query, { + 'codes': codes, + 'start': start_datetime, + 'end': end_datetime + }) + print(f"批量查询返回 {len(batch_data)} 条价格数据") + + # 解析批量查询结果 + price_data_map = {} + for row in batch_data: + code = row[0] + last_price = float(row[1]) if row[1] is not None else None + change_pct = float(row[2]) if row[2] is not None else None + price_data_map[code] = { + 'price': last_price, + 'change': change_pct + } + + # 组装结果(所有股票) + for code in codes: + price_info = price_data_map.get(code) + if price_info: results[code] = { - 'price': price, - 'change': change, + 'price': price_info['price'], + 'change': price_info['change'], 'name': stock_names.get(code, f'股票{code.split(".")[0]}') } else: + # 批量查询没有返回的股票 results[code] = { 'price': None, 'change': None, 'name': stock_names.get(code, f'股票{code.split(".")[0]}') } - except Exception as e: - print(f"Error processing stock {code}: {e}") - results[code] = { - 'price': None, - 'change': None, - 'name': stock_names.get(code, f'股票{code.split(".")[0]}') - } + + except Exception as e: + print(f"批量查询 ClickHouse 失败: {e},回退到逐只查询") + # 降级方案:逐只股票查询(保持向后兼容) + for code in codes: + try: + data = client.execute(""" + WITH first_price AS ( + SELECT close FROM stock_minute + WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s + ORDER BY timestamp LIMIT 1 + ), + last_price AS ( + SELECT close FROM stock_minute + WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s + ORDER BY timestamp DESC LIMIT 1 + ) + SELECT last_price.close as last_price, + (last_price.close - first_price.close) / first_price.close * 100 as change + FROM last_price CROSS JOIN first_price + WHERE EXISTS (SELECT 1 FROM first_price) AND EXISTS (SELECT 1 FROM last_price) + """, {'code': code, 'start': start_datetime, 'end': end_datetime}) + + if data and data[0] and data[0][0] is not None: + results[code] = { + 'price': float(data[0][0]) if data[0][0] is not None else None, + 'change': float(data[0][1]) if data[0][1] is not None else None, + 'name': stock_names.get(code, f'股票{code.split(".")[0]}') + } + else: + results[code] = {'price': None, 'change': None, 'name': stock_names.get(code, f'股票{code.split(".")[0]}')} + except Exception as inner_e: + print(f"Error processing stock {code}: {inner_e}") + results[code] = {'price': None, 'change': None, 'name': stock_names.get(code, f'股票{code.split(".")[0]}')} # 返回标准格式 return jsonify({'success': True, 'data': results})