190 lines
5.8 KiB
Python
190 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
概念涨跌幅数据导出脚本
|
||
从 MySQL 导出最新的热门概念数据到静态 JSON 文件
|
||
|
||
使用方法:
|
||
python export_concept_data.py # 导出最新数据
|
||
python export_concept_data.py --limit 100 # 限制导出数量
|
||
|
||
输出:public/data/concept/latest.json
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import argparse
|
||
import pymysql
|
||
from datetime import datetime
|
||
import logging
|
||
|
||
# 配置
|
||
MYSQL_CONFIG = {
|
||
'host': '192.168.1.5',
|
||
'port': 3306,
|
||
'user': 'root',
|
||
'password': 'Zzl5588161!',
|
||
'db': 'stock',
|
||
'charset': 'utf8mb4',
|
||
}
|
||
|
||
# 输出文件路径
|
||
OUTPUT_FILE = os.path.join(
|
||
os.path.dirname(os.path.abspath(__file__)),
|
||
'public', 'data', 'concept', 'latest.json'
|
||
)
|
||
|
||
# 层级结构文件
|
||
HIERARCHY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'concept_hierarchy_v3.json')
|
||
|
||
# 日志配置
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 层级映射缓存
|
||
concept_to_hierarchy = {}
|
||
|
||
|
||
def load_hierarchy():
|
||
"""加载层级结构"""
|
||
global concept_to_hierarchy
|
||
|
||
if not os.path.exists(HIERARCHY_FILE):
|
||
logger.warning(f"层级文件不存在: {HIERARCHY_FILE}")
|
||
return
|
||
|
||
try:
|
||
with open(HIERARCHY_FILE, 'r', encoding='utf-8') as f:
|
||
hierarchy_data = json.load(f)
|
||
|
||
for lv1 in hierarchy_data.get('hierarchy', []):
|
||
lv1_name = lv1.get('lv1', '')
|
||
lv1_id = lv1.get('lv1_id', '')
|
||
|
||
for child in lv1.get('children', []):
|
||
lv2_name = child.get('lv2', '')
|
||
lv2_id = child.get('lv2_id', '')
|
||
|
||
if 'children' in child:
|
||
for lv3_child in child.get('children', []):
|
||
lv3_name = lv3_child.get('lv3', '')
|
||
lv3_id = lv3_child.get('lv3_id', '')
|
||
|
||
for concept in lv3_child.get('concepts', []):
|
||
concept_to_hierarchy[concept] = {
|
||
'lv1': lv1_name,
|
||
'lv1_id': lv1_id,
|
||
'lv2': lv2_name,
|
||
'lv2_id': lv2_id,
|
||
'lv3': lv3_name,
|
||
'lv3_id': lv3_id
|
||
}
|
||
else:
|
||
for concept in child.get('concepts', []):
|
||
concept_to_hierarchy[concept] = {
|
||
'lv1': lv1_name,
|
||
'lv1_id': lv1_id,
|
||
'lv2': lv2_name,
|
||
'lv2_id': lv2_id,
|
||
'lv3': None,
|
||
'lv3_id': None
|
||
}
|
||
|
||
logger.info(f"加载层级结构完成,共 {len(concept_to_hierarchy)} 个概念")
|
||
|
||
except Exception as e:
|
||
logger.error(f"加载层级结构失败: {e}")
|
||
|
||
|
||
def get_connection():
|
||
"""获取数据库连接"""
|
||
return pymysql.connect(**MYSQL_CONFIG)
|
||
|
||
|
||
def export_latest(limit=100):
|
||
"""导出最新的热门概念数据"""
|
||
conn = get_connection()
|
||
try:
|
||
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 获取最新交易日期
|
||
cursor.execute("""
|
||
SELECT MAX(trade_date) as max_date
|
||
FROM concept_daily_stats
|
||
WHERE concept_type = 'leaf'
|
||
""")
|
||
result = cursor.fetchone()
|
||
if not result or not result['max_date']:
|
||
logger.error("无可用数据")
|
||
return None
|
||
|
||
trade_date = result['max_date']
|
||
logger.info(f"最新交易日期: {trade_date}")
|
||
|
||
# 按涨跌幅降序获取概念列表
|
||
cursor.execute("""
|
||
SELECT
|
||
concept_id,
|
||
concept_name,
|
||
concept_type,
|
||
trade_date,
|
||
avg_change_pct,
|
||
stock_count
|
||
FROM concept_daily_stats
|
||
WHERE trade_date = %s AND concept_type = 'leaf'
|
||
ORDER BY avg_change_pct DESC
|
||
LIMIT %s
|
||
""", (trade_date, limit))
|
||
rows = cursor.fetchall()
|
||
|
||
concepts = []
|
||
for row in rows:
|
||
concept_name = row['concept_name']
|
||
hierarchy = concept_to_hierarchy.get(concept_name)
|
||
|
||
concepts.append({
|
||
'concept_id': row['concept_id'],
|
||
'concept': concept_name,
|
||
'price_info': {
|
||
'trade_date': row['trade_date'].strftime('%Y-%m-%d'),
|
||
'avg_change_pct': float(row['avg_change_pct']) if row['avg_change_pct'] else None
|
||
},
|
||
'stock_count': row['stock_count'],
|
||
'hierarchy': hierarchy
|
||
})
|
||
|
||
data = {
|
||
'trade_date': trade_date.strftime('%Y-%m-%d'),
|
||
'total': len(concepts),
|
||
'results': concepts,
|
||
'updated_at': datetime.now().isoformat()
|
||
}
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)
|
||
|
||
# 保存文件
|
||
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
logger.info(f"已保存: {OUTPUT_FILE} ({len(concepts)} 个概念)")
|
||
return data
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='导出热门概念涨跌幅数据')
|
||
parser.add_argument('--limit', type=int, default=100, help='导出的概念数量限制')
|
||
args = parser.parse_args()
|
||
|
||
load_hierarchy()
|
||
export_latest(args.limit)
|
||
logger.info("导出完成!")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|