#!/usr/bin/env python3
"""Limit-up ("涨停") analysis data export script.

Exports data from Elasticsearch into static JSON files that the
frontend reads directly.

Usage:
    python export_zt_data.py                 # export the last 30 days
    python export_zt_data.py --days 7        # export the last 7 days
    python export_zt_data.py --date 20251212 # export one specific date
    python export_zt_data.py --all           # export everything

Output layout (under data/zt/):
    ├── dates.json        # list of available dates
    ├── daily/
    │   └── {date}.json   # per-day analysis data
    └── stocks.jsonl      # all stock records (for keyword search)
"""
import os
import json
import argparse
from datetime import datetime, timedelta
from collections import defaultdict
from elasticsearch import Elasticsearch
import logging

# --- Configuration ----------------------------------------------------------
ES_HOST = os.environ.get('ES_HOST', 'http://127.0.0.1:9200')
# Write into the public/ tree so the frontend can fetch the files directly.
OUTPUT_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'public', 'data', 'zt'
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ES client.  NOTE(review): `timeout=` is deprecated in elasticsearch-py 8.x
# in favour of `request_timeout=` — confirm the installed client version.
es = Elasticsearch([ES_HOST], timeout=60, retry_on_timeout=True, max_retries=3)


def ensure_dirs():
    """Make sure the output directory tree (including daily/) exists."""
    os.makedirs(os.path.join(OUTPUT_DIR, 'daily'), exist_ok=True)
    logger.info(f"输出目录: {OUTPUT_DIR}")


def get_available_dates():
    """Return every date present in the ``zt_stocks`` index, newest first.

    Returns:
        list[dict]: items with keys ``date`` (YYYYMMDD string),
        ``formatted_date`` (YYYY-MM-DD) and ``count`` (doc count that day).
    """
    query = {
        "size": 0,
        "aggs": {
            "dates": {
                "terms": {
                    "field": "date",
                    "size": 10000,
                    "order": {"_key": "desc"}
                },
                "aggs": {
                    "stock_count": {
                        "cardinality": {"field": "scode"}
                    }
                }
            }
        }
    }
    result = es.search(index="zt_stocks", body=query)
    dates = []
    for bucket in result['aggregations']['dates']['buckets']:
        # Defensive cast: a numeric-mapped date field yields int bucket keys,
        # which would break the slicing below.
        date = str(bucket['key'])
        count = bucket['doc_count']
        # Format YYYYMMDD -> YYYY-MM-DD.
        formatted = f"{date[:4]}-{date[4:6]}-{date[6:]}"
        dates.append({
            'date': date,
            'formatted_date': formatted,
            'count': count
        })
    return dates


def get_daily_stats(date):
    """Fetch the precomputed stats document for *date* from ``zt_daily_stats``.

    Returns the ``_source`` dict (sector_stats / word_freq / chart_data),
    or an empty dict when no stats document exists for that date.
    """
    query = {
        "query": {"term": {"date": date}},
        "_source": ["sector_stats", "word_freq", "chart_data"]
    }
    result = es.search(index="zt_daily_stats", body=query, size=1)
    if result['hits']['total']['value'] > 0:
        return result['hits']['hits'][0]['_source']
    return {}


def get_daily_stocks(date):
    """Fetch all limit-up stocks for *date*, sorted by limit-up time.

    Adds a ``formatted_time`` (HH:MM:SS) field derived from ``zt_time``;
    the field is set to '' when the timestamp cannot be parsed.
    """
    query = {
        "query": {"term": {"date": date}},
        "size": 10000,
        "sort": [{"zt_time": "asc"}],
        "_source": {
            # BUGFIX: the singular "exclude" key was removed in ES 6.0;
            # the correct source-filtering key is "excludes".
            "excludes": ["content_embedding"]  # skip the vector field
        }
    }
    result = es.search(index="zt_stocks", body=query)
    stocks = []
    for hit in result['hits']['hits']:
        stock = hit['_source']
        if 'zt_time' in stock:
            try:
                zt_time = datetime.fromisoformat(
                    stock['zt_time'].replace('Z', '+00:00')
                )
                stock['formatted_time'] = zt_time.strftime('%H:%M:%S')
            except (ValueError, AttributeError, TypeError):
                # Narrowed from a bare except: malformed / non-string zt_time.
                stock['formatted_time'] = ''
        stocks.append(stock)
    return stocks


def process_sector_data(sector_stats, stocks):
    """Build the per-sector mapping ``{name: {count, stock_codes}}``.

    Prefers precomputed *sector_stats*; otherwise derives sectors from the
    stock records, folding sectors with fewer than 2 members into '其他'.
    Ordering: '公告' first, then by count descending, '其他' last.
    """
    if sector_stats:
        # Use the precomputed sector_stats document.
        sector_data = {}
        for sector_info in sector_stats:
            sector_name = sector_info['sector_name']
            sector_data[sector_name] = {
                'count': sector_info['count'],
                'stock_codes': sector_info.get('stock_codes', [])
            }
    else:
        # Derive sectors from the raw stock records.
        sector_stocks = defaultdict(list)
        sector_counts = defaultdict(int)
        for stock in stocks:
            for sector in stock.get('core_sectors', []):
                sector_counts[sector] += 1
        # Sectors with a single member are folded into the catch-all bucket.
        small_sectors = {s for s, c in sector_counts.items() if c < 2}
        for stock in stocks:
            scode = stock.get('scode', '')
            valid_sectors = [
                s for s in stock.get('core_sectors', [])
                if s not in small_sectors
            ]
            if valid_sectors:
                for sector in valid_sectors:
                    sector_stocks[sector].append(scode)
            else:
                sector_stocks['其他'].append(scode)
        sector_data = {
            sector: {'count': len(codes), 'stock_codes': codes}
            for sector, codes in sector_stocks.items()
        }

    # Order: announcements first, then by count descending, catch-all last.
    sorted_items = []
    announcement = sector_data.pop('公告', None)
    other = sector_data.pop('其他', None)
    normal_items = sorted(sector_data.items(), key=lambda x: -x[1]['count'])
    # Explicit None checks (the popped values are dicts; truthiness worked
    # only incidentally).
    if announcement is not None:
        sorted_items.append(('公告', announcement))
    sorted_items.extend(normal_items)
    if other is not None:
        sorted_items.append(('其他', other))
    return dict(sorted_items)


def calculate_sector_relations_top10(stocks):
    """Compute the top-10 sector co-occurrence pairs across *stocks*.

    Two sectors are related when the same stock carries both; pairs are
    counted once per stock and ranked by frequency.
    """
    relations = defaultdict(int)
    stock_sectors = defaultdict(set)
    for stock in stocks:
        scode = stock['scode']
        for sector in stock.get('core_sectors', []):
            stock_sectors[scode].add(sector)
    for scode, sectors in stock_sectors.items():
        sector_list = list(sectors)
        for i in range(len(sector_list)):
            for j in range(i + 1, len(sector_list)):
                pair = tuple(sorted([sector_list[i], sector_list[j]]))
                relations[pair] += 1
    sorted_relations = sorted(relations.items(), key=lambda x: -x[1])[:10]
    return {
        'labels': [f"{p[0]} - {p[1]}" for p, _ in sorted_relations],
        'counts': [c for _, c in sorted_relations]
    }


def export_daily_analysis(date):
    """Export one day's full analysis to ``daily/{date}.json``.

    Returns the analysis dict, or None when the date has no stock data.
    """
    logger.info(f"导出日期: {date}")

    stats = get_daily_stats(date)
    stocks = get_daily_stocks(date)
    if not stocks:
        logger.warning(f"日期 {date} 无数据")
        return None

    sector_data = process_sector_data(stats.get('sector_stats', []), stocks)
    sector_relations = calculate_sector_relations_top10(stocks)

    # Prefer precomputed chart data; otherwise build it from sector_data.
    # (Built lazily — the original computed the fallback unconditionally.)
    if 'chart_data' in stats:
        chart_data = stats['chart_data']
    else:
        chart_data = {
            'labels': [
                s for s in sector_data.keys() if s not in ['其他', '公告']
            ],
            'counts': [
                d['count'] for s, d in sector_data.items()
                if s not in ['其他', '公告']
            ]
        }

    analysis = {
        'date': date,
        'formatted_date': f"{date[:4]}-{date[4:6]}-{date[6:]}",
        'total_stocks': len(stocks),
        'sector_data': sector_data,
        'chart_data': chart_data,
        'word_freq_data': stats.get('word_freq', []),
        'sector_relations_top10': sector_relations,
        'stocks': stocks  # full stock list included
    }

    output_path = os.path.join(OUTPUT_DIR, 'daily', f'{date}.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(analysis, f, ensure_ascii=False, indent=2)
    logger.info(f"已保存: {output_path} ({len(stocks)} 只股票)")
    return analysis


def export_dates_index(dates):
    """Write the index of available dates to ``dates.json``."""
    output_path = os.path.join(OUTPUT_DIR, 'dates.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump({
            'dates': dates,
            'total': len(dates),
            'updated_at': datetime.now().isoformat()
        }, f, ensure_ascii=False, indent=2)
    logger.info(f"已保存日期索引: {output_path} ({len(dates)} 个日期)")


def export_stocks_for_search(dates_to_export):
    """Write a JSONL search corpus (one record per stock) to ``stocks.jsonl``.

    Only the fields needed for keyword search are kept per record.
    """
    output_path = os.path.join(OUTPUT_DIR, 'stocks.jsonl')
    total_count = 0
    with open(output_path, 'w', encoding='utf-8') as f:
        for date_info in dates_to_export:
            date = date_info['date']
            stocks = get_daily_stocks(date)
            for stock in stocks:
                search_record = {
                    'date': stock.get('date'),
                    'scode': stock.get('scode'),
                    'sname': stock.get('sname'),
                    'brief': stock.get('brief', ''),
                    'core_sectors': stock.get('core_sectors', []),
                    'zt_time': stock.get('zt_time'),
                    'formatted_time': stock.get('formatted_time', ''),
                    'continuous_days': stock.get('continuous_days', '')
                }
                f.write(json.dumps(search_record, ensure_ascii=False) + '\n')
                total_count += 1
    logger.info(f"已保存搜索数据: {output_path} ({total_count} 条记录)")


def main():
    """CLI entry point: parse arguments, pick dates, run the exports."""
    parser = argparse.ArgumentParser(description='导出涨停分析数据到 JSON 文件')
    parser.add_argument('--days', type=int, default=30,
                        help='导出最近 N 天的数据')
    parser.add_argument('--date', type=str,
                        help='导出指定日期 (YYYYMMDD)')
    parser.add_argument('--all', action='store_true',
                        help='导出所有数据')
    parser.add_argument('--no-search', action='store_true',
                        help='不导出搜索数据')
    args = parser.parse_args()

    ensure_dirs()

    all_dates = get_available_dates()
    logger.info(f"ES 中共有 {len(all_dates)} 个日期的数据")
    if not all_dates:
        logger.error("未找到任何数据")
        return

    # Pick the date range: explicit date > everything > last N days.
    if args.date:
        dates_to_export = [d for d in all_dates if d['date'] == args.date]
        if not dates_to_export:
            logger.error(f"未找到日期 {args.date} 的数据")
            return
    elif args.all:
        dates_to_export = all_dates
    else:
        dates_to_export = all_dates[:args.days]

    logger.info(f"将导出 {len(dates_to_export)} 个日期的数据")

    for date_info in dates_to_export:
        try:
            export_daily_analysis(date_info['date'])
        except Exception as e:
            # Best-effort: one failed day must not abort the whole run.
            logger.error(f"导出 {date_info['date']} 失败: {e}")

    # The index always covers every available date, not just the exported ones.
    export_dates_index(all_dates)

    if not args.no_search:
        export_stocks_for_search(dates_to_export)

    logger.info("导出完成!")


if __name__ == '__main__':
    main()