diff --git a/app.py b/app.py index 02d251d2..c8494ba9 100755 --- a/app.py +++ b/app.py @@ -43,6 +43,7 @@ else: import base64 import csv import io +import threading import time import urllib import uuid @@ -219,11 +220,12 @@ load_trading_days() engine = create_engine( "mysql+pymysql://root:Zzl33818!@127.0.0.1:3306/stock?charset=utf8mb4", echo=False, - pool_size=10, - pool_recycle=3600, - pool_pre_ping=True, - pool_timeout=30, - max_overflow=20 + pool_size=50, # 每个 worker 常驻连接数 + pool_recycle=1800, # 连接回收时间 30 分钟(原 1 小时) + pool_pre_ping=True, # 使用前检测连接是否有效 + pool_timeout=20, # 获取连接超时时间(秒) + max_overflow=100 # 每个 worker 临时溢出连接数 + # 每个 worker 最多 150 个连接,32 workers 总共最多 4800 个连接 ) # Elasticsearch 客户端初始化 @@ -371,6 +373,197 @@ def wechat_session_exists(state): print(f"❌ Redis 检查 wechat session 失败: {e}") return False # ============ 微信登录 Session 管理结束 ============ + +# ============ 股票数据 Redis 缓存(股票名称 + 前收盘价) ============ +STOCK_NAME_PREFIX = "vf:stock:name:" # 股票名称缓存前缀 +STOCK_NAME_EXPIRE = 86400 # 股票名称缓存24小时 +PREV_CLOSE_PREFIX = "vf:stock:prev_close:" # 前收盘价缓存前缀 +PREV_CLOSE_EXPIRE = 86400 # 前收盘价缓存24小时(当日有效) + + +def get_cached_stock_names(base_codes): + """ + 批量获取股票名称(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :return: dict {code: name} + """ + if not base_codes: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取 + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{STOCK_NAME_PREFIX}{code}") + cached_values = pipe.execute() + + for code, cached_name in zip(base_codes, cached_values): + if cached_name: + result[code] = cached_name + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取股票名称失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的股票名称 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + db_result = conn.execute(text( + f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" + ), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, name = row[0], row[1] + result[code] = name + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存股票名称失败: {e}") + except Exception as e: + print(f"❌ 数据库查询股票名称失败: {e}") + + return result + + +def get_cached_prev_close(base_codes, trade_date_str): + """ + 批量获取前收盘价(优先从 Redis 缓存读取) + :param base_codes: 股票代码列表(不带后缀,如 ['600000', '000001']) + :param trade_date_str: 交易日期字符串(格式 YYYYMMDD) + :return: dict {code: close_price} + """ + if not base_codes or not trade_date_str: + return {} + + result = {} + missing_codes = [] + + try: + # 批量从 Redis 获取(缓存键包含日期,确保不会跨日混用) + pipe = redis_client.pipeline() + for code in base_codes: + pipe.get(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}") + cached_values = pipe.execute() + + for code, cached_price in zip(base_codes, cached_values): + if cached_price: + result[code] = float(cached_price) + else: + missing_codes.append(code) + except Exception as e: + print(f"⚠️ Redis 批量获取前收盘价失败: {e},降级为数据库查询") + missing_codes = base_codes + + # 从数据库查询缺失的前收盘价 + if missing_codes: + try: + with engine.connect() as conn: + placeholders = ','.join([f':code{i}' for i in range(len(missing_codes))]) + params = {f'code{i}': code for i, code in enumerate(missing_codes)} + params['trade_date'] = trade_date_str + db_result = conn.execute(text(f""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE SECCODE IN ({placeholders}) + AND TRADEDATE = :trade_date + AND F007N > 0 + """), params).fetchall() + + # 写入 Redis 缓存 + pipe = redis_client.pipeline() + for row in db_result: + code, close_price = row[0], float(row[1]) if row[1] else None + if close_price: + result[code] = close_price + pipe.setex(f"{PREV_CLOSE_PREFIX}{trade_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + + try: + pipe.execute() + except Exception as e: + print(f"⚠️ Redis 缓存前收盘价失败: {e}") + except Exception as e: + print(f"❌ 数据库查询前收盘价失败: {e}") + + return result + + +def preload_stock_cache(): + """ + 预热股票缓存(定时任务,每天 9:25 执行) + - 批量加载所有股票名称 + - 批量加载前一交易日收盘价 + """ + from datetime import datetime, timedelta + print(f"[缓存预热] 开始预热股票缓存... {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + try: + # 1. 预热股票名称(全量加载) + with engine.connect() as conn: + result = conn.execute(text("SELECT SECCODE, SECNAME FROM ea_stocklist")).fetchall() + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, name = row[0], row[1] + if code and name: + pipe.setex(f"{STOCK_NAME_PREFIX}{code}", STOCK_NAME_EXPIRE, name) + count += 1 + pipe.execute() + print(f"[缓存预热] 股票名称: {count} 条已加载到 Redis") + + # 2. 预热前收盘价(获取前一交易日) + today = datetime.now().date() + today_str = today.strftime('%Y-%m-%d') + + prev_trading_day = None + if 'trading_days' in globals() and trading_days: + for td in reversed(trading_days): + if td < today_str: + prev_trading_day = td + break + + if prev_trading_day: + prev_date_str = prev_trading_day.replace('-', '') # YYYYMMDD 格式 + with engine.connect() as conn: + result = conn.execute(text(""" + SELECT SECCODE, F007N as close_price + FROM ea_trade + WHERE TRADEDATE = :trade_date AND F007N > 0 + """), {'trade_date': prev_date_str}).fetchall() + + pipe = redis_client.pipeline() + count = 0 + for row in result: + code, close_price = row[0], row[1] + if code and close_price: + pipe.setex(f"{PREV_CLOSE_PREFIX}{prev_date_str}:{code}", PREV_CLOSE_EXPIRE, str(close_price)) + count += 1 + pipe.execute() + print(f"[缓存预热] 前收盘价({prev_trading_day}): {count} 条已加载到 Redis") + else: + print(f"[缓存预热] 未找到前一交易日,跳过前收盘价预热") + + print(f"[缓存预热] 预热完成 ✅ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + except Exception as e: + print(f"[缓存预热] 预热失败 ❌: {e}") + import traceback + traceback.print_exc() + + +print(f"📦 股票缓存: Redis, 名称过期 {STOCK_NAME_EXPIRE}秒, 收盘价过期 {PREV_CLOSE_EXPIRE}秒") +# ============ 股票数据 Redis 缓存结束 ============ + # 腾讯云短信配置 SMS_SECRET_ID = 'AKID2we9TacdTAhCjCSYTErHVimeJo9Yr00s' SMS_SECRET_KEY = 'pMlBWijlkgT9fz5ziEXdWEnAPTJzRfkf' @@ -517,11 +710,12 @@ app.config['COMPRESS_MIMETYPES'] = [ app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:Zzl33818!@127.0.0.1:3306/stock?charset=utf8mb4' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { - 'pool_size': 10, - 'pool_recycle': 3600, - 'pool_pre_ping': True, - 'pool_timeout': 30, - 'max_overflow': 20 + 'pool_size': 50, # 每个 worker 常驻连接数 + 'pool_recycle': 1800, # 连接回收时间 30 分钟(原 1 小时) + 'pool_pre_ping': True, # 使用前检测连接是否有效 + 'pool_timeout': 20, # 获取连接超时时间(秒) + 'max_overflow': 100 # 每个 worker 临时溢出连接数 + # 每个 worker 最多 150 个连接,32 workers 总共最多 4800 个连接 } # Cache directory setup CACHE_DIR = Path('cache') @@ -6465,50 +6659,14 @@ class RelatedData(db.Model): class RelatedConcepts(db.Model): - """关联数据模型""" + """相关概念模型(AI分析结果)""" + __tablename__ = 'related_concepts' id = db.Column(db.Integer, primary_key=True) event_id = db.Column(db.Integer, db.ForeignKey('event.id')) - concept_code = db.Column(db.String(20)) # 数据标题 - concept = db.Column(db.String(100)) # 数据类型 - reason = db.Column(db.Text) # 数据描述 - image_paths = db.Column(db.JSON) # 数据内容(JSON格式) + concept = db.Column(db.String(255)) # 概念名称 + reason = db.Column(db.Text) # 关联原因(AI分析) created_at = db.Column(db.DateTime, default=beijing_now) - @property - def image_paths_list(self): - """返回解析后的图片路径列表""" - if not self.image_paths: - return [] - - try: - # 如果是字符串,先解析成JSON - if isinstance(self.image_paths, str): - paths = json.loads(self.image_paths) - else: - paths = self.image_paths - - # 确保paths是列表 - if not isinstance(paths, list): - paths = [paths] - - # 从每个对象中提取path字段 - return [item['path'] if isinstance(item, dict) and 'path' in item - else item for item in paths] - except Exception as e: - print(f"Error processing image paths: {e}") - return [] - - def get_first_image_path(self): - """获取第一张图片的完整路径""" - paths = self.image_paths_list - if not paths: - return None - - # 获取第一个路径 - first_path = paths[0] - # 返回完整路径 - return first_path - class EventHotHistory(db.Model): """事件热度历史记录""" @@ -6981,23 +7139,21 @@ def get_events_by_stocks(): @app.route('/api/events//concepts', methods=['GET']) def get_related_concepts(event_id): - """获取相关概念列表""" + """获取相关概念列表(AI分析结果)""" try: # 订阅控制:相关概念需要 Pro 及以上 if not _has_required_level('pro'): return jsonify({'success': False, 'error': '需要Pro订阅', 'required_level': 'pro'}), 403 - event = Event.query.get_or_404(event_id) - concepts = event.related_concepts.all() + + # 直接查询 related_concepts 表 + concepts = RelatedConcepts.query.filter_by(event_id=event_id).all() concepts_data = [] for concept in concepts: concepts_data.append({ 'id': concept.id, - 'concept_code': concept.concept_code, 'concept': concept.concept, 'reason': concept.reason, - 'image_paths': concept.image_paths_list, - 'first_image_path': concept.get_first_image_path(), 'created_at': concept.created_at.isoformat() if concept.created_at else None }) @@ -7310,21 +7466,9 @@ def get_stock_quotes(): current_time = datetime.now() - # ==================== 查询股票名称(直接查 MySQL) ==================== - stock_names = {} + # ==================== 查询股票名称(使用 Redis 缓存) ==================== base_codes = list(set([code.split('.')[0] for code in codes])) - - if base_codes: - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - - for row in result: - base_code, name = row[0], row[1] - stock_names[base_code] = name + stock_names = get_cached_stock_names(base_codes) # 构建完整的名称映射 full_stock_names = {} @@ -7355,34 +7499,17 @@ def get_stock_quotes(): # 初始化 ClickHouse 客户端 client = get_clickhouse_client() - # ==================== 查询前一交易日收盘价(直接查 MySQL) ==================== + # ==================== 查询前一交易日收盘价(使用 Redis 缓存) ==================== try: prev_close_map = {} if prev_trading_day: # ea_trade 表的 TRADEDATE 格式是 YYYYMMDD(无连字符) prev_day_str = prev_trading_day.strftime('%Y%m%d') if hasattr(prev_trading_day, 'strftime') else str(prev_trading_day).replace('-', '') base_codes = list(set([code.split('.')[0] for code in codes])) - base_close_map = {} - # 直接从 MySQL 批量查询 - with engine.connect() as conn: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_day_str - - prev_close_result = conn.execute(text(f""" - SELECT SECCODE, F007N as close_price - FROM ea_trade - WHERE SECCODE IN ({placeholders}) - AND TRADEDATE = :trade_date - """), params).fetchall() - - for row in prev_close_result: - base_code, close_price = row[0], row[1] - close_val = float(close_price) if close_price else None - base_close_map[base_code] = close_val - - print(f"前一交易日({prev_day_str})收盘价: 查询到 {len(prev_close_result)} 条") + # 使用 Redis 缓存获取前收盘价 + base_close_map = get_cached_prev_close(base_codes, prev_day_str) + print(f"前一交易日({prev_day_str})收盘价: 获取到 {len(base_close_map)} 条(Redis缓存)") # 为每个标准化代码分配收盘价 for norm_code in normalized_codes: @@ -7391,20 +7518,16 @@ def get_stock_quotes(): prev_close_map[norm_code] = base_close_map[base_code] # 批量查询当前价格数据(从 ClickHouse) + # 使用 argMax 函数获取最新价格,比窗口函数效率高很多 batch_price_query = """ - WITH last_prices AS ( - SELECT - code, - close as last_price, - ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp DESC) as rn - FROM stock_minute - WHERE code IN %(codes)s - AND timestamp >= %(start)s - AND timestamp <= %(end)s - ) - SELECT code, last_price - FROM last_prices - WHERE rn = 1 + SELECT + code, + argMax(close, timestamp) as last_price + FROM stock_minute + WHERE code IN %(codes)s + AND timestamp >= %(start)s + AND timestamp <= %(end)s + GROUP BY code """ batch_data = client.execute(batch_price_query, { @@ -7500,14 +7623,25 @@ def get_stock_quotes(): return jsonify({'success': False, 'error': str(e)}), 500 +# ==================== ClickHouse 连接池(单例模式) ==================== +_clickhouse_client = None +_clickhouse_client_lock = threading.Lock() + def get_clickhouse_client(): - return Cclient( - host='127.0.0.1', - port=9000, - user='default', - password='Zzl33818!', - database='stock' - ) + """获取 ClickHouse 客户端(单例模式,避免重复创建连接)""" + global _clickhouse_client + if _clickhouse_client is None: + with _clickhouse_client_lock: + if _clickhouse_client is None: + _clickhouse_client = Cclient( + host='127.0.0.1', + port=9000, + user='default', + password='Zzl33818!', + database='stock' + ) + print("[ClickHouse] 创建新连接(单例)") + return _clickhouse_client @app.route('/api/account/calendar/events', methods=['GET', 'POST']) @@ -8142,18 +8276,9 @@ def get_batch_kline_data(): client = get_clickhouse_client() - # 批量获取股票名称 - stock_names = {} - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - result = conn.execute(text( - f"SELECT SECCODE, SECNAME FROM ea_stocklist WHERE SECCODE IN ({placeholders})" - ), params).fetchall() - for row in result: - stock_names[row[0]] = row[1] + # 批量获取股票名称(使用 Redis 缓存) + base_codes = list(set([code.split('.')[0] for code in codes])) + stock_names = get_cached_stock_names(base_codes) # 确定目标交易日和涨跌幅基准日(处理跨周末场景) # - 周五15:00后到周一15:00前,分时图显示周一行情,涨跌幅基于周五收盘价 @@ -8172,24 +8297,14 @@ def get_batch_kline_data(): results = {} if chart_type == 'timeline': - # 批量获取前收盘价(从 MySQL ea_trade 表) + # 批量获取前收盘价(使用 Redis 缓存) # 使用 prev_trading_day 作为基准日期(处理跨周末场景) prev_close_map = {} if prev_trading_day: prev_date_str = prev_trading_day.strftime('%Y%m%d') - with engine.connect() as conn: - base_codes = list(set([code.split('.')[0] for code in codes])) - if base_codes: - placeholders = ','.join([f':code{i}' for i in range(len(base_codes))]) - params = {f'code{i}': code for i, code in enumerate(base_codes)} - params['trade_date'] = prev_date_str - result = conn.execute(text(f""" - SELECT SECCODE, F007N FROM ea_trade - WHERE SECCODE IN ({placeholders}) AND TRADEDATE = :trade_date AND F007N > 0 - """), params).fetchall() - for row in result: - prev_close_map[row[0]] = float(row[1]) - print(f"分时图基准日期: {prev_trading_day}, 查询到 {len(prev_close_map)} 条前收盘价") + base_codes = list(set([code.split('.')[0] for code in codes])) + prev_close_map = get_cached_prev_close(base_codes, prev_date_str) + print(f"分时图基准日期: {prev_trading_day}, 获取到 {len(prev_close_map)} 条前收盘价(Redis缓存)") # 批量查询分时数据(使用标准化代码查询 ClickHouse) batch_data = client.execute(""" @@ -8686,8 +8801,6 @@ def get_stock_quote_detail(stock_code): 'eps': None, 'market_cap': None, 'circ_mv': None, - 'total_shares': None, # 发行总股本(亿股) - 'float_shares': None, # 流通股本(亿股) 'turnover_rate': None, 'week52_high': None, 'week52_low': None, @@ -8734,9 +8847,6 @@ def get_stock_quote_detail(stock_code): if trade_result: row = row_to_dict(trade_result) - # 调试日志:打印所有字段 - app.logger.info(f"[quote-detail] stock={base_code}, row keys={list(row.keys())}") - app.logger.info(f"[quote-detail] total_shares={row.get('total_shares')}, float_shares={row.get('float_shares')}, pe_ratio={row.get('pe_ratio')}") result_data['name'] = row.get('SECNAME') or '' result_data['current_price'] = float(row.get('close_price') or 0) result_data['change_percent'] = float(row.get('change_pct') or 0) @@ -8744,31 +8854,16 @@ def get_stock_quote_detail(stock_code): result_data['yesterday_close'] = float(row.get('pre_close') or 0) result_data['today_high'] = float(row.get('high') or 0) result_data['today_low'] = float(row.get('low') or 0) - pe_value = row.get('pe_ratio') or row.get('F026N') - result_data['pe'] = float(pe_value) if pe_value else None + result_data['pe'] = float(row.get('pe_ratio') or 0) if row.get('pe_ratio') else None result_data['turnover_rate'] = float(row.get('turnover_rate') or 0) result_data['sw_industry_l1'] = row.get('sw_industry_l1') or '' result_data['sw_industry_l2'] = row.get('sw_industry_l2') or '' result_data['industry_l1'] = row.get('industry_l1') or '' result_data['industry'] = row.get('sw_industry_l2') or row.get('sw_industry_l1') or '' - # 计算股本和市值(兼容别名和原始字段名) - total_shares = float(row.get('total_shares') or row.get('F020N') or 0) - float_shares = float(row.get('float_shares') or row.get('F021N') or 0) - close_price = float(row.get('close_price') or row.get('F007N') or 0) - app.logger.info(f"[quote-detail] calculated: total_shares={total_shares}, float_shares={float_shares}") - - # 发行总股本(亿股) - if total_shares > 0: - total_shares_yi = total_shares / 100000000 # 转为亿股 - result_data['total_shares'] = round(total_shares_yi, 2) - - # 流通股本(亿股) - if float_shares > 0: - float_shares_yi = float_shares / 100000000 # 转为亿股 - result_data['float_shares'] = round(float_shares_yi, 2) - # 计算流通市值(亿元) + float_shares = float(row.get('float_shares') or 0) + close_price = float(row.get('close_price') or 0) if float_shares > 0 and close_price > 0: circ_mv = (float_shares * close_price) / 100000000 # 转为亿 result_data['circ_mv'] = round(circ_mv, 2) @@ -10477,7 +10572,10 @@ def api_get_events(): include_related_data = request.args.get('include_related_data', 'false').lower() == 'true' # ==================== 构建查询 ==================== - query = Event.query + from sqlalchemy.orm import joinedload + + # 使用 joinedload 预加载 creator,解决 N+1 查询问题 + query = Event.query.options(joinedload(Event.creator)) # 只返回有关联股票的事件(没有关联股票的事件不计入列表) from sqlalchemy import exists @@ -11883,6 +11981,18 @@ def initialize_event_polling(): name='检查新事件并推送', replace_existing=True ) + + # 每天 9:25 预热股票缓存(开盘前 5 分钟) + from apscheduler.triggers.cron import CronTrigger + scheduler.add_job( + func=preload_stock_cache, + trigger=CronTrigger(hour=9, minute=25), + id='preload_stock_cache', + name='预热股票缓存(股票名称+前收盘价)', + replace_existing=True + ) + print(f'[缓存] 已添加定时任务: 每天 9:25 预热股票缓存') + scheduler.start() print(f'[轮询] APScheduler 调度器已启动 (PID: {os.getpid()}),每 30 秒检查一次新事件') @@ -18708,5 +18818,12 @@ if __name__ == '__main__': # 初始化事件轮询机制(WebSocket 推送) initialize_event_polling() + # 启动时预热股票缓存(股票名称 + 前收盘价) + print("[启动] 正在预热股票缓存...") + try: + preload_stock_cache() + except Exception as e: + print(f"[启动] 预热缓存失败(不影响服务启动): {e}") + # 使用 socketio.run 替代 app.run 以支持 WebSocket socketio.run(app, host='0.0.0.0', port=5001, debug=False, allow_unsafe_werkzeug=True) \ No newline at end of file diff --git a/get_related_chg.py b/get_related_chg.py new file mode 100644 index 00000000..58813cbe --- /dev/null +++ b/get_related_chg.py @@ -0,0 +1,491 @@ +from clickhouse_driver import Client as Cclient +from sqlalchemy import create_engine, text +from datetime import datetime, time as dt_time, timedelta +import time +import pandas as pd +import os + +# 读取交易日数据 +script_dir = os.path.dirname(os.path.abspath(__file__)) +TRADING_DAYS_FILE = os.path.join(script_dir, 'tdays.csv') +trading_days_df = pd.read_csv(TRADING_DAYS_FILE) +trading_days_df['DateTime'] = pd.to_datetime(trading_days_df['DateTime']).dt.date +TRADING_DAYS = sorted(trading_days_df['DateTime'].tolist()) # 排序后的交易日列表 + + +def get_clickhouse_client(): + return Cclient( + host='127.0.0.1', + port=9000, + user='default', + password='Zzl33818!', + database='stock' + ) + + +def get_mysql_engine(): + return create_engine( + "mysql+pymysql://root:Zzl33818!@127.0.0.1:3306/stock", + echo=False + ) + + +def is_trading_time(check_datetime=None): + """判断是否在交易时间内 + + Args: + check_datetime: 要检查的时间,默认为当前时间 + + Returns: + bool: True表示在交易时间内 + """ + if check_datetime is None: + check_datetime = datetime.now() + + # 检查是否是交易日 + check_date = check_datetime.date() + if check_date not in TRADING_DAYS: + return False + + # 检查是否在交易时段内 + check_time = check_datetime.time() + + # 上午时段: 9:30 - 11:30 + morning_start = dt_time(9, 30) + morning_end = dt_time(11, 30) + + # 下午时段: 13:00 - 15:00 + afternoon_start = dt_time(13, 0) + afternoon_end = dt_time(15, 0) + + is_morning = morning_start <= check_time <= morning_end + is_afternoon = afternoon_start <= check_time <= afternoon_end + + return is_morning or is_afternoon + + +def get_next_trading_time(): + """获取下一个交易时段的开始时间""" + now = datetime.now() + current_date = now.date() + current_time = now.time() + + # 如果今天是交易日 + if current_date in TRADING_DAYS: + morning_start = dt_time(9, 30) + afternoon_start = dt_time(13, 0) + + # 如果还没到上午开盘 + if current_time < morning_start: + return datetime.combine(current_date, morning_start) + # 如果在上午休市后,下午还没开盘 + elif dt_time(11, 30) < current_time < afternoon_start: + return datetime.combine(current_date, afternoon_start) + + # 否则找下一个交易日的上午开盘时间 + for td in TRADING_DAYS: + if td > current_date: + return datetime.combine(td, dt_time(9, 30)) + + # 如果没有找到未来交易日,返回明天上午9:30(可能需要更新交易日数据) + return datetime.combine(current_date + timedelta(days=1), dt_time(9, 30)) + + +def get_next_trading_day(date): + """获取下一个交易日""" + for td in TRADING_DAYS: + if td > date: + return td + return None + + +def get_nth_trading_day_after(start_date, n=7): + """获取start_date之后的第n个交易日""" + try: + start_idx = TRADING_DAYS.index(start_date) + target_idx = start_idx + n + if target_idx < len(TRADING_DAYS): + return TRADING_DAYS[target_idx] + except (ValueError, IndexError): + pass + + # 如果start_date不在交易日列表中,找到它之后的交易日 + future_days = [d for d in TRADING_DAYS if d > start_date] + if len(future_days) >= n: + return future_days[n - 1] + elif future_days: + return future_days[-1] # 返回最后一个可用的交易日 + + return None + + +def get_trading_day_info(event_datetime): + """获取事件对应的交易日信息""" + event_date = event_datetime.date() + market_close = dt_time(15, 0) + + # 如果是交易日且在收盘前,使用当天 + if event_date in TRADING_DAYS and event_datetime.time() <= market_close: + return event_date + + # 否则使用下一个交易日 + return get_next_trading_day(event_date) + + +def calculate_stock_changes(stock_codes, event_datetime, ch_client, debug=False): + """批量计算一个事件关联的所有股票涨跌幅""" + + if not stock_codes: + return None, None, None + + event_date = event_datetime.date() + event_time = event_datetime.time() + market_open = dt_time(9, 30) + market_close = dt_time(15, 0) + + # 确定起始时间点(事件发生后的第一个有效价格点) + if event_date in TRADING_DAYS and market_open <= event_time <= market_close: + # 事件在交易时间内发生 → 用事件发生时的价格作为起点 + start_datetime = event_datetime + trading_date = event_date + end_datetime = datetime.combine(trading_date, market_close) + if debug: + print(f" 事件在交易时间内: {event_datetime} -> 起点={start_datetime}") + else: + # 事件在交易时间外发生 → 用下一个交易日开盘价作为起点 + trading_date = get_trading_day_info(event_datetime) + if not trading_date: + if debug: + print(f" 找不到交易日: {event_datetime}") + return None, None, None + start_datetime = datetime.combine(trading_date, market_open) + end_datetime = datetime.combine(trading_date, market_close) + if debug: + print(f" 事件在非交易时间: {event_datetime} -> 下一交易日={trading_date}, 起点={start_datetime}") + + # 获取7个交易日后的日期 + week_trading_date = get_nth_trading_day_after(trading_date, 7) + if not week_trading_date: + # 降级:如果没有足够的未来交易日,就用当前能找到的最远日期 + week_trading_date = trading_date + timedelta(days=10) + + week_end_datetime = datetime.combine(week_trading_date, market_close) + + if debug: + print(f" 查询范围: {start_datetime} -> 当日={end_datetime}, 周末={week_end_datetime}") + print(f" 股票代码: {stock_codes}") + + # 一次性查询所有股票的价格数据 + results = ch_client.execute(""" + SELECT code, + -- 起始价格:事件发生时或之后的第一个价格 + argMin(close, timestamp) as start_price, + -- 当日收盘价:当日交易结束时的最后一个价格 + argMax( + close, if(timestamp <= %(end)s, timestamp, toDateTime('1970-01-01')) + ) as day_close_price, + -- 周后收盘价:7个交易日后的收盘价 + argMax( + close, if(timestamp <= %(week_end)s, timestamp, toDateTime('1970-01-01')) + ) as week_close_price + FROM stock_minute + WHERE code IN %(codes)s + AND timestamp >= %(start)s + AND timestamp <= %(week_end)s + GROUP BY code + HAVING start_price > 0 + """, { + 'codes': tuple(stock_codes), + 'start': start_datetime, + 'end': end_datetime, + 'week_end': week_end_datetime + }) + + if debug: + print(f" 查询到 {len(results)} 只股票的数据") + + if not results: + return None, None, None + + # 计算涨跌幅 + day_changes = [] + week_changes = [] + + for code, start_price, day_close, week_close in results: + if start_price and start_price > 0: + # 当日涨跌幅(从事件发生到当日收盘) + if day_close and day_close > 0: + day_change = (day_close - start_price) / start_price * 100 + day_changes.append(day_change) + + # 周度涨跌幅(从事件发生到第7个交易日收盘) + if week_close and week_close > 0: + week_change = (week_close - start_price) / start_price * 100 + week_changes.append(week_change) + + # 计算统计值 + avg_change = sum(day_changes) / len(day_changes) if day_changes else None + max_change = max(day_changes) if day_changes else None + avg_week_change = sum(week_changes) / len(week_changes) if week_changes else None + + if debug: + print( + f" 结果: 日均={avg_change:.2f}% 日最大={max_change:.2f}% 周均={avg_week_change:.2f}%" if avg_change else " 结果: 无有效数据") + + return avg_change, max_change, avg_week_change + + +def update_event_statistics(start_date=None, end_date=None, force_update=False, debug_mode=False): + """更新事件统计数据 + + Args: + start_date: 开始日期 + end_date: 结束日期 + force_update: 是否强制更新(忽略已有数据) + debug_mode: 是否开启调试模式 + """ + try: + print("[DEBUG] 开始 update_event_statistics") + print(f"[DEBUG] 参数: start_date={start_date}, end_date={end_date}, force_update={force_update}") + + mysql_engine = get_mysql_engine() + print("[DEBUG] MySQL 引擎创建成功") + + ch_client = get_clickhouse_client() + print("[DEBUG] ClickHouse 客户端创建成功") + + with mysql_engine.connect() as mysql_conn: + print("[DEBUG] MySQL 连接已建立") + # 构建SQL查询 + query = """ + SELECT e.id, \ + e.created_at, \ + GROUP_CONCAT(rs.stock_code) as stock_codes, + e.related_avg_chg, \ + e.related_max_chg, \ + e.related_week_chg + FROM event e + JOIN related_stock rs ON e.id = rs.event_id \ + """ + + conditions = [] + params = {} + + if start_date: + conditions.append("e.created_at >= :start_date") + params["start_date"] = start_date + + if end_date: + conditions.append("e.created_at <= :end_date") + params["end_date"] = end_date + + if not force_update: + # 只更新没有数据的记录 + conditions.append("(e.related_avg_chg IS NULL OR e.related_max_chg IS NULL)") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + query += """ + GROUP BY e.id, e.created_at, e.related_avg_chg, e.related_max_chg, e.related_week_chg + ORDER BY e.created_at DESC + """ + + print(f"[DEBUG] 执行查询SQL:\n{query}") + print(f"[DEBUG] 查询参数: {params}") + + events = mysql_conn.execute(text(query), params).fetchall() + + print(f"[DEBUG] 查询返回 {len(events)} 条事件记录") + print(f"Found {len(events)} events to update (force_update={force_update})") + if debug_mode and len(events) > 0: + print(f"Date range: {events[-1][1]} to {events[0][1]}") + + # 准备批量更新数据 + update_data = [] + + for idx, event in enumerate(events, 1): + try: + event_id = event[0] + created_at = event[1] + stock_codes = event[2].split(',') if event[2] else [] + existing_avg = event[3] + existing_max = event[4] + existing_week = event[5] + + if not stock_codes: + continue + + if debug_mode and idx <= 3: # 只调试前3个事件 + print(f"\n[Event {event_id}] created_at={created_at}") + if not force_update and existing_avg is not None: + print( + f" 已有数据: avg={existing_avg:.2f}% max={existing_max:.2f}% week={existing_week:.2f}%") + + # 批量计算该事件所有股票的涨跌幅 + avg_change, max_change, week_change = calculate_stock_changes( + stock_codes, created_at, ch_client, debug=(debug_mode and idx <= 3) + ) + + # 收集更新数据 + if any(x is not None for x in (avg_change, max_change, week_change)): + update_data.append({ + "avg_chg": avg_change, + "max_chg": max_change, + "week_chg": week_change, + "event_id": event_id + }) + if idx <= 5: # 前5条显示详情 + print(f"[DEBUG] 事件 {event_id}: avg={avg_change}, max={max_change}, week={week_change}") + else: + if idx <= 5: + print(f"[DEBUG] 事件 {event_id}: 计算结果全为None,跳过") + + # 每处理10个事件打印一次进度 + if idx % 10 == 0: + print(f"Processed {idx}/{len(events)} events...") + + except Exception as e: + print(f"Error processing event {event[0]}: {str(e)}") + if debug_mode: + import traceback + traceback.print_exc() + continue + + # 批量更新MySQL + print(f"\n[DEBUG] ====== 准备写入数据库 ======") + print(f"[DEBUG] update_data 长度: {len(update_data)}") + if update_data: + print(f"[DEBUG] 前3条待更新数据: {update_data[:3]}") + print(f"[DEBUG] 执行 UPDATE 语句...") + + result = mysql_conn.execute(text(""" + UPDATE event + SET related_avg_chg = :avg_chg, + related_max_chg = :max_chg, + related_week_chg = :week_chg + WHERE id = :event_id + """), update_data) + print(f"[DEBUG] UPDATE 执行完成, rowcount={result.rowcount}") + + # 关键:显式提交事务!SQLAlchemy 2.0 需要手动 commit + print("[DEBUG] 准备提交事务 (commit)...") + mysql_conn.commit() + print("[DEBUG] 事务已提交!") + + print(f"Successfully updated {len(update_data)} events") + else: + print("[DEBUG] update_data 为空,没有数据需要更新!") + + except Exception as e: + print(f"Error in update_event_statistics: {str(e)}") + raise + + +def run_monitor(): + """运行监控循环 - 仅在交易时间段内每2分钟强制更新最近7天数据""" + print("=" * 60) + print("启动交易时段监控模式") + print("运行规则: 仅在交易日的9:30-11:30和13:00-15:00运行") + print("更新频率: 每2分钟一次") + print("更新模式: 强制更新(force_update=True)") + print("更新范围: 最近7天的事件数据") + print("=" * 60) + + while True: + try: + now = datetime.now() + + # 检查是否在交易时间内 + if is_trading_time(now): + seven_days_ago = now - timedelta(days=7) + + print(f"\n{'=' * 60}") + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] 交易时段 - 开始更新...") + print(f"{'=' * 60}") + + update_event_statistics( + start_date=seven_days_ago, + force_update=True, # 强制更新所有数据 + debug_mode=False + ) + + print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] 更新完成") + print(f"等待2分钟后执行下次更新...\n") + time.sleep(120) # 2分钟 + + else: + # 不在交易时间,计算下次交易时间 + next_trading_time = get_next_trading_time() + wait_seconds = (next_trading_time - now).total_seconds() + wait_minutes = int(wait_seconds / 60) + + print(f"\n{'=' * 60}") + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] 非交易时段") + print(f"下次交易时间: {next_trading_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"等待时长: {wait_minutes} 分钟") + print(f"{'=' * 60}\n") + + # 等待到下一个交易时段(每5分钟检查一次,避免程序僵死) + check_interval = 300 # 5分钟检查一次 + while not is_trading_time(): + time.sleep(min(check_interval, max(1, wait_seconds))) + wait_seconds = (get_next_trading_time() - datetime.now()).total_seconds() + if wait_seconds <= 0: + break + + except KeyboardInterrupt: + print("\n程序被用户中断") + break + except Exception as e: + print(f"Error in monitor loop: {str(e)}") + import traceback + traceback.print_exc() + print("等待1分钟后重试...") + time.sleep(60) # 发生错误等待1分钟后重试 + + +if __name__ == "__main__": + import sys + + # 支持命令行参数 + # python get_related_chg.py --test # 测试模式:只更新昨天和今天,开启调试 + # python get_related_chg.py --once # 单次强制更新最近7天 + # python get_related_chg.py # 正常运行:交易时段每2分钟强制更新 + + if len(sys.argv) > 1: + if sys.argv[1] == '--test': + # 测试模式:更新昨天和今天的数据,开启调试 + print("=" * 60) + print("测试模式:更新昨天和今天的数据") + print("=" * 60) + yesterday = (datetime.now() - timedelta(days=2)).replace(hour=15, minute=0, second=0) + tomorrow = datetime.now() + timedelta(days=1) + update_event_statistics( + start_date=yesterday, + end_date=tomorrow, + force_update=True, + debug_mode=True + ) + print("\n测试完成!") + + elif sys.argv[1] == '--once': + # 单次强制更新模式 + print("=" * 60) + print("单次强制更新模式:重新计算最近7天所有数据") + print("=" * 60) + seven_days_ago = datetime.now() - timedelta(days=7) + update_event_statistics( + start_date=seven_days_ago, + force_update=True, + debug_mode=False + ) + print("\n强制更新完成!") + else: + print("未知参数。支持的参数:") + print(" --test : 测试模式(更新昨天和今天,开启调试)") + print(" --once : 单次强制更新最近7天") + print(" (无参数): 交易时段监控模式(每2分钟强制更新)") + else: + # 正常监控模式:仅在交易时间段运行 + run_monitor() \ No newline at end of file diff --git a/gunicorn_eventlet_config.py b/gunicorn_eventlet_config.py index 3c07fe43..fa31790d 100644 --- a/gunicorn_eventlet_config.py +++ b/gunicorn_eventlet_config.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- """ -Gunicorn 配置文件 - Eventlet 极限高并发配置(110.42.32.207 专用) +Gunicorn 配置文件 - Eventlet 高并发配置(48核128GB 专用) 服务器配置: 48核心 128GB 内存 -目标并发: 160,000+ 并发连接 +目标并发: 5,000-10,000 实际并发(理论 320,000 连接) 使用方式: # 设置环境变量后启动 @@ -14,10 +14,12 @@ Gunicorn 配置文件 - Eventlet 极限高并发配置(110.42.32.207 专用) REDIS_HOST=127.0.0.1 gunicorn -c gunicorn_eventlet_config.py app:app 架构说明: - - 16 个 Eventlet Worker(每个占用 1 核心,预留 32 核给系统/Redis/MySQL) + - 32 个 Eventlet Worker(每个占用 1 核心,预留 16 核给系统/Redis/MySQL) - 每个 Worker 处理 10000+ 并发连接(协程异步 I/O) + - 数据库连接池: 32 workers × 150 = 4800 连接(实际瓶颈) - Redis 消息队列同步跨 Worker 的 WebSocket 消息 - - 总并发能力: 16 × 10000 = 160,000+ 连接 + - 理论并发能力: 32 × 10000 = 320,000 连接 + - 实际并发能力: 5,000-10,000(受数据库连接限制) """ import os @@ -32,9 +34,9 @@ os.environ.setdefault('REDIS_HOST', '127.0.0.1') bind = '0.0.0.0:5001' # Worker 进程数 -# 48 核心机器: 16 Workers(预留资源给 Redis/MySQL/系统) +# 48 核心机器: 32 Workers(目标 5000-10000 并发) # 每个 Eventlet Worker 是单线程但支持协程并发 -workers = 16 +workers = 32 # Worker 类型 - eventlet 异步模式 worker_class = 'eventlet' @@ -97,14 +99,17 @@ def on_starting(server): workers = server.app.cfg.workers connections = server.app.cfg.worker_connections total = workers * connections + db_pool = workers * 150 # pool_size=50 + max_overflow=100 print("=" * 70) - print("🚀 Gunicorn + Eventlet 极限高并发服务器正在启动...") + print("🚀 Gunicorn + Eventlet 高并发服务器正在启动...") print("=" * 70) print(f" 服务器配置: 48核心 128GB 内存") print(f" Workers: {workers} 个 Eventlet 协程进程") print(f" 每 Worker 连接数: {connections:,}") - print(f" 总并发能力: {total:,} 连接") + print(f" 理论并发能力: {total:,} 连接") + print(f" 数据库连接池: {db_pool:,} 连接(实际瓶颈)") + print(f" 目标实际并发: 5,000-10,000") print("-" * 70) print(f" Bind: {server.app.cfg.bind}") print(f" Max Requests: {server.app.cfg.max_requests:,}") @@ -122,18 +127,21 @@ def when_ready(server): workers = server.app.cfg.workers connections = server.app.cfg.worker_connections total = workers * connections + db_pool = workers * 150 print("=" * 70) print(f"✅ Gunicorn + Eventlet 服务准备就绪!") print(f" {workers} 个 Worker 已启动") - print(f" 总并发能力: {total:,} 连接") + print(f" 理论并发能力: {total:,} 连接") + print(f" 数据库连接池: {db_pool:,} 连接") + print(f" 目标实际并发: 5,000-10,000") print(f" WebSocket + HTTP API 混合高并发已启用") print("=" * 70) def post_worker_init(worker): """Worker 初始化完成后调用""" - print(f"✅ Eventlet Worker {worker.pid} 已初始化 (10,000 并发连接就绪)") + print(f"✅ Eventlet Worker {worker.pid} 已初始化 (10,000 并发连接 + 150 数据库连接就绪)") # 触发事件轮询初始化(使用 Redis 锁确保只有一个 Worker 启动调度器) try: diff --git a/src/components/EventDetailPanel/DynamicNewsDetailPanel.js b/src/components/EventDetailPanel/DynamicNewsDetailPanel.js index d6ceb26a..a2d16d1b 100644 --- a/src/components/EventDetailPanel/DynamicNewsDetailPanel.js +++ b/src/components/EventDetailPanel/DynamicNewsDetailPanel.js @@ -198,10 +198,6 @@ const DynamicNewsDetailPanel = ({ event, showHeader = true }) => { } }, [sectionState.stocks, stocks.length, refreshQuotes]); - // 相关概念 - 展开/收起(无需加载) - const handleConceptsToggle = useCallback(() => { - dispatchSection({ type: 'TOGGLE', section: 'concepts' }); - }, []); // 历史事件对比 - 数据已预加载,只需切换展开状态 const handleHistoricalToggle = useCallback(() => { @@ -350,13 +346,10 @@ const DynamicNewsDetailPanel = ({ event, showHeader = true }) => { )} - {/* 相关概念(可折叠) - 需要 PRO 权限 */} + {/* 相关概念(手风琴样式) - 需要 PRO 权限 */} : null} isLocked={!canAccessConcepts} onLockedClick={() => handleLockedClick('相关概念', 'pro')} diff --git a/src/components/EventDetailPanel/RelatedConceptsSection/DetailedConceptCard.js b/src/components/EventDetailPanel/RelatedConceptsSection/DetailedConceptCard.js index 24728d60..f75474e6 100644 --- a/src/components/EventDetailPanel/RelatedConceptsSection/DetailedConceptCard.js +++ b/src/components/EventDetailPanel/RelatedConceptsSection/DetailedConceptCard.js @@ -19,8 +19,9 @@ import ConceptStockItem from './ConceptStockItem'; /** * 详细概念卡片组件 * @param {Object} props - * @param {Object} props.concept - 概念对象(兼容 v1/v2 API) + * @param {Object} props.concept - 概念对象(兼容 v1/v2 API 和 related_concepts 表数据) * - concept: 概念名称 + * - reason: 关联原因(来自 related_concepts 表) * - stock_count: 相关股票数量 * - score: 相关度(0-1) * - price_info.avg_change_pct: 平均涨跌幅 @@ -34,6 +35,8 @@ const DetailedConceptCard = ({ concept, onClick }) => { const borderColor = useColorModeValue('gray.200', 'gray.600'); const headingColor = useColorModeValue('gray.700', 'gray.200'); const stockCountColor = useColorModeValue('gray.500', 'gray.400'); + const reasonBg = useColorModeValue('blue.50', 'blue.900'); + const reasonColor = useColorModeValue('gray.700', 'gray.200'); // 计算相关度百分比 const relevanceScore = Math.round((concept.score || 0) * 100); @@ -43,6 +46,9 @@ const DetailedConceptCard = ({ concept, onClick }) => { const changeColor = changePct > 0 ? 'red' : changePct < 0 ? 'green' : 'gray'; const changeSymbol = changePct > 0 ? '+' : ''; + // 判断是否来自数据库(有 reason 字段) + const isFromDatabase = !!concept.reason; + return ( { {concept.concept} - - 相关度: {relevanceScore}% - - - {concept.stock_count} 只股票 - + {/* 数据库数据显示"AI分析"标签,搜索数据显示相关度 */} + {isFromDatabase ? ( + + AI 分析 + + ) : ( + + 相关度: {relevanceScore}% + + )} + {/* 只有搜索数据才显示股票数量 */} + {!isFromDatabase && concept.stock_count > 0 && ( + + {concept.stock_count} 只股票 + + )} - {/* 右侧:涨跌幅 */} - {concept.price_info?.avg_change_pct && ( + {/* 右侧:涨跌幅(仅搜索数据有) */} + {!isFromDatabase && concept.price_info?.avg_change_pct && ( 平均涨跌幅 @@ -97,8 +113,30 @@ const DetailedConceptCard = ({ concept, onClick }) => { - {/* 概念描述 */} - {concept.description && ( + {/* 关联原因(来自数据库,突出显示) */} + {concept.reason && ( + + + 关联原因 + + + {concept.reason} + + + )} + + {/* 概念描述(仅搜索数据有,且没有 reason 时显示) */} + {!concept.reason && concept.description && ( { const changeColor = changePct !== null ? (changePct > 0 ? 'red' : changePct < 0 ? 'green' : 'gray') : null; const changeSymbol = changePct !== null && changePct > 0 ? '+' : ''; + // 判断是否来自数据库(有 reason 字段) + const isFromDatabase = !!concept.reason; + return ( { wordBreak="break-word" lineHeight="1.4" > - {concept.concept}{' '} - - ({concept.stock_count}) - + {concept.concept} + {/* 只有搜索数据才显示股票数量 */} + {!isFromDatabase && concept.stock_count > 0 && ( + + {' '}({concept.stock_count}) + + )} - {/* 第二行:相关度 + 涨跌幅 */} + {/* 第二行:标签 */} - {/* 相关度标签 */} - - - 相关度: {relevanceScore}% - - + {/* 数据库数据显示"AI分析",搜索数据显示相关度 */} + {isFromDatabase ? ( + + AI 分析 + + ) : ( + + + 相关度: {relevanceScore}% + + + )} - {/* 涨跌幅数据 */} - {changePct !== null && ( + {/* 涨跌幅数据(仅搜索数据有) */} + {!isFromDatabase && changePct !== null && ( { + const itemBg = useColorModeValue('white', 'gray.700'); + const itemHoverBg = useColorModeValue('gray.50', 'gray.650'); + const borderColor = useColorModeValue('gray.200', 'gray.600'); + const conceptColor = useColorModeValue('blue.600', 'blue.300'); + const reasonBg = useColorModeValue('blue.50', 'gray.800'); + const reasonColor = useColorModeValue('gray.700', 'gray.200'); + const iconColor = useColorModeValue('gray.500', 'gray.400'); + + return ( + + {/* 概念标题行 - 可点击展开 */} + + + + { + e.stopPropagation(); + onNavigate(concept); + }} + > + {concept.concept} + + + AI 分析 + + + + + {/* 关联原因 - 可折叠 */} + + + + {concept.reason || '暂无关联原因说明'} + + + + + ); +}; + /** * 相关概念区组件 * @param {Object} props - * @param {string} props.eventTitle - 事件标题(用于搜索概念) - * @param {string} props.effectiveTradingDate - 有效交易日期(涨跌幅数据日期) - * @param {string|Object} props.eventTime - 事件发生时间 + * @param {number} props.eventId - 事件ID(用于获取 related_concepts 表数据) + * @param {string} props.eventTitle - 事件标题(备用) * @param {React.ReactNode} props.subscriptionBadge - 订阅徽章组件(可选) - * @param {boolean} props.isLocked - 是否锁定详细模式(需要付费) - * @param {Function} props.onLockedClick - 锁定时的点击回调(触发付费弹窗) + * @param {boolean} props.isLocked - 是否锁定(需要付费) + * @param {Function} props.onLockedClick - 锁定时的点击回调 */ const RelatedConceptsSection = ({ + eventId, eventTitle, - effectiveTradingDate, - eventTime, subscriptionBadge = null, isLocked = false, onLockedClick = null, - isOpen = undefined, // 新增:受控模式(外部控制展开状态) - onToggle = undefined // 新增:受控模式(外部控制展开回调) }) => { - // 使用外部 isOpen,如果没有则使用内部 useState - const [internalExpanded, setInternalExpanded] = useState(false); - const isExpanded = onToggle !== undefined ? isOpen : internalExpanded; const [concepts, setConcepts] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); + // 记录每个概念的展开状态 + const [expandedItems, setExpandedItems] = useState({}); const navigate = useNavigate(); // 颜色配置 const sectionBg = useColorModeValue('gray.50', 'gray.750'); const headingColor = useColorModeValue('gray.700', 'gray.200'); const textColor = useColorModeValue('gray.600', 'gray.400'); + const countBadgeBg = useColorModeValue('blue.100', 'blue.800'); + const countBadgeColor = useColorModeValue('blue.700', 'blue.200'); - console.log('[RelatedConceptsSection] 组件渲染', { - eventTitle, - effectiveTradingDate, - eventTime, - loading, - conceptsCount: concepts?.length || 0, - error - }); - - // 搜索相关概念 + // 获取相关概念 useEffect(() => { - const searchConcepts = async () => { - console.log('[RelatedConceptsSection] useEffect 触发', { - eventTitle, - effectiveTradingDate - }); - - if (!eventTitle || !effectiveTradingDate) { - console.log('[RelatedConceptsSection] 缺少必要参数,跳过搜索', { - hasEventTitle: !!eventTitle, - hasEffectiveTradingDate: !!effectiveTradingDate - }); + const fetchConcepts = async () => { + if (!eventId) { setLoading(false); return; } @@ -86,178 +143,103 @@ const RelatedConceptsSection = ({ setLoading(true); setError(null); - // 格式化交易日期 - 统一使用 moment 处理 - let formattedTradeDate; - try { - // 不管传入的是什么格式,都用 moment 解析并格式化为 YYYY-MM-DD - formattedTradeDate = dayjs(effectiveTradingDate).format('YYYY-MM-DD'); - - // 验证日期是否有效 - if (!dayjs(formattedTradeDate, 'YYYY-MM-DD', true).isValid()) { - console.warn('[RelatedConceptsSection] 无效日期,使用当前日期'); - formattedTradeDate = dayjs().format('YYYY-MM-DD'); - } - } catch (error) { - console.warn('[RelatedConceptsSection] 日期格式化失败,使用当前日期', error); - formattedTradeDate = dayjs().format('YYYY-MM-DD'); - } - - const requestBody = { - query: eventTitle, - size: 5, - page: 1, - sort_by: "_score", - trade_date: formattedTradeDate - }; - - const apiUrl = `${getApiBase()}/concept-api/search`; - console.log('[RelatedConceptsSection] 发送请求', { - url: apiUrl, - requestBody - }); - logger.debug('RelatedConceptsSection', '搜索概念', requestBody); - + const apiUrl = `${getApiBase()}/api/events/${eventId}/concepts`; const response = await fetch(apiUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(requestBody) - }); - - console.log('[RelatedConceptsSection] 响应状态', { - ok: response.ok, - status: response.status, - statusText: response.statusText + method: 'GET', + headers: { 'Content-Type': 'application/json' }, + credentials: 'include' }); if (!response.ok) { + if (response.status === 403) { + setConcepts([]); + setLoading(false); + return; + } throw new Error(`HTTP error! status: ${response.status}`); } const data = await response.json(); - console.log('[RelatedConceptsSection] 响应数据', { - hasResults: !!data.results, - resultsCount: data.results?.length || 0, - hasDataConcepts: !!(data.data && data.data.concepts), - data: data - }); - logger.debug('RelatedConceptsSection', '概念搜索响应', { - hasResults: !!data.results, - resultsCount: data.results?.length || 0 - }); - - // 设置概念数据 - if (data.results && Array.isArray(data.results)) { - console.log('[RelatedConceptsSection] 设置概念数据 (results)', data.results); - setConcepts(data.results); - } else if (data.data && data.data.concepts) { - // 向后兼容 - console.log('[RelatedConceptsSection] 设置概念数据 (data.concepts)', data.data.concepts); - setConcepts(data.data.concepts); + if (data.success && Array.isArray(data.data)) { + setConcepts(data.data); + // 默认展开第一个 + if (data.data.length > 0) { + setExpandedItems({ 0: true }); + } } else { - console.log('[RelatedConceptsSection] 没有找到概念数据,设置为空数组'); setConcepts([]); } } catch (err) { - console.error('[RelatedConceptsSection] 搜索概念失败', err); - logger.error('RelatedConceptsSection', 'searchConcepts', err); + console.error('[RelatedConceptsSection] 获取概念失败', err); + logger.error('RelatedConceptsSection', 'fetchConcepts', err); setError('加载概念数据失败'); setConcepts([]); } finally { - console.log('[RelatedConceptsSection] 加载完成'); setLoading(false); } }; - searchConcepts(); - }, [eventTitle, effectiveTradingDate]); + fetchConcepts(); + }, [eventId]); + + // 切换某个概念的展开状态 + const toggleItem = (index) => { + if (isLocked && onLockedClick) { + onLockedClick(); + return; + } + setExpandedItems(prev => ({ + ...prev, + [index]: !prev[index] + })); + }; + + // 跳转到概念中心 + const handleNavigate = (concept) => { + navigate(`/concepts?q=${encodeURIComponent(concept.concept)}`); + }; // 加载中状态 if (loading) { return (
- + 加载相关概念中...
); } - // 判断是否有数据 const hasNoConcepts = !concepts || concepts.length === 0; - /** - * 根据相关度获取颜色(浅色背景 + 深色文字) - * @param {number} relevance - 相关度(0-100) - * @returns {Object} 包含背景色和文字色 - */ - const getRelevanceColor = (relevance) => { - if (relevance >= 90) { - return { bg: 'purple.50', color: 'purple.800' }; // 极高相关 - } else if (relevance >= 80) { - return { bg: 'pink.50', color: 'pink.800' }; // 高相关 - } else if (relevance >= 70) { - return { bg: 'orange.50', color: 'orange.800' }; // 中等相关 - } else { - return { bg: 'gray.100', color: 'gray.700' }; // 低相关 - } - }; - - /** - * 处理概念点击 - * @param {Object} concept - 概念对象 - */ - const handleConceptClick = (concept) => { - // 跳转到概念中心,并搜索该概念 - navigate(`/concepts?q=${encodeURIComponent(concept.concept)}`); - }; - return ( - {/* 标题栏 - 两行布局 */} - - {/* 第一行:标题 + Badge + 按钮 */} - - - - 相关概念 - - {/* 订阅徽章 */} - {subscriptionBadge} - - - - {/* 第二行:交易日期信息 */} - - + {/* 标题栏 */} + + + + 相关概念 + + {!hasNoConcepts && ( + + {concepts.length} + + )} + {subscriptionBadge} + + - {/* 简单模式:横向卡片列表(总是显示) */} + {/* 概念列表 - 手风琴样式 */} {hasNoConcepts ? ( - + {error ? ( {error} ) : ( @@ -265,41 +247,18 @@ const RelatedConceptsSection = ({ )} ) : ( - + {concepts.map((concept, index) => ( - toggleItem(index)} + onNavigate={handleNavigate} /> ))} - + )} - - {/* 详细模式:卡片网格(可折叠) */} - - {hasNoConcepts ? ( - - {error ? ( - {error} - ) : ( - 暂无详细数据 - )} - - ) : ( - /* 详细概念卡片网格 */ - - {concepts.map((concept, index) => ( - - ))} - - )} - ); };