Files
vf_react/app/routes/events.py
2025-10-11 12:02:01 +08:00

385 lines
14 KiB
Python

from flask import Blueprint, request, jsonify
from app import db
from app.models import Event, RelatedStock, RelatedConcepts, HistoricalEvent, EventTransmissionNode, EventTransmissionEdge, EventSankeyFlow
from datetime import datetime
import json
# Blueprint that groups the event endpoints; its routes are served under /api/events.
bp = Blueprint('events', __name__, url_prefix='/api/events')
@bp.route('/<int:event_id>', methods=['GET'])
def get_event_detail(event_id):
    """Return a single event together with its related stocks and concepts.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response ``{'success': True, 'data': {...}}`` when the event is
        found, a 404 JSON error when the lookup returns nothing, or a 500 JSON
        error carrying ``str(e)`` if anything raises along the way.
    """
    def iso_or_none(moment):
        # Timestamps go out as ISO-8601 strings; falsy values become None.
        return moment.isoformat() if moment else None

    try:
        event = Event.query.get(event_id)
        if not event:
            return jsonify({'success': False, 'error': '事件不存在'}), 404

        # Related stocks
        stock_rows = RelatedStock.query.filter_by(event_id=event_id).all()
        stocks_data = [
            {
                'id': row.id,
                'stock_code': row.stock_code,
                'stock_name': row.stock_name,
                'sector': row.sector,
                'relation_desc': row.relation_desc,
                'correlation': row.correlation,
                'momentum': row.momentum,
                'created_at': iso_or_none(row.created_at),
            }
            for row in stock_rows
        ]

        # Related concepts
        concept_rows = RelatedConcepts.query.filter_by(event_id=event_id).all()
        concepts_data = [
            {
                'id': row.id,
                'concept_code': row.concept_code,
                'concept': row.concept,
                'reason': row.reason,
                'image_paths': row.image_paths_list,
                'created_at': iso_or_none(row.created_at),
            }
            for row in concept_rows
        ]

        event_data = {
            'id': event.id,
            'title': event.title,
            'description': event.description,
            'event_type': event.event_type,
            'status': event.status,
            'start_time': iso_or_none(event.start_time),
            'end_time': iso_or_none(event.end_time),
            'created_at': iso_or_none(event.created_at),
            'updated_at': iso_or_none(event.updated_at),
            'hot_score': event.hot_score,
            'view_count': event.view_count,
            'trending_score': event.trending_score,
            'post_count': event.post_count,
            'follower_count': event.follower_count,
            'related_industries': event.related_industries,
            'keywords': event.keywords_list,
            'files': event.files,
            'importance': event.importance,
            'related_avg_chg': event.related_avg_chg,
            'related_max_chg': event.related_max_chg,
            'related_week_chg': event.related_week_chg,
            'invest_score': event.invest_score,
            'expectation_surprise_score': event.expectation_surprise_score,
            'related_stocks': stocks_data,
            'related_concepts': concepts_data,
        }
        return jsonify({'success': True, 'data': event_data})
    except Exception as e:
        print(f"Error getting event detail: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/stocks', methods=['GET'])
def get_related_stocks(event_id):
    """List the stocks related to an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response whose ``data`` is a list with one dict per
        RelatedStock row matching ``event_id`` (an empty list when there are
        none), or a 500 JSON error carrying ``str(e)`` on failure.
    """
    try:
        rows = RelatedStock.query.filter_by(event_id=event_id).all()
        payload = [
            {
                'id': row.id,
                'stock_code': row.stock_code,
                'stock_name': row.stock_name,
                'sector': row.sector,
                'relation_desc': row.relation_desc,
                'correlation': row.correlation,
                'momentum': row.momentum,
                # Missing timestamps are reported as None.
                'created_at': row.created_at.isoformat() if row.created_at else None,
            }
            for row in rows
        ]
        return jsonify({'success': True, 'data': payload})
    except Exception as e:
        print(f"Error getting related stocks: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/stocks', methods=['POST'])
def add_related_stock(event_id):
    """Create a RelatedStock row linked to the given event.

    The request body must be a JSON object. ``stock_code`` and
    ``relation_desc`` are required; ``stock_name``, ``sector`` and
    ``momentum`` default to ``''`` and ``correlation`` defaults to ``0.5``.

    Args:
        event_id: Primary key of the event the stock is attached to.

    Returns:
        A JSON response, paired with a status code for errors:
        * the success payload echoing the stored fields;
        * 400 when the body is missing, is not a JSON object, or lacks a
          required field;
        * 404 when the event does not exist;
        * 500 (after rolling back the session) on any other failure.
    """
    required_fields = ('stock_code', 'relation_desc')
    try:
        # silent=True makes malformed or non-JSON bodies come back as None, so
        # they are answered with the 400 below instead of raising into the
        # generic handler and being reported as a server error.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False, 'error': '请提供数据'}), 400

        # A missing required key is a client error, not a 500 from KeyError.
        missing = [name for name in required_fields if name not in data]
        if missing:
            return jsonify({
                'success': False,
                'error': f"缺少必填字段: {', '.join(missing)}"
            }), 400

        # Make sure the event exists before linking a stock to it.
        event = Event.query.get(event_id)
        if not event:
            return jsonify({'success': False, 'error': '事件不存在'}), 404

        # Create the new related-stock record.
        new_stock = RelatedStock(
            event_id=event_id,
            stock_code=data['stock_code'],
            stock_name=data.get('stock_name', ''),
            sector=data.get('sector', ''),
            relation_desc=data['relation_desc'],
            correlation=data.get('correlation', 0.5),
            momentum=data.get('momentum', '')
        )
        db.session.add(new_stock)
        db.session.commit()

        return jsonify({
            'success': True,
            'message': '相关股票添加成功',
            'data': {
                'id': new_stock.id,
                'stock_code': new_stock.stock_code,
                'stock_name': new_stock.stock_name,
                'sector': new_stock.sector,
                'relation_desc': new_stock.relation_desc,
                'correlation': new_stock.correlation,
                'momentum': new_stock.momentum
            }
        })
    except Exception as e:
        # Discard any pending changes so the session stays usable.
        db.session.rollback()
        print(f"Error adding related stock: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/stocks/<int:stock_id>', methods=['DELETE'])
def delete_related_stock(stock_id):
    """Delete one related-stock record by its primary key.

    Args:
        stock_id: Primary key of the RelatedStock row, taken from the URL.

    Returns:
        A JSON success message once the row is deleted and committed, a 404
        JSON error when the lookup returns nothing, or a 500 JSON error
        (after rolling back the session) if anything raises.
    """
    try:
        target = RelatedStock.query.get(stock_id)
        if not target:
            not_found = {'success': False, 'error': '相关股票不存在'}
            return jsonify(not_found), 404

        db.session.delete(target)
        db.session.commit()

        confirmation = {'success': True, 'message': '相关股票删除成功'}
        return jsonify(confirmation)
    except Exception as e:
        db.session.rollback()
        print(f"Error deleting related stock: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/concepts', methods=['GET'])
def get_related_concepts(event_id):
    """List the concepts related to an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response whose ``data`` is a list with one dict per
        RelatedConcepts row matching ``event_id``, or a 500 JSON error
        carrying ``str(e)`` on failure.
    """
    try:
        rows = RelatedConcepts.query.filter_by(event_id=event_id).all()
        serialized = [
            {
                'id': item.id,
                'concept_code': item.concept_code,
                'concept': item.concept,
                'reason': item.reason,
                'image_paths': item.image_paths_list,
                # Missing timestamps are reported as None.
                'created_at': item.created_at.isoformat() if item.created_at else None,
            }
            for item in rows
        ]
        return jsonify({'success': True, 'data': serialized})
    except Exception as e:
        print(f"Error getting related concepts: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/historical', methods=['GET'])
def get_historical_events(event_id):
    """List the historical events recorded for an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response whose ``data`` is a list with one dict per
        HistoricalEvent row matching ``event_id``, or a 500 JSON error
        carrying ``str(e)`` on failure.
    """
    def iso_or_none(moment):
        # Dates/timestamps go out as ISO strings; falsy values become None.
        return moment.isoformat() if moment else None

    try:
        past_events = HistoricalEvent.query.filter_by(event_id=event_id).all()
        events_data = [
            {
                'id': past.id,
                'title': past.title,
                'content': past.content,
                'event_date': iso_or_none(past.event_date),
                'relevance': past.relevance,
                'importance': past.importance,
                'related_stock': past.related_stock,
                'created_at': iso_or_none(past.created_at),
            }
            for past in past_events
        ]
        return jsonify({'success': True, 'data': events_data})
    except Exception as e:
        print(f"Error getting historical events: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/expectation-score', methods=['GET'])
def get_expectation_score(event_id):
    """Return the invest score and expectation-surprise score of an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response with both scores under ``data``, a 404 JSON error
        when the event lookup returns nothing, or a 500 JSON error carrying
        ``str(e)`` on failure.
    """
    try:
        event = Event.query.get(event_id)
        if not event:
            return jsonify({'success': False, 'error': '事件不存在'}), 404

        scores = {
            'invest_score': event.invest_score,
            'expectation_surprise_score': event.expectation_surprise_score,
        }
        return jsonify({'success': True, 'data': scores})
    except Exception as e:
        print(f"Error getting expectation score: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/follow', methods=['POST'])
def toggle_event_follow(event_id):
    """Follow/unfollow an event (placeholder implementation).

    Args:
        event_id: Primary key of the event, taken from the URL. It is not
            used yet.

    Returns:
        A fixed JSON success message, or a 500 JSON error carrying
        ``str(e)`` if building the response raises.
    """
    try:
        # User authentication is required here; for now just report success.
        acknowledgement = {
            'success': True,
            'message': '关注状态更新成功',
        }
        return jsonify(acknowledgement)
    except Exception as e:
        print(f"Error toggling event follow: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/transmission', methods=['GET'])
def get_transmission_chain(event_id):
    """Return the transmission chain (nodes and edges) of an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response whose ``data`` holds ``nodes`` and ``edges`` lists
        built from the EventTransmissionNode and EventTransmissionEdge rows
        matching ``event_id``, or a 500 JSON error carrying ``str(e)`` on
        failure.
    """
    try:
        # Transmission nodes
        node_rows = EventTransmissionNode.query.filter_by(event_id=event_id).all()
        nodes_data = [
            {
                'id': n.id,
                'node_type': n.node_type,
                'node_name': n.node_name,
                'node_description': n.node_description,
                'importance_score': n.importance_score,
                'stock_code': n.stock_code,
                'is_main_event': n.is_main_event,
            }
            for n in node_rows
        ]

        # Transmission edges
        edge_rows = EventTransmissionEdge.query.filter_by(event_id=event_id).all()
        edges_data = [
            {
                'id': link.id,
                'from_node_id': link.from_node_id,
                'to_node_id': link.to_node_id,
                'transmission_type': link.transmission_type,
                'transmission_mechanism': link.transmission_mechanism,
                'direction': link.direction,
                'strength': link.strength,
                'impact': link.impact,
                'is_circular': link.is_circular,
            }
            for link in edge_rows
        ]

        chain = {'nodes': nodes_data, 'edges': edges_data}
        return jsonify({'success': True, 'data': chain})
    except Exception as e:
        print(f"Error getting transmission chain: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/sankey-data')
def get_event_sankey_data(event_id):
    """Return the Sankey-diagram flows recorded for an event.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response whose ``data`` is a list with one dict per
        EventSankeyFlow row matching ``event_id``. ``flow_value`` and
        ``flow_ratio`` are converted to ``float``, or reported as ``None``
        when the stored value is ``None``. On failure a 500 JSON error
        carrying ``str(e)`` is returned.
    """
    def float_or_none(value):
        # float(None) raises TypeError, which would turn the whole response
        # into a 500 because of a single row with a missing value.
        return float(value) if value is not None else None

    try:
        flows = EventSankeyFlow.query.filter_by(event_id=event_id).all()
        flows_data = [
            {
                'id': flow.id,
                'source_node': flow.source_node,
                'source_type': flow.source_type,
                'source_level': flow.source_level,
                'target_node': flow.target_node,
                'target_type': flow.target_type,
                'target_level': flow.target_level,
                'flow_value': float_or_none(flow.flow_value),
                'flow_ratio': float_or_none(flow.flow_ratio),
                'transmission_path': flow.transmission_path,
                'impact_description': flow.impact_description,
                'evidence_strength': flow.evidence_strength,
            }
            for flow in flows
        ]
        return jsonify({
            'success': True,
            'data': flows_data
        })
    except Exception as e:
        print(f"Error getting sankey data: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/chain-analysis')
def get_event_chain_analysis(event_id):
    """Return the chain analysis for an event (placeholder implementation).

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response echoing ``event_id`` alongside a fixed placeholder
        analysis string, or a 500 JSON error carrying ``str(e)`` if building
        the response raises.
    """
    try:
        # More elaborate chain-analysis logic can be added here.
        analysis_stub = {
            'event_id': event_id,
            'analysis': '链分析数据',
        }
        return jsonify({'success': True, 'data': analysis_stub})
    except Exception as e:
        print(f"Error getting chain analysis: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@bp.route('/<int:event_id>/chain-node/<int:node_id>', methods=['GET'])
def get_chain_node_detail(event_id, node_id):
    """Return the details of one transmission-chain node of an event.

    Args:
        event_id: Primary key of the event, taken from the URL.
        node_id: Primary key of the node, taken from the URL.

    Returns:
        A JSON response describing the node, a 404 JSON error when no node
        matches both ``event_id`` and ``node_id``, or a 500 JSON error
        carrying ``str(e)`` on failure.
    """
    try:
        # Filtering on both ids means a node of another event is not returned.
        lookup = EventTransmissionNode.query.filter_by(
            event_id=event_id,
            id=node_id
        )
        node = lookup.first()
        if not node:
            return jsonify({'success': False, 'error': '节点不存在'}), 404

        detail = {
            'id': node.id,
            'node_type': node.node_type,
            'node_name': node.node_name,
            'node_description': node.node_description,
            'importance_score': node.importance_score,
            'stock_code': node.stock_code,
            'is_main_event': node.is_main_event,
        }
        return jsonify({'success': True, 'data': detail})
    except Exception as e:
        print(f"Error getting chain node detail: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500