From 9603adbd313906ff8104dd213d74a7f7cfbfd68d Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Mon, 15 Dec 2025 12:07:36 +0800 Subject: [PATCH] update pay ui --- app.py | 210 +++++++++++++++------------------------------------------ 1 file changed, 55 insertions(+), 155 deletions(-) diff --git a/app.py b/app.py index ba1e381f..e9b6987c 100755 --- a/app.py +++ b/app.py @@ -7174,12 +7174,11 @@ def get_transmission_chain(event_id): @app.route('/api/stock/quotes', methods=['GET', 'POST']) def get_stock_quotes(): """ - 获取股票行情数据(优化版:使用 Redis 缓存) - 缓存策略: - - 股票名称:缓存 24 小时 - - 交易日数据:缓存 1 小时 - - 前一交易日收盘价:盘中缓存到收盘 - - 实时价格:缓存 3 秒(避免重复查询) + 获取股票行情数据(无缓存版本,直接查询数据库) + - 股票名称:从 MySQL ea_stocklist 查询 + - 交易日数据:从 MySQL trading_days 查询 + - 前一交易日收盘价:从 MySQL ea_trade 查询 + - 实时价格:从 ClickHouse stock_minute 查询 """ try: if request.method == 'GET': @@ -7217,28 +7216,14 @@ def get_stock_quotes(): current_time = datetime.now() - # ==================== 优化1: 缓存股票名称(24小时)使用 MGET 批量获取 ==================== + # ==================== 查询股票名称(直接查 MySQL) ==================== stock_names = {} base_codes = list(set([code.split('.')[0] for code in codes])) - uncached_base_codes = [] - # 使用 MGET 批量获取已缓存的股票名称(减少网络往返) - try: - cache_keys = [f"stock:name:{code}" for code in base_codes] - cached_values = redis_client.mget(cache_keys) - for i, base_code in enumerate(base_codes): - if cached_values[i]: - stock_names[base_code] = cached_values[i] - else: - uncached_base_codes.append(base_code) - except: - uncached_base_codes = base_codes - - # 只查询未缓存的股票名称 - if uncached_base_codes: + if base_codes: with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(uncached_base_codes))]) - params = {f'code{i}': code for i, code in enumerate(uncached_base_codes)} + placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) + params = {f'code{i}': code for i, code in enumerate(base_codes)} result = conn.execute(text( f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" ), params).fetchall() @@ -7246,11 +7231,6 @@ def get_stock_quotes(): for row in result: base_code, name = row[0], row[1] stock_names[base_code] = name - # 缓存到 Redis(24小时) - try: - redis_client.setex(f"stock:name:{base_code}", 86400, name) - except: - pass # 构建完整的名称映射 full_stock_names = {} @@ -7260,38 +7240,17 @@ def get_stock_quotes(): full_stock_names[orig_code] = name full_stock_names[norm_code] = name - # ==================== 优化2: 缓存交易日判断(1小时) ==================== - def get_trading_day_cached(date_str): - """检查是否为交易日(带缓存)""" - cache_key = f"trading:is_day:{date_str}" - try: - cached = redis_client.get(cache_key) - if cached is not None: - return cached == '1' - except: - pass - + # ==================== 交易日判断(直接查 MySQL) ==================== + def is_trading_day(date_str): + """检查是否为交易日""" with engine.connect() as conn: - is_trading = conn.execute(text( + result = conn.execute(text( "SELECT 1 FROM trading_days WHERE EXCHANGE_DATE = :date" - ), {"date": date_str}).fetchone() is not None - - try: - redis_client.setex(cache_key, 3600, '1' if is_trading else '0') - except: - pass - return is_trading - - def get_next_trading_day_cached(date_str): - """获取下一个交易日(带缓存)""" - cache_key = f"trading:next:{date_str}" - try: - cached = redis_client.get(cache_key) - if cached: - return datetime.strptime(cached, '%Y-%m-%d').date() - except: - pass + ), {"date": date_str}).fetchone() + return result is not None + def get_next_trading_day(date_str): + """获取下一个交易日""" with engine.connect() as conn: result = conn.execute(text(""" SELECT EXCHANGE_DATE FROM trading_days @@ -7299,24 +7258,11 @@ def get_stock_quotes(): """), {"date": date_str}).fetchone() if result: - next_day = result[0].date() if hasattr(result[0], 'date') else result[0] - try: - redis_client.setex(cache_key, 3600, next_day.strftime('%Y-%m-%d')) - except: - pass - return next_day + return result[0].date() if hasattr(result[0], 'date') else result[0] return None - def get_prev_trading_day_cached(date_str): - """获取前一个交易日(带缓存)""" - cache_key = f"trading:prev:{date_str}" - try: - cached = redis_client.get(cache_key) - if cached: - return datetime.strptime(cached, '%Y-%m-%d').date() - except: - pass - + def get_prev_trading_day(date_str): + """获取前一个交易日""" with engine.connect() as conn: result = conn.execute(text(""" SELECT EXCHANGE_DATE FROM trading_days @@ -7324,12 +7270,7 @@ def get_stock_quotes(): """), {"date": date_str}).fetchone() if result: - prev_day = result[0].date() if hasattr(result[0], 'date') else result[0] - try: - redis_client.setex(cache_key, 3600, prev_day.strftime('%Y-%m-%d')) - except: - pass - return prev_day + return result[0].date() if hasattr(result[0], 'date') else result[0] return None def get_trading_day_and_times(event_datetime): @@ -7340,18 +7281,16 @@ def get_stock_quotes(): market_open = dt_time(9, 30) market_close = dt_time(15, 0) - is_trading_day = get_trading_day_cached(date_str) - - if is_trading_day: + if is_trading_day(date_str): if event_time_val < market_open: return event_date, market_open, market_close elif event_time_val > market_close: - next_day = get_next_trading_day_cached(date_str) + next_day = get_next_trading_day(date_str) return (next_day, market_open, market_close) else: return event_date, event_time_val, market_close else: - next_day = get_next_trading_day_cached(date_str) + next_day = get_next_trading_day(date_str) return (next_day, market_open, market_close) trading_day, start_time, end_time = get_trading_day_and_times(event_time) @@ -7366,92 +7305,55 @@ def get_stock_quotes(): start_datetime = datetime.combine(trading_day, start_time) end_datetime = datetime.combine(trading_day, end_time) - # 获取前一个交易日(使用缓存) - prev_trading_day = get_prev_trading_day_cached(trading_day.strftime('%Y-%m-%d')) + # 获取前一个交易日 + prev_trading_day = get_prev_trading_day(trading_day.strftime('%Y-%m-%d')) - # If the trading day is in the future relative to current time, - # return only names without data + # 如果交易日在未来,只返回名称 if trading_day > current_time.date(): return jsonify({ 'success': True, 'data': {code: {'name': name, 'price': None, 'change': None} - for code, name in stock_names.items()} + for code, name in full_stock_names.items()} }) results = {} print(f"批量处理 {len(codes)} 只股票: {codes[:5]}{'...' if len(codes) > 5 else ''}, 交易日: {trading_day}, 时间范围: {start_datetime} - {end_datetime}") - # ==================== 优化3: 缓存前一交易日收盘价(24小时) ==================== - # 使用 IN 子句一次查询所有股票,避免逐只循环查询 + # ==================== 查询前一交易日收盘价(直接查 MySQL) ==================== try: - # 先从 Redis 缓存获取前一交易日的收盘价(使用 MGET 批量获取) prev_close_map = {} if prev_trading_day: prev_day_str = prev_trading_day.strftime('%Y-%m-%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day) base_codes = list(set([code.split('.')[0] for code in codes])) - uncached_close_codes = [] base_close_map = {} - # 使用 MGET 批量获取已缓存的收盘价(减少网络往返) - try: - cache_keys = [f"stock:close:{prev_day_str}:{code}" for code in base_codes] - cached_values = redis_client.mget(cache_keys) - for i, base_code in enumerate(base_codes): - cached_close = cached_values[i] - if cached_close is not None: - base_close_map[base_code] = float(cached_close) if cached_close != 'null' else None - else: - uncached_close_codes.append(base_code) - except: - uncached_close_codes = base_codes + # 直接从 MySQL 批量查询 + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) + params = {f'code{i}': code for i, code in enumerate(base_codes)} + params['trade_date'] = prev_trading_day - # 只查询未缓存的股票收盘价 - if uncached_close_codes: - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(uncached_close_codes))]) - params = {f'code{i}': code for i, code in enumerate(uncached_close_codes)} - params['trade_date'] = prev_trading_day + prev_close_result = conn.execute(text(f""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE SECCODE IN ({placeholders}) + AND TRADEDATE = :trade_date + """), params).fetchall() - prev_close_result = conn.execute(text(f""" - SELECT SECCODE, F007N as close_price - FROM ea_trade - WHERE SECCODE IN ({placeholders}) - AND TRADEDATE = :trade_date - """), params).fetchall() + for row in prev_close_result: + base_code, close_price = row[0], row[1] + close_val = float(close_price) if close_price else None + base_close_map[base_code] = close_val - # 缓存查询结果到 Redis(24小时) - queried_codes = set() - for row in prev_close_result: - base_code, close_price = row[0], row[1] - close_val = float(close_price) if close_price else None - base_close_map[base_code] = close_val - queried_codes.add(base_code) - try: - cache_key = f"stock:close:{prev_day_str}:{base_code}" - redis_client.setex(cache_key, 86400, str(close_val) if close_val else 'null') - except: - pass + print(f"前一交易日({prev_day_str})收盘价: 查询到 {len(prev_close_result)} 条") - # 对于查询不到的股票也缓存空值,避免重复查询 - for base_code in uncached_close_codes: - if base_code not in queried_codes: - try: - cache_key = f"stock:close:{prev_day_str}:{base_code}" - redis_client.setex(cache_key, 86400, 'null') - except: - pass - - print(f"前一交易日({prev_day_str})收盘价: 缓存命中 {len(base_codes) - len(uncached_close_codes)}, 新查询 {len(prev_close_result)}") - else: - print(f"前一交易日({prev_day_str})收盘价: 全部命中缓存 {len(base_codes)} 只股票") - - # 为每个标准化代码(带后缀)分配收盘价,用于 ClickHouse 查询结果匹配 + # 为每个标准化代码分配收盘价 for norm_code in normalized_codes: base_code = norm_code.split('.')[0] if base_code in base_close_map: prev_close_map[norm_code] = base_close_map[base_code] - # 批量查询当前价格数据 + # 批量查询当前价格数据(从 ClickHouse) batch_price_query = """ WITH last_prices AS ( SELECT @@ -7469,21 +7371,21 @@ def get_stock_quotes(): """ batch_data = client.execute(batch_price_query, { - 'codes': normalized_codes, # 使用标准化后的代码查询 ClickHouse + 'codes': normalized_codes, 'start': start_datetime, 'end': end_datetime }) print(f"批量查询返回 {len(batch_data)} 条价格数据") - # 解析批量查询结果,使用前一交易日收盘价计算涨跌幅 + # 解析批量查询结果 price_data_map = {} for row in batch_data: code = row[0] last_price = float(row[1]) if row[1] is not None else None prev_close = prev_close_map.get(code) - # 计算涨跌幅:(当前价 - 前一交易日收盘价) / 前一交易日收盘价 * 100 + # 计算涨跌幅 change_pct = None if last_price is not None and prev_close is not None and prev_close > 0: change_pct = (last_price - prev_close) / prev_close * 100 @@ -7493,7 +7395,7 @@ def get_stock_quotes(): 'change': change_pct } - # 组装结果(所有股票)- 使用原始代码作为 key 返回 + # 组装结果 for orig_code in original_codes: norm_code = code_mapping[orig_code] price_info = price_data_map.get(norm_code) @@ -7504,7 +7406,6 @@ def get_stock_quotes(): 'name': full_stock_names.get(orig_code, f'股票{orig_code.split(".")[0]}') } else: - # 批量查询没有返回的股票 results[orig_code] = { 'price': None, 'change': None, @@ -7512,12 +7413,12 @@ def get_stock_quotes(): } except Exception as e: - print(f"批量查询 ClickHouse 失败: {e},回退到逐只查询") - # 降级方案:逐只股票查询(使用前一交易日收盘价计算涨跌幅) + print(f"批量查询失败: {e},回退到逐只查询") + # 降级方案:逐只股票查询 for orig_code in original_codes: norm_code = code_mapping[orig_code] try: - # 查询当前价格(使用标准化代码查询 ClickHouse) + # 查询当前价格 current_data = client.execute(""" SELECT close FROM stock_minute WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s @@ -7526,7 +7427,7 @@ def get_stock_quotes(): last_price = float(current_data[0][0]) if current_data and current_data[0] and current_data[0][0] else None - # 从 MySQL ea_trade 表查询前一交易日收盘价 + # 查询前一交易日收盘价 prev_close = None if prev_trading_day and last_price is not None: base_code = orig_code.split('.')[0] @@ -7543,7 +7444,6 @@ def get_stock_quotes(): if last_price is not None and prev_close is not None and prev_close > 0: change_pct = (last_price - prev_close) / prev_close * 100 - # 使用原始代码作为 key 返回 results[orig_code] = { 'price': last_price, 'change': change_pct,