from flask import Blueprint, request, jsonify from app import db from clickhouse_driver import Client import pandas as pd from datetime import datetime, timedelta import pytz bp = Blueprint('stocks', __name__, url_prefix='/api/stock') def get_clickhouse_client(): """获取ClickHouse客户端""" return Client('localhost', port=9000, user='default', password='', database='default') @bp.route('/quotes', methods=['GET', 'POST']) def get_stock_quotes(): """获取股票实时报价""" try: if request.method == 'GET': # GET 请求从 URL 参数获取数据 codes = request.args.get('codes', '').split(',') event_time_str = request.args.get('event_time') else: # POST 请求从 JSON 获取数据 codes = request.json.get('codes', []) event_time_str = request.json.get('event_time') if not codes: return jsonify({'success': False, 'error': '请提供股票代码'}), 400 # 过滤空字符串 codes = [code.strip() for code in codes if code.strip()] if not codes: return jsonify({'success': False, 'error': '请提供有效的股票代码'}), 400 # 解析事件时间 event_time = None if event_time_str: try: event_time = datetime.fromisoformat(event_time_str.replace('Z', '+00:00')) except ValueError: return jsonify({'success': False, 'error': '事件时间格式错误'}), 400 # 获取当前时间 now = datetime.now(pytz.timezone('Asia/Shanghai')) # 如果提供了事件时间,使用事件时间;否则使用当前时间 target_time = event_time if event_time else now # 获取交易日和交易时间 def get_trading_day_and_times(event_datetime): """获取交易日和交易时间列表""" # 这里简化处理,实际应该查询交易日历 trading_day = event_datetime.strftime('%Y-%m-%d') # 生成交易时间列表 (9:30-11:30, 13:00-15:00) morning_times = [f"{trading_day} {hour:02d}:{minute:02d}" for hour in range(9, 12) for minute in range(0, 60, 1) if not (hour == 9 and minute < 30) and not (hour == 11 and minute > 30)] afternoon_times = [f"{trading_day} {hour:02d}:{minute:02d}" for hour in range(13, 16) for minute in range(0, 60, 1)] return trading_day, morning_times + afternoon_times trading_day, trading_times = get_trading_day_and_times(target_time) # 模拟股票数据 results = {} for code in codes: # 这里应该从ClickHouse或其他数据源获取真实数据 # 现在使用模拟数据 import random base_price = 10.0 + random.random() * 20.0 change = (random.random() - 0.5) * 2.0 results[code] = { 'price': round(base_price, 2), 'change': round(change, 2), 'name': f'股票{code}' } return jsonify({ 'success': True, 'data': results }) except Exception as e: print(f"Error getting stock quotes: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @bp.route('//kline') def get_stock_kline(stock_code): """获取股票K线数据""" try: chart_type = request.args.get('type', 'daily') event_time_str = request.args.get('event_time') if not event_time_str: return jsonify({'success': False, 'error': '请提供事件时间'}), 400 try: event_datetime = datetime.fromisoformat(event_time_str.replace('Z', '+00:00')) except ValueError: return jsonify({'success': False, 'error': '事件时间格式错误'}), 400 # 获取股票名称(这里简化处理) stock_name = f'股票{stock_code}' if chart_type == 'daily': return get_daily_kline(stock_code, event_datetime, stock_name) elif chart_type == 'minute': return get_minute_kline(stock_code, event_datetime, stock_name) elif chart_type == 'timeline': return get_timeline_data(stock_code, event_datetime, stock_name) else: return jsonify({'error': f'Unsupported chart type: {chart_type}'}), 400 except Exception as e: print(f"Error getting stock kline: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def get_daily_kline(stock_code, event_datetime, stock_name): """获取日K线数据""" try: # 模拟日K线数据 data = [] base_price = 10.0 for i in range(30): date = (event_datetime - timedelta(days=30-i)).strftime('%Y-%m-%d') open_price = base_price + (i * 0.1) + (i % 3 - 1) * 0.5 close_price = open_price + (i % 5 - 2) * 0.3 high_price = max(open_price, close_price) + 0.2 low_price = min(open_price, close_price) - 0.2 volume = 1000000 + i * 50000 data.append({ 'date': date, 'open': round(open_price, 2), 'close': round(close_price, 2), 'high': round(high_price, 2), 'low': round(low_price, 2), 'volume': volume }) return jsonify({ 'code': stock_code, 'name': stock_name, 'trade_date': event_datetime.strftime('%Y-%m-%d'), 'data': data }) except Exception as e: print(f"Error getting daily kline: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def get_minute_kline(stock_code, event_datetime, stock_name): """获取分钟K线数据""" try: # 模拟分钟K线数据 data = [] base_price = 10.0 trading_times = [] # 生成交易时间 for hour in range(9, 16): if hour == 12: continue for minute in range(0, 60): if (hour == 9 and minute < 30) or (hour == 11 and minute > 30): continue trading_times.append(f"{hour:02d}:{minute:02d}") for i, time in enumerate(trading_times): open_price = base_price + (i * 0.01) + (i % 10 - 5) * 0.02 close_price = open_price + (i % 7 - 3) * 0.01 high_price = max(open_price, close_price) + 0.01 low_price = min(open_price, close_price) - 0.01 volume = 50000 + i * 1000 data.append({ 'time': time, 'open': round(open_price, 2), 'close': round(close_price, 2), 'high': round(high_price, 2), 'low': round(low_price, 2), 'volume': volume }) return jsonify({ 'code': stock_code, 'name': stock_name, 'trade_date': event_datetime.strftime('%Y-%m-%d'), 'data': data }) except Exception as e: print(f"Error getting minute kline: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def get_timeline_data(stock_code, event_datetime, stock_name): """获取分时图数据""" try: # 模拟分时图数据 data = [] base_price = 10.0 trading_times = [] # 生成交易时间 for hour in range(9, 16): if hour == 12: continue for minute in range(0, 60): if (hour == 9 and minute < 30) or (hour == 11 and minute > 30): continue trading_times.append(f"{hour:02d}:{minute:02d}") for i, time in enumerate(trading_times): price = base_price + (i * 0.01) + (i % 10 - 5) * 0.02 avg_price = price + (i % 5 - 2) * 0.01 volume = 50000 + i * 1000 data.append({ 'time': time, 'price': round(price, 2), 'avg_price': round(avg_price, 2), 'volume': volume }) return jsonify({ 'code': stock_code, 'name': stock_name, 'trade_date': event_datetime.strftime('%Y-%m-%d'), 'data': data }) except Exception as e: print(f"Error getting timeline data: {e}") return jsonify({'success': False, 'error': str(e)}), 500