diff --git a/app.py b/app.py index 3654c8d0..552a63a3 100755 --- a/app.py +++ b/app.py @@ -26,6 +26,7 @@ import re import string from datetime import datetime, timedelta, time as dt_time, date from clickhouse_driver import Client as Cclient +from elasticsearch import Elasticsearch from flask_cors import CORS from collections import defaultdict @@ -138,6 +139,15 @@ engine_2 = create_engine( pool_timeout=30, max_overflow=10 ) + +# Elasticsearch 客户端初始化 +es_client = Elasticsearch( + hosts=["http://222.128.1.157:19200"], + request_timeout=30, + max_retries=3, + retry_on_timeout=True +) + app = Flask(__name__) # 存储验证码的临时字典(生产环境应使用Redis) verification_codes = {} @@ -11688,108 +11698,98 @@ def get_daily_top_concepts(): @app.route('/api/market/rise-analysis/', methods=['GET']) def get_rise_analysis(seccode): - """获取股票涨幅分析数据""" + """获取股票涨幅分析数据(从 Elasticsearch 获取)""" try: # 获取日期范围参数 start_date = request.args.get('start_date') end_date = request.args.get('end_date') + limit = request.args.get('limit', 100, type=int) - query = text(""" - SELECT stock_code, - stock_name, - trade_date, - rise_rate, - close_price, - volume, - amount, - main_business, - rise_reason_brief, - rise_reason_detail, - news_summary, - announcements, - guba_sentiment, - analysis_time - FROM stock_rise_analysis - WHERE stock_code = :stock_code - """) + # 构建 ES 查询 + must_conditions = [ + {"term": {"stock_code": seccode}} + ] - params = {'stock_code': seccode} - - # 添加日期筛选 + # 添加日期范围筛选 if start_date and end_date: - query = text(""" - SELECT stock_code, - stock_name, - trade_date, - rise_rate, - close_price, - volume, - amount, - main_business, - rise_reason_brief, - rise_reason_detail, - news_summary, - announcements, - guba_sentiment, - analysis_time - FROM stock_rise_analysis - WHERE stock_code = :stock_code - AND trade_date BETWEEN :start_date AND :end_date - ORDER BY trade_date DESC - """) - params['start_date'] = start_date - params['end_date'] = end_date - else: - query = text(""" - SELECT stock_code, - stock_name, - trade_date, - rise_rate, - close_price, - volume, - amount, - main_business, - rise_reason_brief, - rise_reason_detail, - news_summary, - announcements, - guba_sentiment, - analysis_time - FROM stock_rise_analysis - WHERE stock_code = :stock_code - ORDER BY trade_date DESC LIMIT 100 - """) + must_conditions.append({ + "range": { + "trade_date": { + "gte": start_date, + "lte": end_date, + "format": "yyyy-MM-dd" + } + } + }) - with engine.connect() as conn: - result = conn.execute(query, params).fetchall() + es_query = { + "query": { + "bool": { + "must": must_conditions + } + }, + "sort": [ + {"trade_date": {"order": "desc"}} + ], + "size": limit, + "_source": { + "excludes": ["rise_reason_detail_embedding"] # 排除向量字段 + } + } + + # 执行 ES 查询 + response = es_client.search(index="stock_rise_analysis", body=es_query) # 格式化数据 rise_analysis_data = [] - for row in result: + for hit in response['hits']['hits']: + source = hit['_source'] + + # 处理研报引用数据 + verification_reports = [] + if source.get('has_verification_info') and source.get('verification_info'): + v_info = source['verification_info'] + processed_results = v_info.get('processed_result', []) + for report in processed_results: + verification_reports.append({ + 'publisher': report.get('publisher', ''), + 'report_title': report.get('report_title', ''), + 'author': report.get('author', ''), + 'declare_date': report.get('declare_date', ''), + 'content': report.get('content', ''), + 'verification_item': report.get('verification_item', ''), + 'match_ratio': report.get('match_ratio', 0), + 'match_score': report.get('match_score', '') + }) + rise_analysis_data.append({ - 'stock_code': row.stock_code, - 'stock_name': row.stock_name, - 'trade_date': format_date(row.trade_date), - 'rise_rate': format_decimal(row.rise_rate), - 'close_price': format_decimal(row.close_price), - 'volume': format_decimal(row.volume), - 'amount': format_decimal(row.amount), - 'main_business': row.main_business, - 'rise_reason_brief': row.rise_reason_brief, - 'rise_reason_detail': row.rise_reason_detail, - 'news_summary': row.news_summary, - 'announcements': row.announcements, - 'guba_sentiment': row.guba_sentiment, - 'analysis_time': row.analysis_time.strftime('%Y-%m-%d %H:%M:%S') if row.analysis_time else None + 'stock_code': source.get('stock_code', ''), + 'stock_name': source.get('stock_name', ''), + 'trade_date': source.get('trade_date', ''), + 'rise_rate': source.get('rise_rate', 0), + 'close_price': source.get('close_price', 0), + 'volume': source.get('volume', 0), + 'amount': source.get('amount', 0), + 'main_business': source.get('main_business', ''), + 'rise_reason_brief': source.get('rise_reason_brief', ''), + 'rise_reason_detail': source.get('rise_reason_detail', ''), + 'announcements': source.get('announcements', ''), + 'verification_reports': verification_reports, + 'has_verification_info': source.get('has_verification_info', False), + 'create_time': source.get('create_time', ''), + 'update_time': source.get('update_time', '') }) return jsonify({ 'success': True, 'data': rise_analysis_data, - 'count': len(rise_analysis_data) + 'count': len(rise_analysis_data), + 'total': response['hits']['total']['value'] }) except Exception as e: + import traceback + print(f"ES查询错误: {traceback.format_exc()}") return jsonify({ 'success': False, 'error': str(e) diff --git a/requirements.txt b/requirements.txt index 481c7967..aabefca5 100755 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,5 @@ gevent-websocket==0.10.1 psutil==5.9.6 Pillow==10.1.0 itsdangerous==2.1.2 -APScheduler==3.10.4 \ No newline at end of file +APScheduler==3.10.4 +elasticsearch==8.15.0 \ No newline at end of file