update pay ui

2025-12-14 16:06:06 +08:00
parent 71ec9668be
commit 3d89cbef2b
6 changed files with 1034 additions and 50 deletions

export_zt_data.py (new file, 342 lines)

@@ -0,0 +1,342 @@
#!/usr/bin/env python3
"""
涨停分析数据导出脚本
从 Elasticsearch 导出数据到静态 JSON 文件,供前端直接读取
使用方法:
python export_zt_data.py # 导出最近 30 天数据
python export_zt_data.py --days 7 # 导出最近 7 天
python export_zt_data.py --date 20251212 # 导出指定日期
python export_zt_data.py --all # 导出所有数据
输出目录data/zt/
├── dates.json # 可用日期列表
├── daily/
│ └── {date}.json # 每日分析数据
└── stocks.jsonl # 所有股票记录(用于关键词搜索)
"""
import os
import json
import argparse
import logging
from datetime import datetime, timedelta
from collections import defaultdict

from elasticsearch import Elasticsearch

# Configuration
ES_HOST = os.environ.get('ES_HOST', 'http://127.0.0.1:9200')
# Write into the public directory so the frontend can fetch the files directly
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'public', 'data', 'zt')

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ES connection
es = Elasticsearch([ES_HOST], timeout=60, retry_on_timeout=True, max_retries=3)
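
# Client-version note (an assumption; the commit does not pin a client version):
# the timeout kwarg above matches elasticsearch-py 7.x, while the 8.x client
# names the same setting request_timeout.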

def ensure_dirs():
    """Ensure the output directories exist."""
    os.makedirs(os.path.join(OUTPUT_DIR, 'daily'), exist_ok=True)
    logger.info(f"Output directory: {OUTPUT_DIR}")

def get_available_dates():
    """Return all available dates, newest first."""
query = {
"size": 0,
"aggs": {
"dates": {
"terms": {
"field": "date",
"size": 10000,
"order": {"_key": "desc"}
},
"aggs": {
"stock_count": {
"cardinality": {"field": "scode"}
}
}
}
}
}
result = es.search(index="zt_stocks", body=query)
dates = []
for bucket in result['aggregations']['dates']['buckets']:
date = bucket['key']
count = bucket['doc_count']
        # Format the date: YYYYMMDD -> YYYY-MM-DD
formatted = f"{date[:4]}-{date[4:6]}-{date[6:]}"
dates.append({
'date': date,
'formatted_date': formatted,
'count': count
})
return dates
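
# Shape of each entry returned above (illustrative values):
#   {'date': '20251212', 'formatted_date': '2025-12-12', 'count': 87}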

def get_daily_stats(date):
    """Fetch the precomputed daily stats document for the given date."""
query = {
"query": {"term": {"date": date}},
"_source": ["sector_stats", "word_freq", "chart_data"]
}
result = es.search(index="zt_daily_stats", body=query, size=1)
if result['hits']['total']['value'] > 0:
return result['hits']['hits'][0]['_source']
return {}
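
# When present, the stats document carries three precomputed fields (shapes
# inferred from how they are consumed below; word_freq's element shape is not
# visible in this file):
#   sector_stats: [{'sector_name': ..., 'count': ..., 'stock_codes': [...]}, ...]
#   chart_data:   {'labels': [...], 'counts': [...]}
#   word_freq:    the day's word-frequency list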

def get_daily_stocks(date):
    """Fetch all limit-up stocks for the given date."""
query = {
"query": {"term": {"date": date}},
"size": 10000,
"sort": [{"zt_time": "asc"}],
"_source": {
"exclude": ["content_embedding"] # 排除向量字段
}
}
result = es.search(index="zt_stocks", body=query)
stocks = []
for hit in result['hits']['hits']:
stock = hit['_source']
        # Format the limit-up time
        if 'zt_time' in stock:
            try:
                zt_time = datetime.fromisoformat(stock['zt_time'].replace('Z', '+00:00'))
                stock['formatted_time'] = zt_time.strftime('%H:%M:%S')
            except (ValueError, AttributeError):
                stock['formatted_time'] = ''
stocks.append(stock)
return stocks
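
# Each record is the document's full _source minus the embedding; the fields
# relied on downstream include scode, sname, brief, core_sectors, zt_time,
# formatted_time and continuous_days (see export_stocks_for_search below).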

def process_sector_data(sector_stats, stocks):
    """Build the per-sector data map."""
    if sector_stats:
        # Build from the precomputed sector_stats
sector_data = {}
for sector_info in sector_stats:
sector_name = sector_info['sector_name']
sector_data[sector_name] = {
'count': sector_info['count'],
'stock_codes': sector_info.get('stock_codes', [])
}
    else:
        # Derive from the raw stock records
sector_stocks = defaultdict(list)
sector_counts = defaultdict(int)
for stock in stocks:
for sector in stock.get('core_sectors', []):
sector_counts[sector] += 1
small_sectors = {s for s, c in sector_counts.items() if c < 2}
for stock in stocks:
scode = stock.get('scode', '')
valid_sectors = [s for s in stock.get('core_sectors', []) if s not in small_sectors]
if valid_sectors:
for sector in valid_sectors:
sector_stocks[sector].append(scode)
else:
                sector_stocks['其他'].append(scode)  # '其他' = catch-all "Other" bucket
sector_data = {
sector: {'count': len(codes), 'stock_codes': codes}
for sector, codes in sector_stocks.items()
}
    # Sort order: '公告' (announcements) first, then by count descending, with
    # '其他' (other) last. The Chinese keys are data values and must match ES.
    sorted_items = []
    announcement = sector_data.pop('公告', None)
    other = sector_data.pop('其他', None)
normal_items = sorted(sector_data.items(), key=lambda x: -x[1]['count'])
if announcement:
sorted_items.append(('公告', announcement))
sorted_items.extend(normal_items)
if other:
sorted_items.append(('其他', other))
return dict(sorted_items)
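
# Illustrative ordering: given counts {'公告': 3, 'A': 5, 'B': 2, '其他': 4},
# the returned key order is 公告, A, B, 其他 (A/B are hypothetical sector names).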

def calculate_sector_relations_top10(stocks):
    """Compute the top-10 co-occurring sector pairs."""
relations = defaultdict(int)
stock_sectors = defaultdict(set)
for stock in stocks:
scode = stock['scode']
for sector in stock.get('core_sectors', []):
stock_sectors[scode].add(sector)
for scode, sectors in stock_sectors.items():
sector_list = list(sectors)
for i in range(len(sector_list)):
for j in range(i + 1, len(sector_list)):
pair = tuple(sorted([sector_list[i], sector_list[j]]))
relations[pair] += 1
sorted_relations = sorted(relations.items(), key=lambda x: -x[1])[:10]
return {
'labels': [f"{p[0]} - {p[1]}" for p, _ in sorted_relations],
'counts': [c for _, c in sorted_relations]
}
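
# Counting sketch: a stock tagged with sectors {A, B, C} adds one count to each
# of the pairs (A, B), (A, C) and (B, C); sorting each pair before use makes the
# key order-insensitive, so (B, A) and (A, B) share one bucket.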

def export_daily_analysis(date):
    """Export a single day's analysis data."""
    logger.info(f"Exporting date: {date}")
    # Fetch the data
stats = get_daily_stats(date)
stocks = get_daily_stocks(date)
    if not stocks:
        logger.warning(f"No data for date {date}")
        return None
    # Build the sector map
    sector_data = process_sector_data(stats.get('sector_stats', []), stocks)
    # Compute sector co-occurrence
    sector_relations = calculate_sector_relations_top10(stocks)
    # Chart data: use the precomputed version, else derive it from sector_data
chart_data = stats.get('chart_data', {
'labels': [s for s in sector_data.keys() if s not in ['其他', '公告']],
'counts': [d['count'] for s, d in sector_data.items() if s not in ['其他', '公告']]
})
    # Assemble the analysis payload
analysis = {
'date': date,
'formatted_date': f"{date[:4]}-{date[4:6]}-{date[6:]}",
'total_stocks': len(stocks),
'sector_data': sector_data,
'chart_data': chart_data,
'word_freq_data': stats.get('word_freq', []),
'sector_relations_top10': sector_relations,
        'stocks': stocks  # include the full stock list
}
    # Write the file
output_path = os.path.join(OUTPUT_DIR, 'daily', f'{date}.json')
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(analysis, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved: {output_path} ({len(stocks)} stocks)")
return analysis

def export_dates_index(dates):
    """Write the index of available dates."""
output_path = os.path.join(OUTPUT_DIR, 'dates.json')
with open(output_path, 'w', encoding='utf-8') as f:
json.dump({
'dates': dates,
'total': len(dates),
'updated_at': datetime.now().isoformat()
}, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved date index: {output_path} ({len(dates)} dates)")
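
# Resulting dates.json (illustrative values):
#   {"dates": [{"date": "20251212", "formatted_date": "2025-12-12", "count": 87}, ...],
#    "total": 120, "updated_at": "2025-12-14T16:06:06"}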

def export_stocks_for_search(dates_to_export):
    """Export stock records for keyword search (JSONL format)."""
output_path = os.path.join(OUTPUT_DIR, 'stocks.jsonl')
total_count = 0
with open(output_path, 'w', encoding='utf-8') as f:
for date_info in dates_to_export:
date = date_info['date']
stocks = get_daily_stocks(date)
for stock in stocks:
                # Keep only the fields needed for search
search_record = {
'date': stock.get('date'),
'scode': stock.get('scode'),
'sname': stock.get('sname'),
'brief': stock.get('brief', ''),
'core_sectors': stock.get('core_sectors', []),
'zt_time': stock.get('zt_time'),
'formatted_time': stock.get('formatted_time', ''),
'continuous_days': stock.get('continuous_days', '')
}
f.write(json.dumps(search_record, ensure_ascii=False) + '\n')
total_count += 1
    logger.info(f"Saved search data: {output_path} ({total_count} records)")
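
# One stocks.jsonl line per stock, e.g. (illustrative values):
#   {"date": "20251212", "scode": "000001", "sname": "...", "brief": "...",
#    "core_sectors": ["..."], "zt_time": "...", "formatted_time": "09:31:05",
#    "continuous_days": "2"}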

def main():
    parser = argparse.ArgumentParser(description='Export limit-up analysis data to JSON files')
    parser.add_argument('--days', type=int, default=30, help='export the last N days of data')
    parser.add_argument('--date', type=str, help='export a specific date (YYYYMMDD)')
    parser.add_argument('--all', action='store_true', help='export all data')
    parser.add_argument('--no-search', action='store_true', help='skip exporting the search data')
    args = parser.parse_args()
    ensure_dirs()
    # Collect all available dates
    all_dates = get_available_dates()
    logger.info(f"ES holds data for {len(all_dates)} dates")
    if not all_dates:
        logger.error("No data found")
        return
    # Decide which dates to export
    if args.date:
        dates_to_export = [d for d in all_dates if d['date'] == args.date]
        if not dates_to_export:
            logger.error(f"No data found for date {args.date}")
            return
    elif args.all:
        dates_to_export = all_dates
    else:
        # Default: export the most recent N days
        dates_to_export = all_dates[:args.days]
    logger.info(f"Exporting {len(dates_to_export)} dates")
    # Export each day's analysis data
    for date_info in dates_to_export:
        try:
            export_daily_analysis(date_info['date'])
        except Exception as e:
            logger.error(f"Failed to export {date_info['date']}: {e}")
    # Export the date index (built from all dates)
    export_dates_index(all_dates)
    # Export the search data
    if not args.no_search:
        export_stocks_for_search(dates_to_export)
    logger.info("Export complete!")
if __name__ == '__main__':
main()
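
# A typical scheduled run after market close (assumed cron entry, not part of
# this commit; the project path is a placeholder):
#   30 16 * * 1-5  cd /path/to/project && python export_zt_data.py --days 7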