Files
vf_react/zt_api_static.py
2025-12-14 16:06:06 +08:00

350 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
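Limit-up (涨停) analysis API -- static-file version.

Reads pre-generated JSON files from the data/zt/ directory; does not depend on Elasticsearch.

How to start:
    python zt_api_static.py

Port: 8800 (same as the original report_zt_api.py, so it can be used as a drop-in replacement).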
"""
import os
import json
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS
from datetime import datetime
import logging
# Flask application object; CORS(app) is applied with flask_cors defaults.
app = Flask(__name__)
CORS(app)
# Configuration: root directory of the pre-generated files, i.e. <this file's directory>/data/zt
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data', 'zt')
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# In-memory caches; None means "not loaded yet".
# Filled by load_dates() / load_stocks_for_search(), reset by the cache reload endpoint.
_dates_cache = None
_stocks_cache = None
def load_dates():
    """Return the date index, reading ``dates.json`` under ``DATA_DIR`` on first use.

    The parsed content is kept in the module-level ``_dates_cache`` and
    returned directly on later calls.  When the file does not exist, an empty
    index ``{'dates': [], 'total': 0}`` is cached and returned instead.
    """
    global _dates_cache

    # Fast path: already loaded.
    if _dates_cache is not None:
        return _dates_cache

    index_path = os.path.join(DATA_DIR, 'dates.json')
    if not os.path.exists(index_path):
        _dates_cache = {'dates': [], 'total': 0}
        return _dates_cache

    with open(index_path, 'r', encoding='utf-8') as fh:
        _dates_cache = json.load(fh)
    return _dates_cache
def load_daily_analysis(date, data_dir=None):
    """Load the pre-generated analysis for one date.

    Args:
        date: Date identifier used as the file stem; the data is read from
            ``<data_dir>/daily/<date>.json``.  The value can come straight
            from a URL segment or a JSON request body, so it is treated as
            untrusted input.
        data_dir: Root data directory.  Defaults to the module-level
            ``DATA_DIR``; useful for tests and alternative layouts.

    Returns:
        The parsed JSON content, or ``None`` when ``date`` is empty, contains
        path components (e.g. ``"../x"`` or an absolute path), or no such
        file exists.
    """
    if date is None or date == '':
        return None

    file_name = f'{date}.json'
    # Security: only a bare file name is acceptable.  Anything carrying a
    # directory part could escape the ``daily`` directory via os.path.join.
    if (
        '/' in file_name
        or '\\' in file_name
        or os.path.basename(file_name) != file_name
    ):
        return None

    base_dir = DATA_DIR if data_dir is None else data_dir
    daily_file = os.path.join(base_dir, 'daily', file_name)
    if not os.path.exists(daily_file):
        return None
    with open(daily_file, 'r', encoding='utf-8') as f:
        return json.load(f)
def load_stocks_for_search():
    """Return the stock records used by keyword search, loading them on first use.

    Reads ``stocks.jsonl`` under ``DATA_DIR`` (one JSON document per
    non-blank line) and keeps the resulting list in the module-level
    ``_stocks_cache``.  When the file does not exist an empty list is cached.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.  In that
            case the cache is left untouched, so a later call retries instead
            of serving a partially loaded list.
    """
    global _stocks_cache
    if _stocks_cache is not None:
        return _stocks_cache

    stocks_file = os.path.join(DATA_DIR, 'stocks.jsonl')
    records = []
    if os.path.exists(stocks_file):
        with open(stocks_file, 'r', encoding='utf-8') as f:
            # Parse into a local list and publish it only once complete, so
            # neither a parse error nor a concurrent reader ever observes a
            # half-filled cache.
            records = [json.loads(line) for line in f if line.strip()]
        logger.info("已加载 %d 条股票记录用于搜索", len(records))

    _stocks_cache = records
    return _stocks_cache
# ==================== API routes ====================
@app.route('/api/v1/dates/available', methods=['GET'])
def get_available_dates():
    """List every available date as a calendar event.

    Each entry of the date index is mapped to an event dict whose title is
    the entry's ``count`` rendered as text.  Responds with
    ``{'success': True, 'events': [...], 'total': <number of events>}``, or
    with a 500 and the error text if anything raises.
    """
    try:
        index = load_dates()
        # Convert index entries to the calendar-event format.
        events = [
            {
                'title': f"{entry['count']}",
                'start': entry['formatted_date'],
                'end': entry['formatted_date'],
                'className': 'bg-gradient-primary',
                'allDay': True,
                'date': entry['date'],
                'count': entry['count'],
            }
            for entry in index.get('dates', [])
        ]
        payload = {
            'success': True,
            'events': events,
            'total': len(events),
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"获取日期列表失败: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/analysis/daily/<date>', methods=['GET'])
def get_daily_analysis(date):
    """Return the analysis data stored for ``date``.

    Responds with 404 when no data is found for the date, 500 (with the error
    text) when anything raises, and otherwise a payload containing a fixed
    set of fields copied from the stored analysis.
    """
    try:
        record = load_daily_analysis(date)
        if record is None:
            not_found = {
                'success': False,
                'error': f'日期 {date} 的数据不存在',
            }
            return jsonify(not_found), 404

        # Fields exposed to the client (kept compatible with the original API).
        exposed_fields = (
            'date',
            'formatted_date',
            'total_stocks',
            'sector_data',
            'chart_data',
            'word_freq_data',
            'sector_relations_top10',
        )
        body = {field: record[field] for field in exposed_fields}
        return jsonify({
            'success': True,
            'data': body,
            'from_cache': True,
            'cache_source': 'static_file',
        })
    except Exception as e:
        logger.error(f"获取日期 {date} 分析数据失败: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/stocks/batch-detail', methods=['POST'])
def get_stocks_batch_detail():
    """Return the details of several stocks for one date.

    Expects a JSON object body with:
        codes: list of stock codes (a single string is accepted as a
            one-element list).
        date: date identifier of the daily analysis to look in.

    Responds with 400 for a missing/invalid body or parameters, 404 when the
    date has no data, 500 on unexpected errors, and otherwise
    ``{'success': True, 'data': [<matching stock records>]}``.
    """
    try:
        # silent=True: a non-JSON or malformed body yields None instead of
        # raising, so it is reported as a client error (400) rather than
        # being swallowed by the generic handler below as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '请求体必须是 JSON 对象'}), 400

        stock_codes = data.get('codes', [])
        date = data.get('date')
        if not stock_codes or not date:
            return jsonify({'success': False, 'error': '缺少参数'}), 400

        # A bare string would otherwise turn the membership test below into a
        # substring match; treat it as a single code.
        if isinstance(stock_codes, str):
            stock_codes = [stock_codes]
        if not isinstance(stock_codes, (list, tuple)):
            return jsonify({'success': False, 'error': '缺少参数'}), 400

        # Stock details come from the daily analysis data.
        daily_data = load_daily_analysis(date)
        if not daily_data:
            return jsonify({'success': False, 'error': f'日期 {date} 数据不存在'}), 404

        # Keep only the requested stocks.
        stocks = [
            s for s in daily_data.get('stocks', [])
            if s.get('scode') in stock_codes
        ]
        return jsonify({
            'success': True,
            'data': stocks
        })
    except Exception as e:
        logger.error(f"批量获取股票详情失败: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def _search_match_score(stock, query):
    """Return the relevance score of ``stock`` for the lower-cased ``query`` (0 = no match)."""
    # ``or ''`` guards against fields that are present but null in the data.
    scode = str(stock.get('scode') or '').lower()
    sname = str(stock.get('sname') or '').lower()
    if query == scode:        # exact stock code (highest priority)
        return 100
    if query == sname:        # exact stock name
        return 90
    if query in sname:        # partial stock name
        return 80
    sectors = stock.get('core_sectors') or []
    if any(query in str(sector).lower() for sector in sectors):
        return 70             # sector
    if query in str(stock.get('brief') or '').lower():
        return 60             # limit-up reason (brief)
    return 0


def _search_date_key(stock):
    """Return the record's date as an int for sorting; 0 when missing or non-numeric."""
    try:
        return int(stock.get('date') or 0)
    except (TypeError, ValueError):
        return 0


def _search_positive_int(value, default):
    """Coerce ``value`` to an int >= 1, falling back to ``default`` otherwise."""
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        return default
    return number if number >= 1 else default


@app.route('/api/v1/stocks/search/hybrid', methods=['POST'])
def search_stocks():
    """Keyword search over the stock records.

    Matches the query against stock code, stock name, sectors and the
    limit-up reason (``brief``), in that priority order.

    Expects a JSON object body with:
        query: search keyword (required, non-empty string).
        date: optional exact date filter.
        date_range: optional ``{'start': ..., 'end': ...}`` filter, compared
            as strings against each record's ``date``.
        page / page_size: pagination; invalid or non-positive values fall
            back to 1 and 20 respectively.

    Responds with 400 for an invalid body or empty query, 500 on unexpected
    errors, and otherwise one page of results sorted by score, then by date
    (newest first).
    """
    try:
        # silent=True: a non-JSON body yields None and is reported as 400
        # instead of surfacing as a 500 from the generic handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '请求体必须是 JSON 对象'}), 400

        raw_query = data.get('query', '')
        query = raw_query.strip().lower() if isinstance(raw_query, str) else ''
        if not query:
            return jsonify({'success': False, 'error': '搜索关键词不能为空'}), 400

        date = data.get('date')
        date_filter = str(date) if date else None

        date_range = data.get('date_range') or {}
        if not isinstance(date_range, dict):
            date_range = {}
        range_start = str(date_range['start']) if date_range.get('start') else None
        range_end = str(date_range['end']) if date_range.get('end') else None

        # page_size == 0 used to raise ZeroDivisionError; negative or
        # non-integer values produced wrong slices or a TypeError.
        page = _search_positive_int(data.get('page', 1), 1)
        page_size = _search_positive_int(data.get('page_size', 20), 20)

        results = []
        for stock in load_stocks_for_search():
            # Date filters.
            stock_date = str(stock.get('date') or '')
            if date_filter and stock_date != date_filter:
                continue
            if range_start and stock_date < range_start:
                continue
            if range_end and stock_date > range_end:
                continue

            # Keyword match.
            match_score = _search_match_score(stock, query)
            if match_score > 0:
                # Copy so the cached record is not mutated by the score.
                stock_copy = stock.copy()
                stock_copy['_score'] = match_score
                results.append(stock_copy)

        # Highest score first, then most recent date first.
        results.sort(key=lambda item: (-item['_score'], -_search_date_key(item)))

        # Pagination.
        total = len(results)
        start = (page - 1) * page_size
        page_results = results[start:start + page_size]

        return jsonify({
            'success': True,
            'data': {
                'stocks': page_results,
                'total': total,
                'page': page,
                'page_size': page_size,
                'total_pages': (total + page_size - 1) // page_size,
                'search_mode': 'keyword'
            }
        })
    except Exception as e:
        logger.error(f"搜索失败: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/init/data', methods=['GET'])
def init_data():
    """Bootstrap payload: analysis of the first date in the index plus all dates.

    The first entry of the date index is used as the latest date.  Responds
    with 404 when the index is empty, 500 when that date's analysis cannot be
    loaded or anything raises, and otherwise the analysis fields together with
    the available dates in calendar-event format.
    """
    try:
        available = load_dates().get('dates', [])
        if not available:
            return jsonify({'success': False, 'error': '无可用数据'}), 404

        latest_date = available[0]['date']
        analysis = load_daily_analysis(latest_date)
        if not analysis:
            return jsonify({'success': False, 'error': '数据加载失败'}), 500

        # Convert the dates to the calendar-event format.
        events = []
        for entry in available:
            events.append({
                'title': f"{entry['count']}",
                'start': entry['formatted_date'],
                'end': entry['formatted_date'],
                'className': 'bg-gradient-primary',
                'allDay': True,
                'date': entry['date'],
                'count': entry['count'],
            })

        formatted_date = analysis['formatted_date']
        analysis_fields = (
            'date',
            'formatted_date',
            'total_stocks',
            'sector_data',
            'chart_data',
            'word_freq_data',
            'sector_relations_top10',
        )
        analysis_payload = {name: analysis[name] for name in analysis_fields}

        return jsonify({
            'success': True,
            'data': {
                'latest_date': latest_date,
                'formatted_date': formatted_date,
                'analysis': analysis_payload,
                'available_dates': events,
            },
        })
    except Exception as e:
        logger.error(f"初始化数据失败: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/health', methods=['GET'])
def health_check():
    """Health check: report the serving mode, data directory and date-index summary.

    ``total_dates`` and ``updated_at`` are read from the date index, defaulting
    to ``0`` and ``'unknown'`` when the index does not carry them.
    """
    index = load_dates()
    status = {
        'success': True,
        'status': 'healthy',
        'mode': 'static_file',
        'data_dir': DATA_DIR,
    }
    status['total_dates'] = index.get('total', 0)
    status['updated_at'] = index.get('updated_at', 'unknown')
    return jsonify(status)
@app.route('/api/v1/cache/reload', methods=['POST'])
def reload_cache():
    """Drop both in-memory caches and immediately load them again."""
    global _dates_cache, _stocks_cache
    _dates_cache = None
    _stocks_cache = None

    # Reload right away so the next request does not pay the loading cost.
    for loader in (load_dates, load_stocks_for_search):
        loader()

    response = {'success': True, 'message': '缓存已重新加载'}
    return jsonify(response)
# Static file serving (optional; allows direct access to the JSON files)
@app.route('/data/zt/<path:filename>')
def serve_data_file(filename):
    """Serve a pre-generated file from ``DATA_DIR`` directly.

    ``filename`` is the path captured from the URL after ``/data/zt/``; it is
    handed to Flask's ``send_from_directory`` together with ``DATA_DIR``.
    """
    return send_from_directory(DATA_DIR, filename)
if __name__ == '__main__':
    # Preload the data so the first request does not pay the loading cost.
    logger.info("预加载数据...")
    load_dates()
    load_stocks_for_search()
    logger.info("数据加载完成")
    # Security: the server binds to every interface (0.0.0.0), and Flask's
    # debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the port execute arbitrary code.  Debug is therefore off
    # unless explicitly requested, e.g. ``FLASK_DEBUG=1 python zt_api_static.py``.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=8800)