#!/usr/bin/env python3
"""
Limit-up (涨停) analysis data export script.
Exports data from Elasticsearch to static JSON files that the frontend can read directly.

Usage:
    python export_zt_data.py                  # export the last 30 days
    python export_zt_data.py --days 7         # export the last 7 days
    python export_zt_data.py --date 20251212  # export a specific date
    python export_zt_data.py --all            # export everything

Output layout (under public/data/zt/):
    ├── dates.json          # list of available dates
    ├── daily/
    │   └── {date}.json     # per-day analysis data
    └── stocks.jsonl        # every stock record (for keyword search)
"""

import argparse
import json
import logging
import os
from collections import defaultdict
from datetime import datetime

from elasticsearch import Elasticsearch

# Configuration
ES_HOST = os.environ.get('ES_HOST', 'http://127.0.0.1:9200')
# Write into the public directory so the frontend can fetch the files directly
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'public', 'data', 'zt')

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ES connection (elasticsearch-py 7.x style; 8.x clients expect request_timeout instead of timeout)
es = Elasticsearch([ES_HOST], timeout=60, retry_on_timeout=True, max_retries=3)
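
# To point the exporter at a non-default cluster, set ES_HOST before running.
# A usage sketch (the host below is a made-up example):
#   ES_HOST=http://10.0.0.5:9200 python export_zt_data.py --days 7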


def ensure_dirs():
    """Make sure the output directories exist."""
    os.makedirs(os.path.join(OUTPUT_DIR, 'daily'), exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")


def get_available_dates():
    """Return every date that has data, newest first."""
    query = {
        "size": 0,
        "aggs": {
            "dates": {
                "terms": {
                    "field": "date",
                    "size": 10000,
                    "order": {"_key": "desc"}
                },
                "aggs": {
                    "stock_count": {
                        "cardinality": {"field": "scode"}
                    }
                }
            }
        }
    }

    result = es.search(index="zt_stocks", body=query)

    dates = []
    for bucket in result['aggregations']['dates']['buckets']:
        date = bucket['key']
        # Unique stocks per day from the cardinality sub-aggregation
        # (doc_count would overcount if a stock has multiple records per date)
        count = bucket['stock_count']['value']
        # Format the date: YYYYMMDD -> YYYY-MM-DD
        formatted = f"{date[:4]}-{date[4:6]}-{date[6:]}"
        dates.append({
            'date': date,
            'formatted_date': formatted,
            'count': count
        })

    return dates
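
# Illustrative shape of the list returned by get_available_dates()
# (dates and counts are made-up examples, not real data):
# [
#     {'date': '20251212', 'formatted_date': '2025-12-12', 'count': 87},
#     {'date': '20251211', 'formatted_date': '2025-12-11', 'count': 92},
# ]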


def get_daily_stats(date):
    """Fetch the precomputed stats document for a given date."""
    query = {
        "size": 1,
        "query": {"term": {"date": date}},
        "_source": ["sector_stats", "word_freq", "chart_data"]
    }

    result = es.search(index="zt_daily_stats", body=query)

    if result['hits']['total']['value'] > 0:
        return result['hits']['hits'][0]['_source']
    return {}
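
# Rough shape of the stats document, per the _source filter above
# (the values sketched here are hypothetical):
# {
#     'sector_stats': [{'sector_name': '人工智能', 'count': 12, 'stock_codes': [...]}],
#     'word_freq': [...],
#     'chart_data': {'labels': [...], 'counts': [...]},
# }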


def get_daily_stocks(date):
    """Fetch all stocks for a given date, ordered by limit-up time."""
    query = {
        "query": {"term": {"date": date}},
        "size": 10000,
        "sort": [{"zt_time": "asc"}],
        "_source": {
            "excludes": ["content_embedding"]  # skip the embedding vector field
        }
    }

    result = es.search(index="zt_stocks", body=query)

    stocks = []
    for hit in result['hits']['hits']:
        stock = hit['_source']
        # Format the limit-up timestamp as HH:MM:SS
        if 'zt_time' in stock:
            try:
                zt_time = datetime.fromisoformat(stock['zt_time'].replace('Z', '+00:00'))
                stock['formatted_time'] = zt_time.strftime('%H:%M:%S')
            except (ValueError, AttributeError):
                stock['formatted_time'] = ''
        stocks.append(stock)

    return stocks
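
# Example of the normalization above (the timestamp is illustrative):
#   '2025-12-12T09:42:03Z' -> fromisoformat('...+00:00') -> formatted_time '09:42:03'
# datetime.fromisoformat() cannot parse a trailing 'Z' before Python 3.11,
# hence the replace() call.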


def process_sector_data(sector_stats, stocks):
    """Build the sector -> {count, stock_codes} mapping for one day."""
    if sector_stats:
        # Build from the precomputed sector_stats
        sector_data = {}
        for sector_info in sector_stats:
            sector_name = sector_info['sector_name']
            sector_data[sector_name] = {
                'count': sector_info['count'],
                'stock_codes': sector_info.get('stock_codes', [])
            }
    else:
        # Fall back to deriving the mapping from the stock records
        sector_stocks = defaultdict(list)
        sector_counts = defaultdict(int)

        for stock in stocks:
            for sector in stock.get('core_sectors', []):
                sector_counts[sector] += 1

        # Sectors with only one stock get folded into '其他' (other)
        small_sectors = {s for s, c in sector_counts.items() if c < 2}

        for stock in stocks:
            scode = stock.get('scode', '')
            valid_sectors = [s for s in stock.get('core_sectors', []) if s not in small_sectors]

            if valid_sectors:
                for sector in valid_sectors:
                    sector_stocks[sector].append(scode)
            else:
                sector_stocks['其他'].append(scode)

        sector_data = {
            sector: {'count': len(codes), 'stock_codes': codes}
            for sector, codes in sector_stocks.items()
        }

    # Sort order: '公告' (announcements) first, then by count descending, '其他' last
    sorted_items = []
    announcement = sector_data.pop('公告', None)
    other = sector_data.pop('其他', None)

    normal_items = sorted(sector_data.items(), key=lambda x: -x[1]['count'])

    if announcement is not None:
        sorted_items.append(('公告', announcement))
    sorted_items.extend(normal_items)
    if other is not None:
        sorted_items.append(('其他', other))

    return dict(sorted_items)
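
# Illustrative result of process_sector_data() (names and codes are made up):
# {
#     '公告': {'count': 2, 'stock_codes': ['600001', '000002']},
#     '人工智能': {'count': 12, 'stock_codes': ['300001', ...]},
#     '其他': {'count': 5, 'stock_codes': [...]},
# }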


def calculate_sector_relations_top10(stocks):
    """Return the 10 most frequent sector co-occurrence pairs."""
    relations = defaultdict(int)
    stock_sectors = defaultdict(set)

    for stock in stocks:
        scode = stock['scode']
        for sector in stock.get('core_sectors', []):
            stock_sectors[scode].add(sector)

    # Count each unordered sector pair once per stock
    for scode, sectors in stock_sectors.items():
        sector_list = list(sectors)
        for i in range(len(sector_list)):
            for j in range(i + 1, len(sector_list)):
                pair = tuple(sorted([sector_list[i], sector_list[j]]))
                relations[pair] += 1

    sorted_relations = sorted(relations.items(), key=lambda x: -x[1])[:10]

    return {
        'labels': [f"{p[0]} - {p[1]}" for p, _ in sorted_relations],
        'counts': [c for _, c in sorted_relations]
    }
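
# Worked example (hypothetical data): a stock tagged ['AI', '机器人', '芯片']
# adds 1 to each of the pairs ('AI', '机器人'), ('AI', '芯片'), ('机器人', '芯片');
# the ten pairs with the highest totals across all stocks are returned.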


def export_daily_analysis(date):
    """Export the analysis data for a single day."""
    logger.info(f"Exporting date: {date}")

    # Fetch data
    stats = get_daily_stats(date)
    stocks = get_daily_stocks(date)

    if not stocks:
        logger.warning(f"No data for date {date}")
        return None

    # Aggregate sector data
    sector_data = process_sector_data(stats.get('sector_stats', []), stocks)

    # Compute sector co-occurrence pairs
    sector_relations = calculate_sector_relations_top10(stocks)

    # Chart data: prefer the precomputed version, otherwise derive it
    chart_data = stats.get('chart_data', {
        'labels': [s for s in sector_data.keys() if s not in ['其他', '公告']],
        'counts': [d['count'] for s, d in sector_data.items() if s not in ['其他', '公告']]
    })

    # Assemble the analysis payload
    analysis = {
        'date': date,
        'formatted_date': f"{date[:4]}-{date[4:6]}-{date[6:]}",
        'total_stocks': len(stocks),
        'sector_data': sector_data,
        'chart_data': chart_data,
        'word_freq_data': stats.get('word_freq', []),
        'sector_relations_top10': sector_relations,
        'stocks': stocks  # full stock list included
    }

    # Write the file
    output_path = os.path.join(OUTPUT_DIR, 'daily', f'{date}.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(analysis, f, ensure_ascii=False, indent=2)

    logger.info(f"Saved: {output_path} ({len(stocks)} stocks)")
    return analysis
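
# Illustrative top-level shape of daily/{date}.json (values are examples):
# {
#     "date": "20251212",
#     "formatted_date": "2025-12-12",
#     "total_stocks": 87,
#     "sector_data": {...},
#     "chart_data": {"labels": [...], "counts": [...]},
#     "word_freq_data": [...],
#     "sector_relations_top10": {"labels": [...], "counts": [...]},
#     "stocks": [...]
# }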


def export_dates_index(dates):
    """Export the index of available dates."""
    output_path = os.path.join(OUTPUT_DIR, 'dates.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump({
            'dates': dates,
            'total': len(dates),
            'updated_at': datetime.now().isoformat()
        }, f, ensure_ascii=False, indent=2)

    logger.info(f"Saved dates index: {output_path} ({len(dates)} dates)")


def export_stocks_for_search(dates_to_export):
    """Export stock records for keyword search (JSONL format)."""
    output_path = os.path.join(OUTPUT_DIR, 'stocks.jsonl')

    total_count = 0
    with open(output_path, 'w', encoding='utf-8') as f:
        for date_info in dates_to_export:
            date = date_info['date']
            stocks = get_daily_stocks(date)

            for stock in stocks:
                # Keep only the fields needed for search
                search_record = {
                    'date': stock.get('date'),
                    'scode': stock.get('scode'),
                    'sname': stock.get('sname'),
                    'brief': stock.get('brief', ''),
                    'core_sectors': stock.get('core_sectors', []),
                    'zt_time': stock.get('zt_time'),
                    'formatted_time': stock.get('formatted_time', ''),
                    'continuous_days': stock.get('continuous_days', '')
                }
                f.write(json.dumps(search_record, ensure_ascii=False) + '\n')
                total_count += 1

    logger.info(f"Saved search data: {output_path} ({total_count} records)")


def main():
    parser = argparse.ArgumentParser(description='Export limit-up analysis data to JSON files')
    parser.add_argument('--days', type=int, default=30, help='export the most recent N days')
    parser.add_argument('--date', type=str, help='export a specific date (YYYYMMDD)')
    parser.add_argument('--all', action='store_true', help='export all data')
    parser.add_argument('--no-search', action='store_true', help='skip exporting search data')
    args = parser.parse_args()

    ensure_dirs()

    # Collect all available dates
    all_dates = get_available_dates()
    logger.info(f"ES holds data for {len(all_dates)} dates")

    if not all_dates:
        logger.error("No data found")
        return

    # Decide which dates to export
    if args.date:
        dates_to_export = [d for d in all_dates if d['date'] == args.date]
        if not dates_to_export:
            logger.error(f"No data found for date {args.date}")
            return
    elif args.all:
        dates_to_export = all_dates
    else:
        # Default: the most recent N days (all_dates is sorted newest first)
        dates_to_export = all_dates[:args.days]

    logger.info(f"Exporting data for {len(dates_to_export)} dates")

    # Export per-day analysis data
    for date_info in dates_to_export:
        try:
            export_daily_analysis(date_info['date'])
        except Exception as e:
            logger.error(f"Failed to export {date_info['date']}: {e}")

    # Export the dates index (always built from all dates)
    export_dates_index(all_dates)

    # Export search data
    if not args.no_search:
        export_stocks_for_search(dates_to_export)

    logger.info("Export complete!")


if __name__ == '__main__':
    main()