#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 模拟盘后台处理脚本 负责处理待成交订单、更新持仓收益等后台计算任务 与主应用分离,可以独立运行 """ import os import sys import time import logging from datetime import datetime, timedelta, time as dt_time from decimal import Decimal from typing import List, Optional import pytz # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 导入主应用的数据库连接和模型 from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from clickhouse_driver import Client as ClickhouseClient # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('simulation_processor.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 数据库配置 DATABASE_URL = "mysql+pymysql://root:Zzl5588161!@localhost:3306/stock?charset=utf8mb4" CLICKHOUSE_HOST = "192.168.1.58" CLICKHOUSE_PORT = 9000 # 创建数据库连接 engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine) # ClickHouse客户端 clickhouse_client = ClickhouseClient(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT,user='default', password='Zzl33818!', database='stock') # 北京时区 beijing_tz = pytz.timezone('Asia/Shanghai') def beijing_now(): """获取北京时间""" return datetime.now(beijing_tz) def is_trading_time(): """判断是否为交易时间""" now = beijing_now() if now.weekday() >= 5: # 周六日 return False current_time = now.time() morning_start = dt_time(9, 30) morning_end = dt_time(11, 30) afternoon_start = dt_time(13, 0) afternoon_end = dt_time(15, 0) return (morning_start <= current_time <= morning_end) or \ (afternoon_start <= current_time <= afternoon_end) def get_latest_price_from_clickhouse(stock_code: str) -> Optional[float]: """从ClickHouse获取最新价格""" try: # 确保股票代码包含后缀 if '.' not in stock_code: stock_code = f"{stock_code}.SH" if stock_code.startswith('6') else f"{stock_code}.SZ" query = """ SELECT close, timestamp FROM stock_minute WHERE code = %(code)s AND timestamp >= today() - 7 ORDER BY timestamp DESC LIMIT 1 """ result = clickhouse_client.execute(query, {'code': stock_code}) if result: return float(result[0][0]) # 如果没有分钟数据,尝试获取日线数据 daily_query = """ SELECT close FROM stock_daily WHERE code = %(code)s ORDER BY date DESC LIMIT 1 """ daily_result = clickhouse_client.execute(daily_query, {'code': stock_code}) if daily_result: return float(daily_result[0][0]) return None except Exception as e: logger.error(f"获取股票 {stock_code} 价格失败: {e}") return None def process_pending_orders(): """处理待成交订单""" session = SessionLocal() try: # 查询所有待成交的订单 pending_orders = session.execute(text(""" SELECT id, order_no, stock_code, stock_name, order_type, order_qty, order_time, account_id FROM simulation_orders WHERE status = 'PENDING' AND order_time <= NOW() - INTERVAL 1 MINUTE ORDER BY order_time ASC """)).fetchall() logger.info(f"找到 {len(pending_orders)} 个待处理订单") for order_data in pending_orders: try: order_id = order_data[0] stock_code = order_data[2] order_type = order_data[4] order_qty = order_data[5] account_id = order_data[7] # 获取最新价格 latest_price = get_latest_price_from_clickhouse(stock_code) if latest_price: # 有价格数据,执行成交 filled_amount = latest_price * order_qty # 计算费用 commission = max(filled_amount * 0.00025, 5.0) # 万分之2.5,最低5元 stamp_tax = filled_amount * 0.001 if order_type == 'SELL' else 0 # 卖出印花税 transfer_fee = filled_amount * 0.00002 # 万分之0.2 total_fee = commission + stamp_tax + transfer_fee # 更新订单状态 session.execute(text(""" UPDATE simulation_orders SET status = 'FILLED', filled_qty = :filled_qty, filled_price = :filled_price, filled_amount = :filled_amount, commission = :commission, stamp_tax = :stamp_tax, transfer_fee = :transfer_fee, total_fee = :total_fee, filled_time = NOW() WHERE id = :order_id """), { 'order_id': order_id, 'filled_qty': order_qty, 'filled_price': latest_price, 'filled_amount': filled_amount, 'commission': commission, 'stamp_tax': stamp_tax, 'transfer_fee': transfer_fee, 'total_fee': total_fee }) # 处理账户和持仓更新 if order_type == 'BUY': process_buy_order_settlement(session, order_data, latest_price, total_fee) else: process_sell_order_settlement(session, order_data, latest_price, total_fee) logger.info(f"订单 {order_data[1]} 成交,价格: {latest_price}") else: logger.warning(f"订单 {order_data[1]} 无法获取价格数据,继续等待") except Exception as e: logger.error(f"处理订单 {order_data[1]} 时出错: {e}") session.rollback() continue session.commit() except Exception as e: logger.error(f"处理待成交订单时出错: {e}") session.rollback() finally: session.close() def process_buy_order_settlement(session, order_data, price: float, total_fee: float): """处理买入订单结算""" account_id = order_data[7] stock_code = order_data[2] stock_name = order_data[3] order_qty = order_data[5] total_cost = price * order_qty + total_fee # 扣除账户资金 session.execute(text(""" UPDATE simulation_accounts SET available_cash = available_cash - :total_cost WHERE id = :account_id """), {'total_cost': total_cost, 'account_id': account_id}) # 更新或创建持仓 existing_position = session.execute(text(""" SELECT id, position_qty, avg_cost FROM simulation_positions WHERE account_id = :account_id AND stock_code = :stock_code """), {'account_id': account_id, 'stock_code': stock_code}).fetchone() if existing_position: # 更新现有持仓 old_qty = existing_position[1] old_cost = float(existing_position[2]) new_qty = old_qty + order_qty new_avg_cost = (old_qty * old_cost + total_cost) / new_qty session.execute(text(""" UPDATE simulation_positions SET position_qty = :new_qty, available_qty = :new_qty, avg_cost = :new_avg_cost, updated_at = NOW() WHERE id = :position_id """), { 'position_id': existing_position[0], 'new_qty': new_qty, 'new_avg_cost': new_avg_cost }) else: # 创建新持仓 session.execute(text(""" INSERT INTO simulation_positions (account_id, stock_code, stock_name, position_qty, available_qty, avg_cost, current_price, market_value, profit, profit_rate, created_at, updated_at) VALUES (:account_id, :stock_code, :stock_name, :qty, :qty, :avg_cost, :price, :market_value, 0, 0, NOW(), NOW()) """), { 'account_id': account_id, 'stock_code': stock_code, 'stock_name': stock_name, 'qty': order_qty, 'avg_cost': total_cost / order_qty, 'price': price, 'market_value': price * order_qty }) def process_sell_order_settlement(session, order_data, price: float, total_fee: float): """处理卖出订单结算""" account_id = order_data[7] stock_code = order_data[2] order_qty = order_data[5] sell_amount = price * order_qty net_amount = sell_amount - total_fee # 增加账户资金 session.execute(text(""" UPDATE simulation_accounts SET available_cash = available_cash + :net_amount WHERE id = :account_id """), {'net_amount': net_amount, 'account_id': account_id}) # 更新持仓 session.execute(text(""" UPDATE simulation_positions SET position_qty = position_qty - :qty, available_qty = available_qty - :qty, updated_at = NOW() WHERE account_id = :account_id AND stock_code = :stock_code """), {'qty': order_qty, 'account_id': account_id, 'stock_code': stock_code}) # 删除清仓的持仓 session.execute(text(""" DELETE FROM simulation_positions WHERE account_id = :account_id AND stock_code = :stock_code AND position_qty <= 0 """), {'account_id': account_id, 'stock_code': stock_code}) def update_all_positions_market_value(): """更新所有持仓的市值和收益""" session = SessionLocal() try: positions = session.execute(text(""" SELECT id, stock_code, position_qty, avg_cost FROM simulation_positions WHERE position_qty > 0 """)).fetchall() logger.info(f"更新 {len(positions)} 个持仓的市值") for position in positions: position_id, stock_code, qty, avg_cost = position # 获取最新价格 latest_price = get_latest_price_from_clickhouse(stock_code) if latest_price: market_value = latest_price * qty cost_value = float(avg_cost) * qty profit = market_value - cost_value profit_rate = (profit / cost_value) * 100 if cost_value > 0 else 0 session.execute(text(""" UPDATE simulation_positions SET current_price = :price, market_value = :market_value, profit = :profit, profit_rate = :profit_rate, updated_at = NOW() WHERE id = :position_id """), { 'position_id': position_id, 'price': latest_price, 'market_value': market_value, 'profit': profit, 'profit_rate': profit_rate }) session.commit() logger.info("持仓市值更新完成") except Exception as e: logger.error(f"更新持仓市值时出错: {e}") session.rollback() finally: session.close() def update_all_accounts_assets(): """更新所有账户的总资产""" session = SessionLocal() try: # 获取所有账户 accounts = session.execute(text(""" SELECT id, initial_capital, available_cash, frozen_cash FROM simulation_accounts """)).fetchall() for account in accounts: account_id, initial_capital, available_cash, frozen_cash = account # 计算持仓总市值 total_position_value = session.execute(text(""" SELECT COALESCE(SUM(market_value), 0) FROM simulation_positions WHERE account_id = :account_id """), {'account_id': account_id}).scalar() # 计算总资产和收益 total_assets = float(available_cash) + float(frozen_cash) + float(total_position_value or 0) total_profit = total_assets - float(initial_capital) total_profit_rate = (total_profit / float(initial_capital)) * 100 if initial_capital > 0 else 0 # 更新账户 session.execute(text(""" UPDATE simulation_accounts SET position_value = :position_value, total_assets = :total_assets, total_profit = :total_profit, total_profit_rate = :total_profit_rate, updated_at = NOW() WHERE id = :account_id """), { 'account_id': account_id, 'position_value': total_position_value or 0, 'total_assets': total_assets, 'total_profit': total_profit, 'total_profit_rate': total_profit_rate }) session.commit() logger.info(f"更新了 {len(accounts)} 个账户的资产") except Exception as e: logger.error(f"更新账户资产时出错: {e}") session.rollback() finally: session.close() def process_t1_settlement(): """处理T+1结算(每日收盘后运行)""" session = SessionLocal() try: # 获取所有需要结算的持仓 positions = session.execute(text(""" SELECT id, account_id, stock_code, frozen_qty FROM simulation_positions WHERE frozen_qty > 0 """)).fetchall() logger.info(f"开始T+1结算,共 {len(positions)} 个持仓需要结算") for position in positions: position_id, account_id, stock_code, frozen_qty = position # 将冻结数量转为可用数量 session.execute(text(""" UPDATE simulation_positions SET available_qty = available_qty + :frozen_qty, frozen_qty = 0, updated_at = NOW() WHERE id = :position_id """), {'frozen_qty': frozen_qty, 'position_id': position_id}) logger.info(f"持仓 {stock_code} T+1结算完成: {frozen_qty} 股从冻结转为可用") session.commit() logger.info(f"T+1结算完成,共处理 {len(positions)} 个持仓") except Exception as e: logger.error(f"T+1结算失败: {e}") session.rollback() finally: session.close() def generate_daily_stats(): """生成日统计数据(每日收盘后调用)""" session = SessionLocal() try: today = beijing_now().date() # 获取所有账户 accounts = session.execute(text(""" SELECT id, initial_capital, available_cash, frozen_cash, position_value, total_assets, total_profit, total_profit_rate FROM simulation_accounts """)).fetchall() logger.info(f"开始生成 {today} 的日统计数据,共 {len(accounts)} 个账户") for account in accounts: account_id, initial_capital, available_cash, frozen_cash, position_value, total_assets, total_profit, total_profit_rate = account # 检查是否已存在今日统计 existing_stat = session.execute(text(""" SELECT id FROM simulation_daily_stats WHERE account_id = :account_id AND stat_date = :stat_date """), {'account_id': account_id, 'stat_date': today}).fetchone() if existing_stat: logger.info(f"账户 {account_id} 的 {today} 统计数据已存在,跳过") continue # 获取昨日统计数据作为期初资产 yesterday = today - timedelta(days=1) yesterday_stat = session.execute(text(""" SELECT closing_assets, total_profit, total_profit_rate FROM simulation_daily_stats WHERE account_id = :account_id AND stat_date = :stat_date ORDER BY stat_date DESC LIMIT 1 """), {'account_id': account_id, 'stat_date': yesterday}).fetchone() if yesterday_stat: opening_assets = float(yesterday_stat[0]) yesterday_total_profit = float(yesterday_stat[1]) yesterday_total_profit_rate = float(yesterday_stat[2]) else: # 如果没有昨日数据,使用初始资金作为期初资产 opening_assets = float(initial_capital) yesterday_total_profit = 0 yesterday_total_profit_rate = 0 # 计算日盈亏 closing_assets = float(total_assets) daily_profit = closing_assets - opening_assets daily_profit_rate = (daily_profit / opening_assets) * 100 if opening_assets > 0 else 0 # 计算累计盈亏 current_total_profit = float(total_profit) current_total_profit_rate = float(total_profit_rate) # 统计当日交易次数 trade_count = session.execute(text(""" SELECT COUNT(*) FROM simulation_transactions WHERE account_id = :account_id AND DATE(transaction_time) = :stat_date """), {'account_id': account_id, 'stat_date': today}).scalar() # 统计当日盈利和亏损次数 win_count = 0 loss_count = 0 max_profit = 0 max_loss = 0 if trade_count > 0: # 获取当日所有卖出交易 sell_transactions = session.execute(text(""" SELECT st.transaction_price, st.transaction_qty, st.stock_code FROM simulation_transactions st WHERE st.account_id = :account_id AND st.transaction_type = 'SELL' AND DATE(st.transaction_time) = :stat_date """), {'account_id': account_id, 'stat_date': today}).fetchall() for sell_trans in sell_transactions: sell_price, sell_qty, stock_code = sell_trans # 查找对应的持仓成本 position = session.execute(text(""" SELECT avg_cost FROM simulation_positions WHERE account_id = :account_id AND stock_code = :stock_code """), {'account_id': account_id, 'stock_code': stock_code}).fetchone() if position: avg_cost = float(position[0]) profit = (sell_price - avg_cost) * sell_qty if profit > 0: win_count += 1 max_profit = max(max_profit, profit) else: loss_count += 1 max_loss = min(max_loss, profit) # 插入日统计数据 session.execute(text(""" INSERT INTO simulation_daily_stats (account_id, stat_date, opening_assets, closing_assets, daily_profit, daily_profit_rate, total_profit, total_profit_rate, trade_count, win_count, loss_count, max_profit, max_loss, created_at) VALUES (:account_id, :stat_date, :opening_assets, :closing_assets, :daily_profit, :daily_profit_rate, :total_profit, :total_profit_rate, :trade_count, :win_count, :loss_count, :max_profit, :max_loss, NOW()) """), { 'account_id': account_id, 'stat_date': today, 'opening_assets': opening_assets, 'closing_assets': closing_assets, 'daily_profit': daily_profit, 'daily_profit_rate': daily_profit_rate, 'total_profit': current_total_profit, 'total_profit_rate': current_total_profit_rate, 'trade_count': trade_count, 'win_count': win_count, 'loss_count': loss_count, 'max_profit': max_profit, 'max_loss': max_loss }) logger.info(f"账户 {account_id} 日统计完成: 期初 {opening_assets:.2f}, 期末 {closing_assets:.2f}, 日盈亏 {daily_profit:.2f}") session.commit() logger.info(f"日统计数据生成完成,共处理 {len(accounts)} 个账户") except Exception as e: logger.error(f"生成日统计数据时出错: {e}") session.rollback() finally: session.close() def main(): """主循环""" logger.info("模拟盘后台处理器启动") last_daily_stats_date = None # 记录上次生成日统计的日期 while True: try: current_time = beijing_now() logger.info(f"开始处理循环 - {current_time}") # 1. 处理待成交订单(每分钟检查) process_pending_orders() # 2. 更新持仓市值(交易时间每分钟更新,非交易时间每10分钟更新) if is_trading_time(): update_all_positions_market_value() update_all_accounts_assets() sleep_time = 60 # 交易时间每分钟更新 else: # 非交易时间,每10分钟更新一次 if current_time.minute % 10 == 0: update_all_positions_market_value() update_all_accounts_assets() sleep_time = 60 # 但仍然每分钟检查待成交订单 # 3. T+1结算和日结算逻辑(每日收盘后执行一次) today = current_time.date() if last_daily_stats_date != today: # 检查是否已过收盘时间(15:00)或者是非交易日 if not is_trading_time() and (current_time.hour >= 15 or current_time.weekday() >= 5): logger.info("开始执行日结算...") # 先执行T+1结算 process_t1_settlement() # 再生成日统计数据 generate_daily_stats() last_daily_stats_date = today logger.info("日结算完成") logger.info(f"处理完成,等待 {sleep_time} 秒") time.sleep(sleep_time) except KeyboardInterrupt: logger.info("收到停止信号,正在退出...") break except Exception as e: logger.error(f"主循环出错: {e}") time.sleep(30) # 出错后等待30秒再重试 if __name__ == "__main__": main()