diff --git a/app.py b/app.py index 022ca203..313dfafb 100755 --- a/app.py +++ b/app.py @@ -371,6 +371,199 @@ def wechat_session_exists(state): print(f"❌ Redis 检查 wechat session 失败: {e}") return False # ============ 微信登录 Session 管理结束 ============ + +# ============ 股票数据 Redis 缓存(股票名称 + 前收盘价) ============ +STOCK_NAME_PREFIX = "vf:stock:name:" # 股票名称缓存前缀 +STOCK_NAME_EXPIRE = 86400 # 股票名称缓存24小时 +PREV_CLOSE_PREFIX = "vf:stock:prev_close:" # 前收盘价缓存前缀 +PREV_CLOSE_EXPIRE = 86400 # 前收盘价缓存24小时(当日有效) + + +def get_cached_stock_names(base_codes): + """ + 批量获取股票名称(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :return: dict {code: name} + """ + if not base_codes: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取 + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{STOCK_NAME_PREFIX}{code}") + cached_values = pipe.execute() + + for code, cached_name in zip(base_codes, cached_values): + if cached_name: + result[code] = cached_name + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取股票名称失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的股票名称 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + db_result = conn.execute(text( + f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" + ), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, name = row[0], row[1] + result[code] = name + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存股票名称失败: {e}") + except Exception as e: + print(f"❌ 数据库查询股票名称失败: {e}") + + return result + + +def get_cached_prev_close(base_codes, trade_date_str): + """ + 批量获取前收盘价(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :param trade_date_str: 交易日期字符串(格式 YYYYMMDD) + :return: dict {code: close_price} + """ + if not base_codes or not trade_date_str: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取(缓存键包含日期,确保不会跨日混用) + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}") + cached_values = pipe.execute() + + for code, cached_price in zip(base_codes, cached_values): + if cached_price: + result[code] = float(cached_price) + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取前收盘价失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的前收盘价 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + params['trade_date'] = trade_date_str + db_result = conn.execute(text(f""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE SECCODE IN ({placeholders}) + AND TRADEDATE = :trade_date + AND F007N > 0 + """), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, close_price = row[0], float(row[1]) if row[1] else None + if close_price: + result[code] = close_price + pipe.setex(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存前收盘价失败: {e}") + except Exception as e: + print(f"❌ 数据库查询前收盘价失败: {e}") + + return result + + +def preload_stock_cache(): + """ + 预热股票缓存(定时任务,每天 9:25 执行) + - 批量加载所有股票名称 + - 批量加载前一交易日收盘价 + """ + from datetime import datetime, timedelta + print(f"[缓存预热] 开始预热股票缓存... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + try: + # 1. 预热股票名称(全量加载) + with engine.connect() as conn: + result = conn.execute(text("SELECT SECCODE, SECNAME FROM ea_stocklist")).fetchall() + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, name = row[0], row[1] + if code and name: + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + count += 1 + pipe.execute() + print(f"[缓存预热] 股票名称: {count} 条已加载到 Redis") + + # 2. 预热前收盘价(获取前一交易日) + # 使用全局 trading_days 获取前一交易日 + today = datetime.now().date() + today_str = today.strftime('%Y-%m-%d') + + prev_trading_day = None + if 'trading_days' in globals() and trading_days: + # 找到今天之前最近的交易日 + for td in reversed(trading_days): + if td < today_str: + prev_trading_day = td + break + + if prev_trading_day: + prev_date_str = prev_trading_day.replace('-', '') # YYYYMMDD 格式 + with engine.connect() as conn: + result = conn.execute(text(""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE TRADEDATE = :trade_date AND F007N > 0 + """), {'trade_date': prev_date_str}).fetchall() + + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, close_price = row[0], row[1] + if code and close_price: + pipe.setex(f"{PREV_CLOSE_PREFIX}{prev_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + count += 1 + pipe.execute() + print(f"[缓存预热] 前收盘价({prev_trading_day}): {count} 条已加载到 Redis") + else: + print(f"[缓存预热] 未找到前一交易日,跳过前收盘价预热") + + print(f"[缓存预热] 预热完成 ✅ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + except Exception as e: + print(f"[缓存预热] 预热失败 ❌: {e}") + import traceback + traceback.print_exc() + + +print(f"📦 股票缓存: Redis, 名称过期 {STOCK_NAME_EXPIRE}秒, 收盘价过期 {PREV_CLOSE_EXPIRE}秒") +# ============ 股票数据 Redis 缓存结束 ============ + # 腾讯云短信配置 SMS_SECRET_ID = 'AKID2we9TacdTAhCjCSYTErHVimeJo9Yr00s' SMS_SECRET_KEY = 'pMlBWijlkgT9fz5ziEXdWEnAPTJzRfkf' @@ -7310,21 +7503,9 @@ def get_stock_quotes(): current_time = datetime.now() - # ==================== 查询股票名称(直接查 MySQL) ==================== - stock_names = {} + # ==================== 查询股票名称(使用 Redis 缓存) ==================== base_codes = list(set([code.split('.')[0] for code in codes])) - - if base_codes: - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - - for row in result: - base_code, name = row[0], row[1] - stock_names[base_code] = name + stock_names = get_cached_stock_names(base_codes) # 构建完整的名称映射 full_stock_names = {} @@ -7355,34 +7536,17 @@ def get_stock_quotes(): # 初始化 ClickHouse 客户端 client = get_clickhouse_client() - # ==================== 查询前一交易日收盘价(直接查 MySQL) ==================== + # ==================== 查询前一交易日收盘价(使用 Redis 缓存) ==================== try: prev_close_map = {} if prev_trading_day: # ea_trade 表的 TRADEDATE 格式是 YYYYMMDD(无连字符) prev_day_str = prev_trading_day.strftime('%Y%m%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day).replace('-', '') base_codes = list(set([code.split('.')[0] for code in codes])) - base_close_map = {} - # 直接从 MySQL 批量查询 - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_day_str - - prev_close_result = conn.execute(text(f""" - SELECT SECCODE, F007N as close_price - FROM ea_trade - WHERE SECCODE IN ({placeholders}) - AND TRADEDATE = :trade_date - """), params).fetchall() - - for row in prev_close_result: - base_code, close_price = row[0], row[1] - close_val = float(close_price) if close_price else None - base_close_map[base_code] = close_val - - print(f"前一交易日({prev_day_str})收盘价: 查询到 {len(prev_close_result)} 条") + # 使用 Redis 缓存获取前收盘价 + base_close_map = get_cached_prev_close(base_codes, prev_day_str) + print(f"前一交易日({prev_day_str})收盘价: 获取到 {len(base_close_map)} 条(Redis缓存)") # 为每个标准化代码分配收盘价 for norm_code in normalized_codes: @@ -8142,18 +8306,9 @@ def get_batch_kline_data(): client = get_clickhouse_client() - # 批量获取股票名称 - stock_names = {} - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - for row in result: - stock_names[row[0]] = row[1] + # 批量获取股票名称(使用 Redis 缓存) + base_codes = list(set([code.split('.')[0] for code in codes])) + stock_names = get_cached_stock_names(base_codes) # 确定目标交易日和涨跌幅基准日(处理跨周末场景) # - 周五15:00后到周一15:00前,分时图显示周一行情,涨跌幅基于周五收盘价 @@ -8172,24 +8327,14 @@ def get_batch_kline_data(): results = {} if chart_type == 'timeline': - # 批量获取前收盘价(从 MySQL ea_trade 表) + # 批量获取前收盘价(使用 Redis 缓存) # 使用 prev_trading_day 作为基准日期(处理跨周末场景) prev_close_map = {} if prev_trading_day: prev_date_str = prev_trading_day.strftime('%Y%m%d') - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_date_str - result = conn.execute(text(f""" - SELECT SECCODE, F007N FROM ea_trade - WHERE SECCODE IN ({placeholders}) AND TRADEDATE = :trade_date AND F007N > 0 - """), params).fetchall() - for row in result: - prev_close_map[row[0]] = float(row[1]) - print(f"分时图基准日期: {prev_trading_day}, 查询到 {len(prev_close_map)} 条前收盘价") + base_codes = list(set([code.split('.')[0] for code in codes])) + prev_close_map = get_cached_prev_close(base_codes, prev_date_str) + print(f"分时图基准日期: {prev_trading_day}, 获取到 {len(prev_close_map)} 条前收盘价(Redis缓存)") # 批量查询分时数据(使用标准化代码查询 ClickHouse) batch_data = client.execute(""" @@ -11893,6 +12038,18 @@ def initialize_event_polling(): name='检查新事件并推送', replace_existing=True ) + + # 每天 9:25 预热股票缓存(开盘前 5 分钟) + from apscheduler.triggers.cron import CronTrigger + scheduler.add_job( + func=preload_stock_cache, + trigger=CronTrigger(hour=9, minute=25), + id='preload_stock_cache', + name='预热股票缓存(股票名称+前收盘价)', + replace_existing=True + ) + print(f'[缓存] 已添加定时任务: 每天 9:25 预热股票缓存') + scheduler.start() print(f'[轮询] APScheduler 调度器已启动 (PID: {os.getpid()}),每 30 秒检查一次新事件') diff --git a/get_related_chg.py b/get_related_chg.py index 275acc39..58813cbe 100644 --- a/get_related_chg.py +++ b/get_related_chg.py @@ -245,10 +245,17 @@ def update_event_statistics(start_date=None, end_date=None, force_update=False, debug_mode: 是否开启调试模式 """ try: + print("[DEBUG] 开始 update_event_statistics") + print(f"[DEBUG] 参数: start_date={start_date}, end_date={end_date}, force_update={force_update}") + mysql_engine = get_mysql_engine() + print("[DEBUG] MySQL 引擎创建成功") + ch_client = get_clickhouse_client() + print("[DEBUG] ClickHouse 客户端创建成功") with mysql_engine.connect() as mysql_conn: + print("[DEBUG] MySQL 连接已建立") # 构建SQL查询 query = """ SELECT e.id, \ @@ -284,8 +291,12 @@ def update_event_statistics(start_date=None, end_date=None, force_update=False, ORDER BY e.created_at DESC """ + print(f"[DEBUG] 执行查询SQL:\n{query}") + print(f"[DEBUG] 查询参数: {params}") + events = mysql_conn.execute(text(query), params).fetchall() + print(f"[DEBUG] 查询返回 {len(events)} 条事件记录") print(f"Found {len(events)} events to update (force_update={force_update})") if debug_mode and len(events) > 0: print(f"Date range: {events[-1][1]} to {events[0][1]}") @@ -324,6 +335,11 @@ def update_event_statistics(start_date=None, end_date=None, force_update=False, "week_chg": week_change, "event_id": event_id }) + if idx <= 5: # 前5条显示详情 + print(f"[DEBUG] 事件 {event_id}: avg={avg_change}, max={max_change}, week={week_change}") + else: + if idx <= 5: + print(f"[DEBUG] 事件 {event_id}: 计算结果全为None,跳过") # 每处理10个事件打印一次进度 if idx % 10 == 0: @@ -337,16 +353,29 @@ def update_event_statistics(start_date=None, end_date=None, force_update=False, continue # 批量更新MySQL + print(f"\n[DEBUG] ====== 准备写入数据库 ======") + print(f"[DEBUG] update_data 长度: {len(update_data)}") if update_data: - mysql_conn.execute(text(""" + print(f"[DEBUG] 前3条待更新数据: {update_data[:3]}") + print(f"[DEBUG] 执行 UPDATE 语句...") + + result = mysql_conn.execute(text(""" UPDATE event SET related_avg_chg = :avg_chg, related_max_chg = :max_chg, related_week_chg = :week_chg WHERE id = :event_id """), update_data) - # SQLAlchemy 1.4 的 connect() 会在上下文管理器退出时自动提交 + print(f"[DEBUG] UPDATE 执行完成, rowcount={result.rowcount}") + + # 关键:显式提交事务!SQLAlchemy 2.0 需要手动 commit + print("[DEBUG] 准备提交事务 (commit)...") + mysql_conn.commit() + print("[DEBUG] 事务已提交!") + print(f"Successfully updated {len(update_data)} events") + else: + print("[DEBUG] update_data 为空,没有数据需要更新!") except Exception as e: print(f"Error in update_event_statistics: {str(e)}")