From 524781fd0144b909f372f4a8269192ecb34c8310 Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Sun, 14 Dec 2025 15:39:54 +0800 Subject: [PATCH] update pay ui --- app.py | 291 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 192 insertions(+), 99 deletions(-) diff --git a/app.py b/app.py index ece5f162..063ddcdb 100755 --- a/app.py +++ b/app.py @@ -7154,35 +7154,37 @@ def get_transmission_chain(event_id): # 修复股票报价API - 支持GET和POST方法 @app.route('/api/stock/quotes', methods=['GET', 'POST']) def get_stock_quotes(): + """ + 获取股票行情数据(优化版:使用 Redis 缓存) + 缓存策略: + - 股票名称:缓存 24 小时 + - 交易日数据:缓存 1 小时 + - 前一交易日收盘价:盘中缓存到收盘 + - 实时价格:缓存 3 秒(避免重复查询) + """ try: if request.method == 'GET': - # GET 请求从查询参数获取数据 codes_str = request.args.get('codes', '') codes = [code.strip() for code in codes_str.split(',') if code.strip()] event_time_str = request.args.get('event_time') else: - # POST 请求从 JSON 获取数据 codes = request.json.get('codes', []) event_time_str = request.json.get('event_time') if not codes: return jsonify({'success': False, 'error': '请提供股票代码'}), 400 - # 标准化股票代码(确保带后缀,用于 ClickHouse 查询) + # 标准化股票代码 def normalize_stock_code(code): - """将股票代码标准化为带后缀格式(如 300274.SZ)""" if '.' in code: - return code # 已经带后缀 - # 根据代码规则添加后缀:6/0/3开头为深圳,其他为上海 + return code if code.startswith(('6',)): return f"{code}.SH" else: return f"{code}.SZ" - # 保留原始代码用于返回结果,同时创建标准化代码用于 ClickHouse 查询 original_codes = codes normalized_codes = [normalize_stock_code(code) for code in codes] - # 创建原始代码到标准化代码的映射 code_mapping = dict(zip(original_codes, normalized_codes)) # 处理事件时间 @@ -7195,77 +7197,143 @@ def get_stock_quotes(): event_time = datetime.now() current_time = datetime.now() - client = get_clickhouse_client() - # Get stock names from MySQL(批量查询优化) + # ==================== 优化1: 缓存股票名称(24小时)使用 MGET 批量获取 ==================== stock_names = {} - with engine.connect() as conn: - # 提取不带后缀的股票代码 - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - # 批量查询所有股票名称 - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} + base_codes = list(set([code.split('.')[0] for code in codes])) + uncached_base_codes = [] + + # 使用 MGET 批量获取已缓存的股票名称(减少网络往返) + try: + cache_keys = [f"stock:name:{code}" for code in base_codes] + cached_values = redis_client.mget(cache_keys) + for i, base_code in enumerate(base_codes): + if cached_values[i]: + stock_names[base_code] = cached_values[i] + else: + uncached_base_codes.append(base_code) + except: + uncached_base_codes = base_codes + + # 只查询未缓存的股票名称 + if uncached_base_codes: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(uncached_base_codes))]) + params = {f'code{i}': code for i, code in enumerate(uncached_base_codes)} result = conn.execute(text( f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" ), params).fetchall() - # 构建代码到名称的映射 - base_name_map = {row[0]: row[1] for row in result} + for row in result: + base_code, name = row[0], row[1] + stock_names[base_code] = name + # 缓存到 Redis(24小时) + try: + redis_client.setex(f"stock:name:{base_code}", 86400, name) + except: + pass - # 为原始代码和标准化代码都分配名称 - for orig_code, norm_code in code_mapping.items(): - base_code = orig_code.split('.')[0] - name = base_name_map.get(base_code, f"股票{base_code}") - stock_names[orig_code] = name - stock_names[norm_code] = name + # 构建完整的名称映射 + full_stock_names = {} + for orig_code, norm_code in code_mapping.items(): + base_code = orig_code.split('.')[0] + name = stock_names.get(base_code, f"股票{base_code}") + full_stock_names[orig_code] = name + full_stock_names[norm_code] = name + + # ==================== 优化2: 缓存交易日判断(1小时) ==================== + def get_trading_day_cached(date_str): + """检查是否为交易日(带缓存)""" + cache_key = f"trading:is_day:{date_str}" + try: + cached = redis_client.get(cache_key) + if cached is not None: + return cached == '1' + except: + pass + + with engine.connect() as conn: + is_trading = conn.execute(text( + "SELECT 1 FROM trading_days WHERE EXCHANGE_DATE = :date" + ), {"date": date_str}).fetchone() is not None + + try: + redis_client.setex(cache_key, 3600, '1' if is_trading else '0') + except: + pass + return is_trading + + def get_next_trading_day_cached(date_str): + """获取下一个交易日(带缓存)""" + cache_key = f"trading:next:{date_str}" + try: + cached = redis_client.get(cache_key) + if cached: + return datetime.strptime(cached, '%Y-%m-%d').date() + except: + pass + + with engine.connect() as conn: + result = conn.execute(text(""" + SELECT EXCHANGE_DATE FROM trading_days + WHERE EXCHANGE_DATE > :date ORDER BY EXCHANGE_DATE LIMIT 1 + """), {"date": date_str}).fetchone() + + if result: + next_day = result[0].date() if hasattr(result[0], 'date') else result[0] + try: + redis_client.setex(cache_key, 3600, next_day.strftime('%Y-%m-%d')) + except: + pass + return next_day + return None + + def get_prev_trading_day_cached(date_str): + """获取前一个交易日(带缓存)""" + cache_key = f"trading:prev:{date_str}" + try: + cached = redis_client.get(cache_key) + if cached: + return datetime.strptime(cached, '%Y-%m-%d').date() + except: + pass + + with engine.connect() as conn: + result = conn.execute(text(""" + SELECT EXCHANGE_DATE FROM trading_days + WHERE EXCHANGE_DATE < :date ORDER BY EXCHANGE_DATE DESC LIMIT 1 + """), {"date": date_str}).fetchone() + + if result: + prev_day = result[0].date() if hasattr(result[0], 'date') else result[0] + try: + redis_client.setex(cache_key, 3600, prev_day.strftime('%Y-%m-%d')) + except: + pass + return prev_day + return None def get_trading_day_and_times(event_datetime): event_date = event_datetime.date() - event_time = event_datetime.time() + event_time_val = event_datetime.time() + date_str = event_date.strftime('%Y-%m-%d') - # Trading hours market_open = dt_time(9, 30) market_close = dt_time(15, 0) - with engine.connect() as conn: - # First check if the event date itself is a trading day - is_trading_day = conn.execute(text(""" - SELECT 1 - FROM trading_days - WHERE EXCHANGE_DATE = :date - """), {"date": event_date}).fetchone() is not None + is_trading_day = get_trading_day_cached(date_str) - if is_trading_day: - # If it's a trading day, determine time period based on event time - if event_time < market_open: - # Before market opens - use full trading day - return event_date, market_open, market_close - elif event_time > market_close: - # After market closes - get next trading day - next_trading_day = conn.execute(text(""" - SELECT EXCHANGE_DATE - FROM trading_days - WHERE EXCHANGE_DATE > :date - ORDER BY EXCHANGE_DATE LIMIT 1 - """), {"date": event_date}).fetchone() - # Convert to date object if we found a next trading day - return (next_trading_day[0].date() if next_trading_day else None, - market_open, market_close) - else: - # During trading hours - return event_date, event_time, market_close + if is_trading_day: + if event_time_val < market_open: + return event_date, market_open, market_close + elif event_time_val > market_close: + next_day = get_next_trading_day_cached(date_str) + return (next_day, market_open, market_close) else: - # If not a trading day, get next trading day - next_trading_day = conn.execute(text(""" - SELECT EXCHANGE_DATE - FROM trading_days - WHERE EXCHANGE_DATE > :date - ORDER BY EXCHANGE_DATE LIMIT 1 - """), {"date": event_date}).fetchone() - # Convert to date object if we found a next trading day - return (next_trading_day[0].date() if next_trading_day else None, - market_open, market_close) + return event_date, event_time_val, market_close + else: + next_day = get_next_trading_day_cached(date_str) + return (next_day, market_open, market_close) trading_day, start_time, end_time = get_trading_day_and_times(event_time) @@ -7273,27 +7341,14 @@ def get_stock_quotes(): return jsonify({ 'success': True, 'data': {code: {'name': name, 'price': None, 'change': None} - for code, name in stock_names.items()} + for code, name in full_stock_names.items()} }) - # For historical dates, ensure we're using actual data start_datetime = datetime.combine(trading_day, start_time) end_datetime = datetime.combine(trading_day, end_time) - # 获取前一个交易日(用于计算涨跌幅基准) - prev_trading_day = None - with engine.connect() as conn: - result = conn.execute(text(""" - SELECT EXCHANGE_DATE - FROM trading_days - WHERE EXCHANGE_DATE < :date - ORDER BY EXCHANGE_DATE DESC - LIMIT 1 - """), {"date": trading_day}).fetchone() - if result: - prev_trading_day = result[0].date() if hasattr(result[0], 'date') else result[0] - - print(f"当前交易日: {trading_day}, 前一交易日: {prev_trading_day}") + # 获取前一个交易日(使用缓存) + prev_trading_day = get_prev_trading_day_cached(trading_day.strftime('%Y-%m-%d')) # If the trading day is in the future relative to current time, # return only names without data @@ -7307,18 +7362,35 @@ def get_stock_quotes(): results = {} print(f"批量处理 {len(codes)} 只股票: {codes[:5]}{'...' if len(codes) > 5 else ''}, 交易日: {trading_day}, 时间范围: {start_datetime} - {end_datetime}") - # ==================== 性能优化:批量查询所有股票数据 ==================== + # ==================== 优化3: 缓存前一交易日收盘价(24小时) ==================== # 使用 IN 子句一次查询所有股票,避免逐只循环查询 try: - # 先从 MySQL ea_trade 表查询前一交易日的收盘价(日线数据,查询更快) + # 先从 Redis 缓存获取前一交易日的收盘价(使用 MGET 批量获取) prev_close_map = {} if prev_trading_day: - with engine.connect() as conn: - # 提取不带后缀的股票代码用于查询 - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} + prev_day_str = prev_trading_day.strftime('%Y-%m-%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day) + base_codes = list(set([code.split('.')[0] for code in codes])) + uncached_close_codes = [] + base_close_map = {} + + # 使用 MGET 批量获取已缓存的收盘价(减少网络往返) + try: + cache_keys = [f"stock:close:{prev_day_str}:{code}" for code in base_codes] + cached_values = redis_client.mget(cache_keys) + for i, base_code in enumerate(base_codes): + cached_close = cached_values[i] + if cached_close is not None: + base_close_map[base_code] = float(cached_close) if cached_close != 'null' else None + else: + uncached_close_codes.append(base_code) + except: + uncached_close_codes = base_codes + + # 只查询未缓存的股票收盘价 + if uncached_close_codes: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(uncached_close_codes))]) + params = {f'code{i}': code for i, code in enumerate(uncached_close_codes)} params['trade_date'] = prev_trading_day prev_close_result = conn.execute(text(f""" @@ -7328,16 +7400,37 @@ def get_stock_quotes(): AND TRADEDATE = :trade_date """), params).fetchall() - # 构建代码到收盘价的映射(需要匹配完整代码格式) - base_close_map = {row[0]: float(row[1]) if row[1] else None for row in prev_close_result} + # 缓存查询结果到 Redis(24小时) + queried_codes = set() + for row in prev_close_result: + base_code, close_price = row[0], row[1] + close_val = float(close_price) if close_price else None + base_close_map[base_code] = close_val + queried_codes.add(base_code) + try: + cache_key = f"stock:close:{prev_day_str}:{base_code}" + redis_client.setex(cache_key, 86400, str(close_val) if close_val else 'null') + except: + pass - # 为每个标准化代码(带后缀)分配收盘价,用于 ClickHouse 查询结果匹配 - for norm_code in normalized_codes: - base_code = norm_code.split('.')[0] - if base_code in base_close_map: - prev_close_map[norm_code] = base_close_map[base_code] + # 对于查询不到的股票也缓存空值,避免重复查询 + for base_code in uncached_close_codes: + if base_code not in queried_codes: + try: + cache_key = f"stock:close:{prev_day_str}:{base_code}" + redis_client.setex(cache_key, 86400, 'null') + except: + pass - print(f"前一交易日({prev_trading_day})收盘价查询返回 {len(prev_close_result)} 条数据") + print(f"前一交易日({prev_day_str})收盘价: 缓存命中 {len(base_codes) - len(uncached_close_codes)}, 新查询 {len(prev_close_result)}") + else: + print(f"前一交易日({prev_day_str})收盘价: 全部命中缓存 {len(base_codes)} 只股票") + + # 为每个标准化代码(带后缀)分配收盘价,用于 ClickHouse 查询结果匹配 + for norm_code in normalized_codes: + base_code = norm_code.split('.')[0] + if base_code in base_close_map: + prev_close_map[norm_code] = base_close_map[base_code] # 批量查询当前价格数据 batch_price_query = """ @@ -7389,14 +7482,14 @@ def get_stock_quotes(): results[orig_code] = { 'price': price_info['price'], 'change': price_info['change'], - 'name': stock_names.get(orig_code, stock_names.get(norm_code, f'股票{orig_code.split(".")[0]}')) + 'name': full_stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}') } else: # 批量查询没有返回的股票 results[orig_code] = { 'price': None, 'change': None, - 'name': stock_names.get(orig_code, stock_names.get(norm_code, f'股票{orig_code.split(".")[0]}')) + 'name': full_stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}') } except Exception as e: @@ -7435,11 +7528,11 @@ def get_stock_quotes(): results[orig_code] = { 'price': last_price, 'change': change_pct, - 'name': stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}') + 'name': full_stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}') } except Exception as inner_e: print(f"Error processing stock {orig_code}: {inner_e}") - results[orig_code] = {'price': None, 'change': None, 'name': stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}')} + results[orig_code] = {'price': None, 'change': None, 'name': full_stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}')} # 返回标准格式 return jsonify({'success': True, 'data': results})