From d25c77353a06a4b8a504846f918eefc8d9370f20 Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Thu, 18 Dec 2025 14:20:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0Company=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E7=9A=84UI=E4=B8=BAFUI=E9=A3=8E=E6=A0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- get_related_chg.py | 462 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 462 insertions(+) create mode 100644 get_related_chg.py diff --git a/get_related_chg.py b/get_related_chg.py new file mode 100644 index 00000000..275acc39 --- /dev/null +++ b/get_related_chg.py @@ -0,0 +1,462 @@ +from clickhouse_driver import Client as Cclient +from sqlalchemy import create_engine, text +from datetime import datetime, time as dt_time, timedelta +import time +import pandas as pd +import os + +# 读取交易日数据 +script_dir = os.path.dirname(os.path.abspath(__file__)) +TRADING_DAYS_FILE = os.path.join(script_dir, 'tdays.csv') +trading_days_df = pd.read_csv(TRADING_DAYS_FILE) +trading_days_df['DateTime'] = pd.to_datetime(trading_days_df['DateTime']).dt.date +TRADING_DAYS = sorted(trading_days_df['DateTime'].tolist()) # 排序后的交易日列表 + + +def get_clickhouse_client(): + return Cclient( + host='127.0.0.1', + port=9000, + user='default', + password='Zzl33818!', + database='stock' + ) + + +def get_mysql_engine(): + return create_engine( + "mysql+pymysql://root:Zzl33818!@127.0.0.1:3306/stock", + echo=False + ) + + +def is_trading_time(check_datetime=None): + """判断是否在交易时间内 + + Args: + check_datetime: 要检查的时间,默认为当前时间 + + Returns: + bool: True表示在交易时间内 + """ + if check_datetime is None: + check_datetime = datetime.now() + + # 检查是否是交易日 + check_date = check_datetime.date() + if check_date not in TRADING_DAYS: + return False + + # 检查是否在交易时段内 + check_time = check_datetime.time() + + # 上午时段: 9:30 - 11:30 + morning_start = dt_time(9, 30) + morning_end = dt_time(11, 30) + + # 下午时段: 13:00 - 15:00 + afternoon_start = dt_time(13, 0) + afternoon_end = dt_time(15, 0) + + is_morning = morning_start <= check_time <= morning_end + is_afternoon = afternoon_start <= check_time <= afternoon_end + + return is_morning or is_afternoon + + +def get_next_trading_time(): + """获取下一个交易时段的开始时间""" + now = datetime.now() + current_date = now.date() + current_time = now.time() + + # 如果今天是交易日 + if current_date in TRADING_DAYS: + morning_start = dt_time(9, 30) + afternoon_start = dt_time(13, 0) + + # 如果还没到上午开盘 + if current_time < morning_start: + return datetime.combine(current_date, morning_start) + # 如果在上午休市后,下午还没开盘 + elif dt_time(11, 30) < current_time < afternoon_start: + return datetime.combine(current_date, afternoon_start) + + # 否则找下一个交易日的上午开盘时间 + for td in TRADING_DAYS: + if td > current_date: + return datetime.combine(td, dt_time(9, 30)) + + # 如果没有找到未来交易日,返回明天上午9:30(可能需要更新交易日数据) + return datetime.combine(current_date + timedelta(days=1), dt_time(9, 30)) + + +def get_next_trading_day(date): + """获取下一个交易日""" + for td in TRADING_DAYS: + if td > date: + return td + return None + + +def get_nth_trading_day_after(start_date, n=7): + """获取start_date之后的第n个交易日""" + try: + start_idx = TRADING_DAYS.index(start_date) + target_idx = start_idx + n + if target_idx < len(TRADING_DAYS): + return TRADING_DAYS[target_idx] + except (ValueError, IndexError): + pass + + # 如果start_date不在交易日列表中,找到它之后的交易日 + future_days = [d for d in TRADING_DAYS if d > start_date] + if len(future_days) >= n: + return future_days[n - 1] + elif future_days: + return future_days[-1] # 返回最后一个可用的交易日 + + return None + + +def get_trading_day_info(event_datetime): + """获取事件对应的交易日信息""" + event_date = event_datetime.date() + market_close = dt_time(15, 0) + + # 如果是交易日且在收盘前,使用当天 + if event_date in TRADING_DAYS and event_datetime.time() <= market_close: + return event_date + + # 否则使用下一个交易日 + return get_next_trading_day(event_date) + + +def calculate_stock_changes(stock_codes, event_datetime, ch_client, debug=False): + """批量计算一个事件关联的所有股票涨跌幅""" + + if not stock_codes: + return None, None, None + + event_date = event_datetime.date() + event_time = event_datetime.time() + market_open = dt_time(9, 30) + market_close = dt_time(15, 0) + + # 确定起始时间点(事件发生后的第一个有效价格点) + if event_date in TRADING_DAYS and market_open <= event_time <= market_close: + # 事件在交易时间内发生 → 用事件发生时的价格作为起点 + start_datetime = event_datetime + trading_date = event_date + end_datetime = datetime.combine(trading_date, market_close) + if debug: + print(f" 事件在交易时间内: {event_datetime} -> 起点={start_datetime}") + else: + # 事件在交易时间外发生 → 用下一个交易日开盘价作为起点 + trading_date = get_trading_day_info(event_datetime) + if not trading_date: + if debug: + print(f" 找不到交易日: {event_datetime}") + return None, None, None + start_datetime = datetime.combine(trading_date, market_open) + end_datetime = datetime.combine(trading_date, market_close) + if debug: + print(f" 事件在非交易时间: {event_datetime} -> 下一交易日={trading_date}, 起点={start_datetime}") + + # 获取7个交易日后的日期 + week_trading_date = get_nth_trading_day_after(trading_date, 7) + if not week_trading_date: + # 降级:如果没有足够的未来交易日,就用当前能找到的最远日期 + week_trading_date = trading_date + timedelta(days=10) + + week_end_datetime = datetime.combine(week_trading_date, market_close) + + if debug: + print(f" 查询范围: {start_datetime} -> 当日={end_datetime}, 周末={week_end_datetime}") + print(f" 股票代码: {stock_codes}") + + # 一次性查询所有股票的价格数据 + results = ch_client.execute(""" + SELECT code, + -- 起始价格:事件发生时或之后的第一个价格 + argMin(close, timestamp) as start_price, + -- 当日收盘价:当日交易结束时的最后一个价格 + argMax( + close, if(timestamp <= %(end)s, timestamp, toDateTime('1970-01-01')) + ) as day_close_price, + -- 周后收盘价:7个交易日后的收盘价 + argMax( + close, if(timestamp <= %(week_end)s, timestamp, toDateTime('1970-01-01')) + ) as week_close_price + FROM stock_minute + WHERE code IN %(codes)s + AND timestamp >= %(start)s + AND timestamp <= %(week_end)s + GROUP BY code + HAVING start_price > 0 + """, { + 'codes': tuple(stock_codes), + 'start': start_datetime, + 'end': end_datetime, + 'week_end': week_end_datetime + }) + + if debug: + print(f" 查询到 {len(results)} 只股票的数据") + + if not results: + return None, None, None + + # 计算涨跌幅 + day_changes = [] + week_changes = [] + + for code, start_price, day_close, week_close in results: + if start_price and start_price > 0: + # 当日涨跌幅(从事件发生到当日收盘) + if day_close and day_close > 0: + day_change = (day_close - start_price) / start_price * 100 + day_changes.append(day_change) + + # 周度涨跌幅(从事件发生到第7个交易日收盘) + if week_close and week_close > 0: + week_change = (week_close - start_price) / start_price * 100 + week_changes.append(week_change) + + # 计算统计值 + avg_change = sum(day_changes) / len(day_changes) if day_changes else None + max_change = max(day_changes) if day_changes else None + avg_week_change = sum(week_changes) / len(week_changes) if week_changes else None + + if debug: + print( + f" 结果: 日均={avg_change:.2f}% 日最大={max_change:.2f}% 周均={avg_week_change:.2f}%" if avg_change else " 结果: 无有效数据") + + return avg_change, max_change, avg_week_change + + +def update_event_statistics(start_date=None, end_date=None, force_update=False, debug_mode=False): + """更新事件统计数据 + + Args: + start_date: 开始日期 + end_date: 结束日期 + force_update: 是否强制更新(忽略已有数据) + debug_mode: 是否开启调试模式 + """ + try: + mysql_engine = get_mysql_engine() + ch_client = get_clickhouse_client() + + with mysql_engine.connect() as mysql_conn: + # 构建SQL查询 + query = """ + SELECT e.id, \ + e.created_at, \ + GROUP_CONCAT(rs.stock_code) as stock_codes, + e.related_avg_chg, \ + e.related_max_chg, \ + e.related_week_chg + FROM event e + JOIN related_stock rs ON e.id = rs.event_id \ + """ + + conditions = [] + params = {} + + if start_date: + conditions.append("e.created_at >= :start_date") + params["start_date"] = start_date + + if end_date: + conditions.append("e.created_at <= :end_date") + params["end_date"] = end_date + + if not force_update: + # 只更新没有数据的记录 + conditions.append("(e.related_avg_chg IS NULL OR e.related_max_chg IS NULL)") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + query += """ + GROUP BY e.id, e.created_at, e.related_avg_chg, e.related_max_chg, e.related_week_chg + ORDER BY e.created_at DESC + """ + + events = mysql_conn.execute(text(query), params).fetchall() + + print(f"Found {len(events)} events to update (force_update={force_update})") + if debug_mode and len(events) > 0: + print(f"Date range: {events[-1][1]} to {events[0][1]}") + + # 准备批量更新数据 + update_data = [] + + for idx, event in enumerate(events, 1): + try: + event_id = event[0] + created_at = event[1] + stock_codes = event[2].split(',') if event[2] else [] + existing_avg = event[3] + existing_max = event[4] + existing_week = event[5] + + if not stock_codes: + continue + + if debug_mode and idx <= 3: # 只调试前3个事件 + print(f"\n[Event {event_id}] created_at={created_at}") + if not force_update and existing_avg is not None: + print( + f" 已有数据: avg={existing_avg:.2f}% max={existing_max:.2f}% week={existing_week:.2f}%") + + # 批量计算该事件所有股票的涨跌幅 + avg_change, max_change, week_change = calculate_stock_changes( + stock_codes, created_at, ch_client, debug=(debug_mode and idx <= 3) + ) + + # 收集更新数据 + if any(x is not None for x in (avg_change, max_change, week_change)): + update_data.append({ + "avg_chg": avg_change, + "max_chg": max_change, + "week_chg": week_change, + "event_id": event_id + }) + + # 每处理10个事件打印一次进度 + if idx % 10 == 0: + print(f"Processed {idx}/{len(events)} events...") + + except Exception as e: + print(f"Error processing event {event[0]}: {str(e)}") + if debug_mode: + import traceback + traceback.print_exc() + continue + + # 批量更新MySQL + if update_data: + mysql_conn.execute(text(""" + UPDATE event + SET related_avg_chg = :avg_chg, + related_max_chg = :max_chg, + related_week_chg = :week_chg + WHERE id = :event_id + """), update_data) + # SQLAlchemy 1.4 的 connect() 会在上下文管理器退出时自动提交 + print(f"Successfully updated {len(update_data)} events") + + except Exception as e: + print(f"Error in update_event_statistics: {str(e)}") + raise + + +def run_monitor(): + """运行监控循环 - 仅在交易时间段内每2分钟强制更新最近7天数据""" + print("=" * 60) + print("启动交易时段监控模式") + print("运行规则: 仅在交易日的9:30-11:30和13:00-15:00运行") + print("更新频率: 每2分钟一次") + print("更新模式: 强制更新(force_update=True)") + print("更新范围: 最近7天的事件数据") + print("=" * 60) + + while True: + try: + now = datetime.now() + + # 检查是否在交易时间内 + if is_trading_time(now): + seven_days_ago = now - timedelta(days=7) + + print(f"\n{'=' * 60}") + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] 交易时段 - 开始更新...") + print(f"{'=' * 60}") + + update_event_statistics( + start_date=seven_days_ago, + force_update=True, # 强制更新所有数据 + debug_mode=False + ) + + print(f"\n[{now.strftime('%Y-%m-%d %H:%M:%S')}] 更新完成") + print(f"等待2分钟后执行下次更新...\n") + time.sleep(120) # 2分钟 + + else: + # 不在交易时间,计算下次交易时间 + next_trading_time = get_next_trading_time() + wait_seconds = (next_trading_time - now).total_seconds() + wait_minutes = int(wait_seconds / 60) + + print(f"\n{'=' * 60}") + print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] 非交易时段") + print(f"下次交易时间: {next_trading_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"等待时长: {wait_minutes} 分钟") + print(f"{'=' * 60}\n") + + # 等待到下一个交易时段(每5分钟检查一次,避免程序僵死) + check_interval = 300 # 5分钟检查一次 + while not is_trading_time(): + time.sleep(min(check_interval, max(1, wait_seconds))) + wait_seconds = (get_next_trading_time() - datetime.now()).total_seconds() + if wait_seconds <= 0: + break + + except KeyboardInterrupt: + print("\n程序被用户中断") + break + except Exception as e: + print(f"Error in monitor loop: {str(e)}") + import traceback + traceback.print_exc() + print("等待1分钟后重试...") + time.sleep(60) # 发生错误等待1分钟后重试 + + +if __name__ == "__main__": + import sys + + # 支持命令行参数 + # python get_related_chg.py --test # 测试模式:只更新昨天和今天,开启调试 + # python get_related_chg.py --once # 单次强制更新最近7天 + # python get_related_chg.py # 正常运行:交易时段每2分钟强制更新 + + if len(sys.argv) > 1: + if sys.argv[1] == '--test': + # 测试模式:更新昨天和今天的数据,开启调试 + print("=" * 60) + print("测试模式:更新昨天和今天的数据") + print("=" * 60) + yesterday = (datetime.now() - timedelta(days=2)).replace(hour=15, minute=0, second=0) + tomorrow = datetime.now() + timedelta(days=1) + update_event_statistics( + start_date=yesterday, + end_date=tomorrow, + force_update=True, + debug_mode=True + ) + print("\n测试完成!") + + elif sys.argv[1] == '--once': + # 单次强制更新模式 + print("=" * 60) + print("单次强制更新模式:重新计算最近7天所有数据") + print("=" * 60) + seven_days_ago = datetime.now() - timedelta(days=7) + update_event_statistics( + start_date=seven_days_ago, + force_update=True, + debug_mode=False + ) + print("\n强制更新完成!") + else: + print("未知参数。支持的参数:") + print(" --test : 测试模式(更新昨天和今天,开启调试)") + print(" --once : 单次强制更新最近7天") + print(" (无参数): 交易时段监控模式(每2分钟强制更新)") + else: + # 正常监控模式:仅在交易时间段运行 + run_monitor() \ No newline at end of file