From c7033481eed5f4bcb1395568db456fd59b985968 Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Thu, 18 Dec 2025 21:54:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0Company=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E7=9A=84UI=E4=B8=BAFUI=E9=A3=8E=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 284 ++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 223 insertions(+), 61 deletions(-) diff --git a/app.py b/app.py index d180e949..7b360dbb 100755 --- a/app.py +++ b/app.py @@ -372,6 +372,197 @@ def wechat_session_exists(state): print(f"❌ Redis 检查 wechat session 失败: {e}") return False # ============ 微信登录 Session 管理结束 ============ + +# ============ 股票数据 Redis 缓存(股票名称 + 前收盘价) ============ +STOCK_NAME_PREFIX = "vf:stock:name:" # 股票名称缓存前缀 +STOCK_NAME_EXPIRE = 86400 # 股票名称缓存24小时 +PREV_CLOSE_PREFIX = "vf:stock:prev_close:" # 前收盘价缓存前缀 +PREV_CLOSE_EXPIRE = 86400 # 前收盘价缓存24小时(当日有效) + + +def get_cached_stock_names(base_codes): + """ + 批量获取股票名称(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :return: dict {code: name} + """ + if not base_codes: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取 + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{STOCK_NAME_PREFIX}{code}") + cached_values = pipe.execute() + + for code, cached_name in zip(base_codes, cached_values): + if cached_name: + result[code] = cached_name + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取股票名称失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的股票名称 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + db_result = conn.execute(text( + f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" + ), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, name = row[0], row[1] + result[code] = name + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存股票名称失败: {e}") + except Exception as e: + print(f"❌ 数据库查询股票名称失败: {e}") + + return result + + +def get_cached_prev_close(base_codes, trade_date_str): + """ + 批量获取前收盘价(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :param trade_date_str: 交易日期字符串(格式 YYYYMMDD) + :return: dict {code: close_price} + """ + if not base_codes or not trade_date_str: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取(缓存键包含日期,确保不会跨日混用) + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}") + cached_values = pipe.execute() + + for code, cached_price in zip(base_codes, cached_values): + if cached_price: + result[code] = float(cached_price) + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取前收盘价失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的前收盘价 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + params['trade_date'] = trade_date_str + db_result = conn.execute(text(f""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE SECCODE IN ({placeholders}) + AND TRADEDATE = :trade_date + AND F007N > 0 + """), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, close_price = row[0], float(row[1]) if row[1] else None + if close_price: + result[code] = close_price + pipe.setex(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存前收盘价失败: {e}") + except Exception as e: + print(f"❌ 数据库查询前收盘价失败: {e}") + + return result + + +def preload_stock_cache(): + """ + 预热股票缓存(定时任务,每天 9:25 执行) + - 批量加载所有股票名称 + - 批量加载前一交易日收盘价 + """ + from datetime import datetime, timedelta + print(f"[缓存预热] 开始预热股票缓存... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + try: + # 1. 预热股票名称(全量加载) + with engine.connect() as conn: + result = conn.execute(text("SELECT SECCODE, SECNAME FROM ea_stocklist")).fetchall() + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, name = row[0], row[1] + if code and name: + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + count += 1 + pipe.execute() + print(f"[缓存预热] 股票名称: {count} 条已加载到 Redis") + + # 2. 预热前收盘价(获取前一交易日) + today = datetime.now().date() + today_str = today.strftime('%Y-%m-%d') + + prev_trading_day = None + if 'trading_days' in globals() and trading_days: + for td in reversed(trading_days): + if td < today_str: + prev_trading_day = td + break + + if prev_trading_day: + prev_date_str = prev_trading_day.replace('-', '') # YYYYMMDD 格式 + with engine.connect() as conn: + result = conn.execute(text(""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE TRADEDATE = :trade_date AND F007N > 0 + """), {'trade_date': prev_date_str}).fetchall() + + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, close_price = row[0], row[1] + if code and close_price: + pipe.setex(f"{PREV_CLOSE_PREFIX}{prev_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + count += 1 + pipe.execute() + print(f"[缓存预热] 前收盘价({prev_trading_day}): {count} 条已加载到 Redis") + else: + print(f"[缓存预热] 未找到前一交易日,跳过前收盘价预热") + + print(f"[缓存预热] 预热完成 ✅ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + except Exception as e: + print(f"[缓存预热] 预热失败 ❌: {e}") + import traceback + traceback.print_exc() + + +print(f"📦 股票缓存: Redis, 名称过期 {STOCK_NAME_EXPIRE}秒, 收盘价过期 {PREV_CLOSE_EXPIRE}秒") +# ============ 股票数据 Redis 缓存结束 ============ + # 腾讯云短信配置 SMS_SECRET_ID = 'AKID2we9TacdTAhCjCSYTErHVimeJo9Yr00s' SMS_SECRET_KEY = 'pMlBWijlkgT9fz5ziEXdWEnAPTJzRfkf' @@ -7273,21 +7464,9 @@ def get_stock_quotes(): current_time = datetime.now() - # ==================== 查询股票名称(直接查 MySQL) ==================== - stock_names = {} + # ==================== 查询股票名称(使用 Redis 缓存) ==================== base_codes = list(set([code.split('.')[0] for code in codes])) - - if base_codes: - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - - for row in result: - base_code, name = row[0], row[1] - stock_names[base_code] = name + stock_names = get_cached_stock_names(base_codes) # 构建完整的名称映射 full_stock_names = {} @@ -7318,34 +7497,17 @@ def get_stock_quotes(): # 初始化 ClickHouse 客户端 client = get_clickhouse_client() - # ==================== 查询前一交易日收盘价(直接查 MySQL) ==================== + # ==================== 查询前一交易日收盘价(使用 Redis 缓存) ==================== try: prev_close_map = {} if prev_trading_day: # ea_trade 表的 TRADEDATE 格式是 YYYYMMDD(无连字符) prev_day_str = prev_trading_day.strftime('%Y%m%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day).replace('-', '') base_codes = list(set([code.split('.')[0] for code in codes])) - base_close_map = {} - # 直接从 MySQL 批量查询 - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_day_str - - prev_close_result = conn.execute(text(f""" - SELECT SECCODE, F007N as close_price - FROM ea_trade - WHERE SECCODE IN ({placeholders}) - AND TRADEDATE = :trade_date - """), params).fetchall() - - for row in prev_close_result: - base_code, close_price = row[0], row[1] - close_val = float(close_price) if close_price else None - base_close_map[base_code] = close_val - - print(f"前一交易日({prev_day_str})收盘价: 查询到 {len(prev_close_result)} 条") + # 使用 Redis 缓存获取前收盘价 + base_close_map = get_cached_prev_close(base_codes, prev_day_str) + print(f"前一交易日({prev_day_str})收盘价: 获取到 {len(base_close_map)} 条(Redis缓存)") # 为每个标准化代码分配收盘价 for norm_code in normalized_codes: @@ -8112,18 +8274,9 @@ def get_batch_kline_data(): client = get_clickhouse_client() - # 批量获取股票名称 - stock_names = {} - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - for row in result: - stock_names[row[0]] = row[1] + # 批量获取股票名称(使用 Redis 缓存) + base_codes = list(set([code.split('.')[0] for code in codes])) + stock_names = get_cached_stock_names(base_codes) # 确定目标交易日和涨跌幅基准日(处理跨周末场景) # - 周五15:00后到周一15:00前,分时图显示周一行情,涨跌幅基于周五收盘价 @@ -8142,24 +8295,14 @@ def get_batch_kline_data(): results = {} if chart_type == 'timeline': - # 批量获取前收盘价(从 MySQL ea_trade 表) + # 批量获取前收盘价(使用 Redis 缓存) # 使用 prev_trading_day 作为基准日期(处理跨周末场景) prev_close_map = {} if prev_trading_day: prev_date_str = prev_trading_day.strftime('%Y%m%d') - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_date_str - result = conn.execute(text(f""" - SELECT SECCODE, F007N FROM ea_trade - WHERE SECCODE IN ({placeholders}) AND TRADEDATE = :trade_date AND F007N > 0 - """), params).fetchall() - for row in result: - prev_close_map[row[0]] = float(row[1]) - print(f"分时图基准日期: {prev_trading_day}, 查询到 {len(prev_close_map)} 条前收盘价") + base_codes = list(set([code.split('.')[0] for code in codes])) + prev_close_map = get_cached_prev_close(base_codes, prev_date_str) + print(f"分时图基准日期: {prev_trading_day}, 获取到 {len(prev_close_map)} 条前收盘价(Redis缓存)") # 批量查询分时数据(使用标准化代码查询 ClickHouse) batch_data = client.execute(""" @@ -11836,6 +11979,18 @@ def initialize_event_polling(): name='检查新事件并推送', replace_existing=True ) + + # 每天 9:25 预热股票缓存(开盘前 5 分钟) + from apscheduler.triggers.cron import CronTrigger + scheduler.add_job( + func=preload_stock_cache, + trigger=CronTrigger(hour=9, minute=25), + id='preload_stock_cache', + name='预热股票缓存(股票名称+前收盘价)', + replace_existing=True + ) + print(f'[缓存] 已添加定时任务: 每天 9:25 预热股票缓存') + scheduler.start() print(f'[轮询] APScheduler 调度器已启动 (PID: {os.getpid()}),每 30 秒检查一次新事件') @@ -18661,5 +18816,12 @@ if __name__ == '__main__': # 初始化事件轮询机制(WebSocket 推送) initialize_event_polling() + # 启动时预热股票缓存(股票名称 + 前收盘价) + print("[启动] 正在预热股票缓存...") + try: + preload_stock_cache() + except Exception as e: + print(f"[启动] 预热缓存失败(不影响服务启动): {e}") + # 使用 socketio.run 替代 app.run 以支持 WebSocket socketio.run(app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True) \ No newline at end of file