#!/usr/bin/env python3 """ 概念涨跌幅数据导出脚本 从 MySQL 导出最新的热门概念数据到静态 JSON 文件 使用方法: python export_concept_data.py # 导出最新数据 python export_concept_data.py --limit 100 # 限制导出数量 输出:public/data/concept/latest.json """ import os import json import argparse import pymysql from datetime import datetime import logging # 配置 MYSQL_CONFIG = { 'host': '192.168.1.5', 'port': 3306, 'user': 'root', 'password': 'Zzl5588161!', 'db': 'stock', 'charset': 'utf8mb4', } # 输出文件路径 OUTPUT_FILE = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'public', 'data', 'concept', 'latest.json' ) # 层级结构文件 HIERARCHY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'concept_hierarchy_v3.json') # 日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 层级映射缓存 concept_to_hierarchy = {} def load_hierarchy(): """加载层级结构""" global concept_to_hierarchy if not os.path.exists(HIERARCHY_FILE): logger.warning(f"层级文件不存在: {HIERARCHY_FILE}") return try: with open(HIERARCHY_FILE, 'r', encoding='utf-8') as f: hierarchy_data = json.load(f) for lv1 in hierarchy_data.get('hierarchy', []): lv1_name = lv1.get('lv1', '') lv1_id = lv1.get('lv1_id', '') for child in lv1.get('children', []): lv2_name = child.get('lv2', '') lv2_id = child.get('lv2_id', '') if 'children' in child: for lv3_child in child.get('children', []): lv3_name = lv3_child.get('lv3', '') lv3_id = lv3_child.get('lv3_id', '') for concept in lv3_child.get('concepts', []): concept_to_hierarchy[concept] = { 'lv1': lv1_name, 'lv1_id': lv1_id, 'lv2': lv2_name, 'lv2_id': lv2_id, 'lv3': lv3_name, 'lv3_id': lv3_id } else: for concept in child.get('concepts', []): concept_to_hierarchy[concept] = { 'lv1': lv1_name, 'lv1_id': lv1_id, 'lv2': lv2_name, 'lv2_id': lv2_id, 'lv3': None, 'lv3_id': None } logger.info(f"加载层级结构完成,共 {len(concept_to_hierarchy)} 个概念") except Exception as e: logger.error(f"加载层级结构失败: {e}") def get_connection(): """获取数据库连接""" return pymysql.connect(**MYSQL_CONFIG) def export_latest(limit=100): """导出最新的热门概念数据""" conn = get_connection() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # 获取最新交易日期 cursor.execute(""" SELECT MAX(trade_date) as max_date FROM concept_daily_stats WHERE concept_type = 'leaf' """) result = cursor.fetchone() if not result or not result['max_date']: logger.error("无可用数据") return None trade_date = result['max_date'] logger.info(f"最新交易日期: {trade_date}") # 按涨跌幅降序获取概念列表 cursor.execute(""" SELECT concept_id, concept_name, concept_type, trade_date, avg_change_pct, stock_count FROM concept_daily_stats WHERE trade_date = %s AND concept_type = 'leaf' ORDER BY avg_change_pct DESC LIMIT %s """, (trade_date, limit)) rows = cursor.fetchall() concepts = [] for row in rows: concept_name = row['concept_name'] hierarchy = concept_to_hierarchy.get(concept_name) concepts.append({ 'concept_id': row['concept_id'], 'concept': concept_name, 'price_info': { 'trade_date': row['trade_date'].strftime('%Y-%m-%d'), 'avg_change_pct': float(row['avg_change_pct']) if row['avg_change_pct'] else None }, 'stock_count': row['stock_count'], 'hierarchy': hierarchy }) data = { 'trade_date': trade_date.strftime('%Y-%m-%d'), 'total': len(concepts), 'results': concepts, 'updated_at': datetime.now().isoformat() } # 确保目录存在 os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True) # 保存文件 with open(OUTPUT_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"已保存: {OUTPUT_FILE} ({len(concepts)} 个概念)") return data finally: conn.close() def main(): parser = argparse.ArgumentParser(description='导出热门概念涨跌幅数据') parser.add_argument('--limit', type=int, default=100, help='导出的概念数量限制') args = parser.parse_args() load_hierarchy() export_latest(args.limit) logger.info("导出完成!") if __name__ == '__main__': main()