vf_react/export_zt_data.py

#!/usr/bin/env python3
"""
Limit-up (涨停) analysis data export script.
Exports data from Elasticsearch into static JSON files that the frontend reads directly.

Usage:
    python export_zt_data.py                   # export the most recent 30 days
    python export_zt_data.py --days 7         # export the most recent 7 days
    python export_zt_data.py --date 20251212  # export a specific date
    python export_zt_data.py --all            # export all data

Output directory: data/zt/
├── dates.json        # list of available dates
├── daily/
│   └── {date}.json   # per-day analysis data
└── stocks.jsonl      # all stock records (for keyword search)
"""
import os
import json
import logging
import argparse
from datetime import datetime
from collections import defaultdict

from elasticsearch import Elasticsearch

# Configuration
ES_HOST = os.environ.get('ES_HOST', 'http://127.0.0.1:9200')
# Write into the public directory so the frontend can fetch the files directly
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'public', 'data', 'zt')

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ES connection
es = Elasticsearch([ES_HOST], timeout=60, retry_on_timeout=True, max_retries=3)
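
# Note (assumption about the environment): the `timeout=` keyword matches the
# elasticsearch-py 7.x client; on the 8.x client the equivalent is
# `request_timeout=`, and the `body=` argument used below is deprecated there.
# Worth verifying against the installed client version.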


def ensure_dirs():
    """Make sure the output directories exist."""
    os.makedirs(os.path.join(OUTPUT_DIR, 'daily'), exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")


def get_available_dates():
    """Return every date that has data, newest first."""
    query = {
        "size": 0,
        "aggs": {
            "dates": {
                "terms": {
                    "field": "date",
                    "size": 10000,
                    "order": {"_key": "desc"}
                },
                "aggs": {
                    "stock_count": {
                        "cardinality": {"field": "scode"}
                    }
                }
            }
        }
    }
    result = es.search(index="zt_stocks", body=query)
    dates = []
    for bucket in result['aggregations']['dates']['buckets']:
        date = bucket['key']
        count = bucket['doc_count']
        # Reformat the date: YYYYMMDD -> YYYY-MM-DD
        formatted = f"{date[:4]}-{date[4:6]}-{date[6:]}"
        dates.append({
            'date': date,
            'formatted_date': formatted,
            'count': count
        })
    return dates
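
# Illustrative bucket shape from the terms aggregation above (example values):
#   {"key": "20251212", "doc_count": 73, "stock_count": {"value": 73}}
# doc_count is used as the per-day count, which assumes one document per stock
# per day; the stock_count cardinality sub-aggregation is available as a
# fallback if that assumption ever breaks.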


def get_daily_stats(date):
    """Fetch the precomputed stats document for one date."""
    query = {
        "query": {"term": {"date": date}},
        "_source": ["sector_stats", "word_freq", "chart_data"]
    }
    result = es.search(index="zt_daily_stats", body=query, size=1)
    if result['hits']['total']['value'] > 0:
        return result['hits']['hits'][0]['_source']
    return {}


def get_daily_stocks(date):
    """Fetch every limit-up stock for one date, ordered by limit-up time."""
    query = {
        "query": {"term": {"date": date}},
        "size": 10000,
        "sort": [{"zt_time": "asc"}],
        "_source": {
            "excludes": ["content_embedding"]  # skip the vector field
        }
    }
    result = es.search(index="zt_stocks", body=query)
    stocks = []
    for hit in result['hits']['hits']:
        stock = hit['_source']
        # Format the limit-up timestamp as HH:MM:SS
        if 'zt_time' in stock:
            try:
                zt_time = datetime.fromisoformat(stock['zt_time'].replace('Z', '+00:00'))
                stock['formatted_time'] = zt_time.strftime('%H:%M:%S')
            except (ValueError, AttributeError):
                stock['formatted_time'] = ''
        stocks.append(stock)
    return stocks
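
# Note: size=10000 matches Elasticsearch's default index.max_result_window. If a
# single day ever exceeded that, pagination would be needed; a minimal sketch
# using the scan helper from elasticsearch-py (an alternative, not what the
# script currently does):
#
#   from elasticsearch import helpers
#   hits = helpers.scan(es, index="zt_stocks",
#                       query={"query": {"term": {"date": date}}})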


def process_sector_data(sector_stats, stocks):
    """Build the per-sector mapping: {sector_name: {count, stock_codes}}."""
    if sector_stats:
        # Use the precomputed sector_stats
        sector_data = {}
        for sector_info in sector_stats:
            sector_name = sector_info['sector_name']
            sector_data[sector_name] = {
                'count': sector_info['count'],
                'stock_codes': sector_info.get('stock_codes', [])
            }
    else:
        # Derive the mapping from the per-stock records
        sector_stocks = defaultdict(list)
        sector_counts = defaultdict(int)
        for stock in stocks:
            for sector in stock.get('core_sectors', []):
                sector_counts[sector] += 1
        # Sectors with only one stock get folded into '其他' (Other)
        small_sectors = {s for s, c in sector_counts.items() if c < 2}
        for stock in stocks:
            scode = stock.get('scode', '')
            valid_sectors = [s for s in stock.get('core_sectors', []) if s not in small_sectors]
            if valid_sectors:
                for sector in valid_sectors:
                    sector_stocks[sector].append(scode)
            else:
                sector_stocks['其他'].append(scode)
        sector_data = {
            sector: {'count': len(codes), 'stock_codes': codes}
            for sector, codes in sector_stocks.items()
        }
    # Ordering: '公告' (Announcements) first, then by count descending, '其他' (Other) last
    sorted_items = []
    announcement = sector_data.pop('公告', None)
    other = sector_data.pop('其他', None)
    normal_items = sorted(sector_data.items(), key=lambda x: -x[1]['count'])
    if announcement:
        sorted_items.append(('公告', announcement))
    sorted_items.extend(normal_items)
    if other:
        sorted_items.append(('其他', other))
    return dict(sorted_items)
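
# Illustrative ordering (example counts): given {'公告': 3, 'AI': 10, '机器人': 5,
# '其他': 2}, the result is ordered 公告, AI, 机器人, 其他.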


def calculate_sector_relations_top10(stocks):
    """Count co-occurring sector pairs and return the top 10."""
    relations = defaultdict(int)
    stock_sectors = defaultdict(set)
    for stock in stocks:
        scode = stock['scode']
        for sector in stock.get('core_sectors', []):
            stock_sectors[scode].add(sector)
    for scode, sectors in stock_sectors.items():
        sector_list = list(sectors)
        for i in range(len(sector_list)):
            for j in range(i + 1, len(sector_list)):
                pair = tuple(sorted([sector_list[i], sector_list[j]]))
                relations[pair] += 1
    sorted_relations = sorted(relations.items(), key=lambda x: -x[1])[:10]
    return {
        'labels': [f"{p[0]} - {p[1]}" for p, _ in sorted_relations],
        'counts': [c for _, c in sorted_relations]
    }
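
# Illustrative pair counting: a stock tagged ['AI', '算力', '机器人'] contributes
# one count to each of its unordered sector pairs, i.e. C(3, 2) = 3 pairs; the
# tuple is sorted so the same pair always maps to the same key.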


def export_daily_analysis(date):
    """Export the analysis data for a single date."""
    logger.info(f"Exporting date: {date}")
    # Fetch source data
    stats = get_daily_stats(date)
    stocks = get_daily_stocks(date)
    if not stocks:
        logger.warning(f"No data for date {date}")
        return None
    # Sector breakdown
    sector_data = process_sector_data(stats.get('sector_stats', []), stocks)
    # Sector co-occurrence
    sector_relations = calculate_sector_relations_top10(stocks)
    # Chart data (fall back to deriving it from sector_data)
    chart_data = stats.get('chart_data', {
        'labels': [s for s in sector_data.keys() if s not in ['其他', '公告']],
        'counts': [d['count'] for s, d in sector_data.items() if s not in ['其他', '公告']]
    })
    # Assemble the analysis payload
    analysis = {
        'date': date,
        'formatted_date': f"{date[:4]}-{date[4:6]}-{date[6:]}",
        'total_stocks': len(stocks),
        'sector_data': sector_data,
        'chart_data': chart_data,
        'word_freq_data': stats.get('word_freq', []),
        'sector_relations_top10': sector_relations,
        'stocks': stocks  # full stock list for the day
    }
    # Write the file
    output_path = os.path.join(OUTPUT_DIR, 'daily', f'{date}.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(analysis, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved: {output_path} ({len(stocks)} stocks)")
    return analysis
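
# Note: the frontend may fetch these files while an export is in progress;
# writing to a temporary file and swapping it in with os.replace() would make
# each update atomic (a possible hardening, not something the script does today).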


def export_dates_index(dates):
    """Export the index of available dates."""
    output_path = os.path.join(OUTPUT_DIR, 'dates.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump({
            'dates': dates,
            'total': len(dates),
            'updated_at': datetime.now().isoformat()
        }, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved dates index: {output_path} ({len(dates)} dates)")


def export_stocks_for_search(dates_to_export):
    """Export stock records for keyword search (JSONL format)."""
    output_path = os.path.join(OUTPUT_DIR, 'stocks.jsonl')
    total_count = 0
    with open(output_path, 'w', encoding='utf-8') as f:
        for date_info in dates_to_export:
            date = date_info['date']
            stocks = get_daily_stocks(date)
            for stock in stocks:
                # Keep only the fields the search needs
                search_record = {
                    'date': stock.get('date'),
                    'scode': stock.get('scode'),
                    'sname': stock.get('sname'),
                    'brief': stock.get('brief', ''),
                    'core_sectors': stock.get('core_sectors', []),
                    'zt_time': stock.get('zt_time'),
                    'formatted_time': stock.get('formatted_time', ''),
                    'continuous_days': stock.get('continuous_days', '')
                }
                f.write(json.dumps(search_record, ensure_ascii=False) + '\n')
                total_count += 1
    logger.info(f"Saved search data: {output_path} ({total_count} records)")


def main():
    parser = argparse.ArgumentParser(description='Export limit-up analysis data to JSON files')
    parser.add_argument('--days', type=int, default=30, help='export the most recent N days')
    parser.add_argument('--date', type=str, help='export a specific date (YYYYMMDD)')
    parser.add_argument('--all', action='store_true', help='export all data')
    parser.add_argument('--no-search', action='store_true', help='skip the search data export')
    args = parser.parse_args()

    ensure_dirs()

    # All dates available in ES
    all_dates = get_available_dates()
    logger.info(f"ES holds data for {len(all_dates)} dates")
    if not all_dates:
        logger.error("No data found")
        return

    # Decide which dates to export
    if args.date:
        dates_to_export = [d for d in all_dates if d['date'] == args.date]
        if not dates_to_export:
            logger.error(f"No data found for date {args.date}")
            return
    elif args.all:
        dates_to_export = all_dates
    else:
        # Default: the most recent N days
        dates_to_export = all_dates[:args.days]
    logger.info(f"Exporting {len(dates_to_export)} dates")

    # Per-day analysis files
    for date_info in dates_to_export:
        try:
            export_daily_analysis(date_info['date'])
        except Exception as e:
            logger.error(f"Failed to export {date_info['date']}: {e}")

    # Dates index (always covers all available dates)
    export_dates_index(all_dates)

    # Search data
    if not args.no_search:
        export_stocks_for_search(dates_to_export)

    logger.info("Export complete!")


if __name__ == '__main__':
    main()
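
# Example scheduling (assumption: a weekday run after the day's data has been
# ingested into ES), e.g. via cron:
#   30 16 * * 1-5  cd /path/to/vf_react && python export_zt_data.py --days 7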