Files
vf_react/simulation_background_processor.py
2025-10-11 12:02:01 +08:00

606 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模拟盘后台处理脚本
负责处理待成交订单、更新持仓收益等后台计算任务
与主应用分离,可以独立运行
"""
import os
import sys
import time
import logging
from datetime import datetime, timedelta, time as dt_time
from decimal import Decimal
from typing import List, Optional
import pytz
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入主应用的数据库连接和模型
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from clickhouse_driver import Client as ClickhouseClient
# Logging: every record is written both to simulation_processor.log and to
# the console, at INFO level and above.
logging.basicConfig(
    handlers=[
        logging.FileHandler('simulation_processor.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Connection settings.
# Every value can be overridden with an environment variable so credentials
# do not have to live in source control. When a variable is unset the
# historical literal is used, so existing deployments behave exactly as before.
# NOTE(review): the fallback literals still embed real credentials; rotate
# them and supply the values through the environment instead.
DATABASE_URL = os.environ.get(
    "SIMULATION_DATABASE_URL",
    "mysql+pymysql://root:Zzl5588161!@localhost:3306/stock?charset=utf8mb4",
)
CLICKHOUSE_HOST = os.environ.get("SIMULATION_CLICKHOUSE_HOST", "192.168.1.58")
CLICKHOUSE_PORT = int(os.environ.get("SIMULATION_CLICKHOUSE_PORT", "9000"))

# SQLAlchemy engine and session factory for the MySQL database.
# pool_pre_ping asks SQLAlchemy to check a pooled connection before using it.
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine)

# ClickHouse client used for price lookups.
clickhouse_client = ClickhouseClient(
    host=CLICKHOUSE_HOST,
    port=CLICKHOUSE_PORT,
    user=os.environ.get("SIMULATION_CLICKHOUSE_USER", "default"),
    password=os.environ.get("SIMULATION_CLICKHOUSE_PASSWORD", "Zzl33818!"),
    database=os.environ.get("SIMULATION_CLICKHOUSE_DATABASE", "stock"),
)
# Time zone used for every wall-clock decision in this module.
beijing_tz = pytz.timezone('Asia/Shanghai')


def beijing_now():
    """Return the current time as a timezone-aware datetime in Asia/Shanghai."""
    return datetime.now(tz=beijing_tz)
def is_trading_time(now: Optional[datetime] = None):
    """Return True when *now* falls inside a regular trading session.

    Sessions are Monday to Friday, 09:30-11:30 and 13:00-15:00, with both
    ends of each session inclusive. Public holidays are not taken into
    account.

    Args:
        now: The moment to test. Defaults to the current Beijing time. When
            supplied, its weekday and wall-clock time are used as given (no
            time-zone conversion is applied).

    Returns:
        bool: True inside a session, False otherwise.
    """
    if now is None:
        now = beijing_now()
    if now.weekday() >= 5:  # Saturday or Sunday
        return False
    current_time = now.time()
    return (
        dt_time(9, 30) <= current_time <= dt_time(11, 30)
        or dt_time(13, 0) <= current_time <= dt_time(15, 0)
    )
def get_latest_price_from_clickhouse(stock_code: str) -> Optional[float]:
    """Look up the most recent close price for a stock in ClickHouse.

    A code without an exchange suffix gets ".SH" when it starts with "6" and
    ".SZ" otherwise. The newest minute bar from the last 7 days is preferred;
    if there is none, the newest daily bar is used.

    Args:
        stock_code: Stock code, with or without an exchange suffix.

    Returns:
        The close price as a float, or None when no row is found or when any
        error occurs (the error is logged, not raised).
    """
    minute_sql = """
        SELECT close, timestamp
        FROM stock_minute
        WHERE code = %(code)s
        AND timestamp >= today() - 7
        ORDER BY timestamp DESC
        LIMIT 1
    """
    daily_sql = """
        SELECT close
        FROM stock_daily
        WHERE code = %(code)s
        ORDER BY date DESC
        LIMIT 1
    """
    try:
        # Make sure the code carries an exchange suffix.
        if '.' not in stock_code:
            suffix = 'SH' if stock_code.startswith('6') else 'SZ'
            stock_code = f"{stock_code}.{suffix}"
        # Minute data first; daily data only as a fallback.
        for sql in (minute_sql, daily_sql):
            rows = clickhouse_client.execute(sql, {'code': stock_code})
            if rows:
                return float(rows[0][0])
        return None
    except Exception as e:
        logger.error(f"获取股票 {stock_code} 价格失败: {e}")
        return None
def process_pending_orders():
    """Fill pending simulated orders at the latest known price.

    Selects orders whose status is 'PENDING' and that were placed at least one
    minute ago, oldest first. For every order with a price available, the
    order row is marked 'FILLED' with its fees, and the account and position
    are settled through process_buy_order_settlement or
    process_sell_order_settlement. Orders without a price stay pending and are
    retried on a later call.

    Each order is committed on its own. A failure therefore rolls back only
    the order that failed; previously this rollback also discarded every
    earlier fill of the same batch that had not been committed yet.

    All exceptions are logged and swallowed; nothing is returned.
    """
    session = SessionLocal()
    try:
        pending_orders = session.execute(text("""
            SELECT id, order_no, stock_code, stock_name, order_type, order_qty,
                   order_time, account_id
            FROM simulation_orders
            WHERE status = 'PENDING'
            AND order_time <= NOW() - INTERVAL 1 MINUTE
            ORDER BY order_time ASC
        """)).fetchall()
        logger.info(f"找到 {len(pending_orders)} 个待处理订单")
        for order_data in pending_orders:
            try:
                order_id = order_data[0]
                stock_code = order_data[2]
                order_type = order_data[4]
                order_qty = order_data[5]

                latest_price = get_latest_price_from_clickhouse(stock_code)
                if not latest_price:
                    # No price yet: leave the order pending.
                    logger.warning(f"订单 {order_data[1]} 无法获取价格数据,继续等待")
                    continue

                filled_amount = latest_price * order_qty
                # Commission: 0.025% of the amount, minimum 5 yuan.
                commission = max(filled_amount * 0.00025, 5.0)
                # Stamp tax: 0.1%, charged on sells only.
                stamp_tax = filled_amount * 0.001 if order_type == 'SELL' else 0
                # Transfer fee: 0.002%.
                transfer_fee = filled_amount * 0.00002
                total_fee = commission + stamp_tax + transfer_fee

                session.execute(text("""
                    UPDATE simulation_orders
                    SET status = 'FILLED',
                        filled_qty = :filled_qty,
                        filled_price = :filled_price,
                        filled_amount = :filled_amount,
                        commission = :commission,
                        stamp_tax = :stamp_tax,
                        transfer_fee = :transfer_fee,
                        total_fee = :total_fee,
                        filled_time = NOW()
                    WHERE id = :order_id
                """), {
                    'order_id': order_id,
                    'filled_qty': order_qty,
                    'filled_price': latest_price,
                    'filled_amount': filled_amount,
                    'commission': commission,
                    'stamp_tax': stamp_tax,
                    'transfer_fee': transfer_fee,
                    'total_fee': total_fee,
                })

                # Settle cash and position inside the same transaction.
                if order_type == 'BUY':
                    process_buy_order_settlement(session, order_data, latest_price, total_fee)
                else:
                    process_sell_order_settlement(session, order_data, latest_price, total_fee)

                # Commit this order now so a later failure cannot undo it.
                session.commit()
                logger.info(f"订单 {order_data[1]} 成交,价格: {latest_price}")
            except Exception as e:
                logger.error(f"处理订单 {order_data[1]} 时出错: {e}")
                # Only the current order's uncommitted changes are discarded.
                session.rollback()
    except Exception as e:
        logger.error(f"处理待成交订单时出错: {e}")
        session.rollback()
    finally:
        session.close()
def process_buy_order_settlement(session, order_data, price: float, total_fee: float):
    """Settle a filled buy order: debit cash and add the shares to the position.

    The total cost (price * quantity + fees) is subtracted from the account's
    available cash. If the account already holds the stock, the position's
    quantity and average cost are updated; otherwise a new position row is
    inserted. The average cost includes fees because it is derived from the
    total cost.

    For an existing position, available_qty is increased by the bought
    quantity instead of being overwritten with the new total. Overwriting
    discarded any difference between position_qty and available_qty (for
    example shares still counted in frozen_qty), which process_t1_settlement
    would later add to available_qty a second time.

    Nothing is committed here; the caller owns the transaction.

    Args:
        session: Open SQLAlchemy session to execute the statements on.
        order_data: Order row; index 2 is the stock code, 3 the stock name,
            5 the order quantity and 7 the account id.
        price: Fill price per share.
        total_fee: Sum of all fees charged for the order.
    """
    account_id = order_data[7]
    stock_code = order_data[2]
    stock_name = order_data[3]
    order_qty = order_data[5]
    total_cost = price * order_qty + total_fee

    # Debit the account.
    session.execute(text("""
        UPDATE simulation_accounts
        SET available_cash = available_cash - :total_cost
        WHERE id = :account_id
    """), {'total_cost': total_cost, 'account_id': account_id})

    existing_position = session.execute(text("""
        SELECT id, position_qty, avg_cost FROM simulation_positions
        WHERE account_id = :account_id AND stock_code = :stock_code
    """), {'account_id': account_id, 'stock_code': stock_code}).fetchone()

    if existing_position:
        position_id = existing_position[0]
        old_qty = existing_position[1]
        old_cost = float(existing_position[2])
        new_qty = old_qty + order_qty
        # Quantity-weighted average of the old cost basis and this purchase.
        new_avg_cost = (old_qty * old_cost + total_cost) / new_qty
        session.execute(text("""
            UPDATE simulation_positions
            SET position_qty = :new_qty,
                available_qty = available_qty + :order_qty,
                avg_cost = :new_avg_cost,
                updated_at = NOW()
            WHERE id = :position_id
        """), {
            'position_id': position_id,
            'new_qty': new_qty,
            'order_qty': order_qty,
            'new_avg_cost': new_avg_cost,
        })
    else:
        # First purchase of this stock for the account.
        session.execute(text("""
            INSERT INTO simulation_positions
            (account_id, stock_code, stock_name, position_qty, available_qty, avg_cost,
             current_price, market_value, profit, profit_rate, created_at, updated_at)
            VALUES (:account_id, :stock_code, :stock_name, :qty, :qty, :avg_cost,
                    :price, :market_value, 0, 0, NOW(), NOW())
        """), {
            'account_id': account_id,
            'stock_code': stock_code,
            'stock_name': stock_name,
            'qty': order_qty,
            'avg_cost': total_cost / order_qty,
            'price': price,
            'market_value': price * order_qty,
        })
def process_sell_order_settlement(session, order_data, price: float, total_fee: float):
    """Settle a filled sell order: credit cash and remove the shares.

    The proceeds net of fees are added to the account's available cash, the
    sold quantity is subtracted from both position_qty and available_qty, and
    the position row is deleted once its position_qty is zero or below.

    Nothing is committed here; the caller owns the transaction.

    Args:
        session: Open SQLAlchemy session to execute the statements on.
        order_data: Order row; index 2 is the stock code, 5 the order
            quantity and 7 the account id.
        price: Fill price per share.
        total_fee: Sum of all fees charged for the order.
    """
    order_qty = order_data[5]
    holding_key = {'account_id': order_data[7], 'stock_code': order_data[2]}
    net_amount = price * order_qty - total_fee

    # Credit the account with the net proceeds.
    session.execute(
        text("""
            UPDATE simulation_accounts
            SET available_cash = available_cash + :net_amount
            WHERE id = :account_id
        """),
        {'net_amount': net_amount, 'account_id': holding_key['account_id']},
    )

    # Reduce the holding by the sold quantity.
    session.execute(
        text("""
            UPDATE simulation_positions
            SET position_qty = position_qty - :qty,
                available_qty = available_qty - :qty,
                updated_at = NOW()
            WHERE account_id = :account_id AND stock_code = :stock_code
        """),
        {'qty': order_qty, **holding_key},
    )

    # Drop the row when nothing is left.
    session.execute(
        text("""
            DELETE FROM simulation_positions
            WHERE account_id = :account_id AND stock_code = :stock_code AND position_qty <= 0
        """),
        holding_key,
    )
def update_all_positions_market_value():
    """Refresh price, market value and profit for every open position.

    For each position with position_qty > 0 the latest price is fetched and
    current_price, market_value, profit and profit_rate (in percent of the
    cost value) are rewritten. Positions whose price cannot be obtained are
    left unchanged.

    Prices are looked up once per stock code per call, so positions in the
    same stock share a single ClickHouse lookup and are valued at the same
    price.

    All updates are committed together at the end. Exceptions are logged and
    cause a rollback; nothing is returned.
    """
    session = SessionLocal()
    try:
        positions = session.execute(text("""
            SELECT id, stock_code, position_qty, avg_cost
            FROM simulation_positions
            WHERE position_qty > 0
        """)).fetchall()
        logger.info(f"更新 {len(positions)} 个持仓的市值")

        price_cache = {}  # stock_code -> latest price (or None) for this run
        for position_id, stock_code, qty, avg_cost in positions:
            if stock_code not in price_cache:
                price_cache[stock_code] = get_latest_price_from_clickhouse(stock_code)
            latest_price = price_cache[stock_code]
            if not latest_price:
                continue

            market_value = latest_price * qty
            cost_value = float(avg_cost) * qty
            profit = market_value - cost_value
            # Guard against division by zero for a zero cost basis.
            profit_rate = (profit / cost_value) * 100 if cost_value > 0 else 0
            session.execute(text("""
                UPDATE simulation_positions
                SET current_price = :price,
                    market_value = :market_value,
                    profit = :profit,
                    profit_rate = :profit_rate,
                    updated_at = NOW()
                WHERE id = :position_id
            """), {
                'position_id': position_id,
                'price': latest_price,
                'market_value': market_value,
                'profit': profit,
                'profit_rate': profit_rate,
            })
        session.commit()
        logger.info("持仓市值更新完成")
    except Exception as e:
        logger.error(f"更新持仓市值时出错: {e}")
        session.rollback()
    finally:
        session.close()
def update_all_accounts_assets():
    """Recompute total assets and cumulative profit for every account.

    For each account: sums market_value over its positions, adds available
    and frozen cash to obtain total assets, and derives profit and profit
    rate (in percent) relative to initial_capital. The results are written
    back to simulation_accounts and committed once after all accounts are
    processed. Exceptions are logged and cause a rollback.
    """
    session = SessionLocal()
    try:
        accounts = session.execute(text("""
            SELECT id, initial_capital, available_cash, frozen_cash
            FROM simulation_accounts
        """)).fetchall()

        for account_id, initial_capital, available_cash, frozen_cash in accounts:
            # Market value of everything the account currently holds.
            holdings_value = session.execute(
                text("""
                    SELECT COALESCE(SUM(market_value), 0)
                    FROM simulation_positions
                    WHERE account_id = :account_id
                """),
                {'account_id': account_id},
            ).scalar()

            assets = float(available_cash) + float(frozen_cash) + float(holdings_value or 0)
            gain = assets - float(initial_capital)
            if initial_capital > 0:
                gain_rate = (gain / float(initial_capital)) * 100
            else:
                # Avoid dividing by a zero or negative starting capital.
                gain_rate = 0

            session.execute(
                text("""
                    UPDATE simulation_accounts
                    SET position_value = :position_value,
                        total_assets = :total_assets,
                        total_profit = :total_profit,
                        total_profit_rate = :total_profit_rate,
                        updated_at = NOW()
                    WHERE id = :account_id
                """),
                {
                    'account_id': account_id,
                    'position_value': holdings_value or 0,
                    'total_assets': assets,
                    'total_profit': gain,
                    'total_profit_rate': gain_rate,
                },
            )

        session.commit()
        logger.info(f"更新了 {len(accounts)} 个账户的资产")
    except Exception as e:
        logger.error(f"更新账户资产时出错: {e}")
        session.rollback()
    finally:
        session.close()
def process_t1_settlement():
    """Run T+1 settlement: move frozen shares into the available quantity.

    Meant to be run once a day after the market close. Every position with
    frozen_qty > 0 has that quantity added to available_qty and its
    frozen_qty reset to 0. All changes are committed together; exceptions are
    logged and cause a rollback.
    """
    session = SessionLocal()
    try:
        frozen_positions = session.execute(text("""
            SELECT id, account_id, stock_code, frozen_qty
            FROM simulation_positions
            WHERE frozen_qty > 0
        """)).fetchall()
        total = len(frozen_positions)
        logger.info(f"开始T+1结算{total} 个持仓需要结算")

        release_sql = text("""
            UPDATE simulation_positions
            SET available_qty = available_qty + :frozen_qty,
                frozen_qty = 0,
                updated_at = NOW()
            WHERE id = :position_id
        """)
        for position_id, _account_id, stock_code, frozen_qty in frozen_positions:
            # Release the quantity that was frozen when this row was read.
            session.execute(release_sql, {'frozen_qty': frozen_qty, 'position_id': position_id})
            logger.info(f"持仓 {stock_code} T+1结算完成: {frozen_qty} 股从冻结转为可用")

        session.commit()
        logger.info(f"T+1结算完成共处理 {total} 个持仓")
    except Exception as e:
        logger.error(f"T+1结算失败: {e}")
        session.rollback()
    finally:
        session.close()
def generate_daily_stats():
    """Write one simulation_daily_stats row per account for today.

    Meant to be called once a day after the market close. Accounts that
    already have a row for today are skipped. For every other account:

    * opening assets are the closing assets of the most recent earlier stats
      row, or the initial capital when the account has no earlier row;
    * closing assets, total profit and total profit rate are copied from the
      account row;
    * today's transactions are counted, and each of today's sells is
      classified as a win or a loss against the position's current avg_cost.

    The opening-assets lookup takes the latest row before today rather than
    requiring a row dated exactly yesterday, so a day without stats no longer
    resets the opening value to the initial capital.

    All rows are committed together. Exceptions are logged and cause a
    rollback; nothing is returned.
    """
    session = SessionLocal()
    try:
        today = beijing_now().date()
        accounts = session.execute(text("""
            SELECT id, initial_capital, total_assets, total_profit, total_profit_rate
            FROM simulation_accounts
        """)).fetchall()
        logger.info(f"开始生成 {today} 的日统计数据,共 {len(accounts)} 个账户")

        for account_id, initial_capital, total_assets, total_profit, total_profit_rate in accounts:
            # Skip accounts that already have today's row.
            existing_stat = session.execute(text("""
                SELECT id FROM simulation_daily_stats
                WHERE account_id = :account_id AND stat_date = :stat_date
            """), {'account_id': account_id, 'stat_date': today}).fetchone()
            if existing_stat:
                logger.info(f"账户 {account_id}{today} 统计数据已存在,跳过")
                continue

            # Opening assets: closing assets of the latest earlier stats row.
            previous_stat = session.execute(text("""
                SELECT closing_assets
                FROM simulation_daily_stats
                WHERE account_id = :account_id AND stat_date < :stat_date
                ORDER BY stat_date DESC LIMIT 1
            """), {'account_id': account_id, 'stat_date': today}).fetchone()
            if previous_stat:
                opening_assets = float(previous_stat[0])
            else:
                # No history yet: start from the initial capital.
                opening_assets = float(initial_capital)

            closing_assets = float(total_assets)
            daily_profit = closing_assets - opening_assets
            daily_profit_rate = (daily_profit / opening_assets) * 100 if opening_assets > 0 else 0

            current_total_profit = float(total_profit)
            current_total_profit_rate = float(total_profit_rate)

            trade_count = session.execute(text("""
                SELECT COUNT(*) FROM simulation_transactions
                WHERE account_id = :account_id
                AND DATE(transaction_time) = :stat_date
            """), {'account_id': account_id, 'stat_date': today}).scalar()

            win_count = 0
            loss_count = 0
            max_profit = 0
            max_loss = 0
            if trade_count > 0:
                sell_transactions = session.execute(text("""
                    SELECT st.transaction_price, st.transaction_qty, st.stock_code
                    FROM simulation_transactions st
                    WHERE st.account_id = :account_id
                    AND st.transaction_type = 'SELL'
                    AND DATE(st.transaction_time) = :stat_date
                """), {'account_id': account_id, 'stat_date': today}).fetchall()
                for sell_price, sell_qty, stock_code in sell_transactions:
                    position = session.execute(text("""
                        SELECT avg_cost FROM simulation_positions
                        WHERE account_id = :account_id AND stock_code = :stock_code
                    """), {'account_id': account_id, 'stock_code': stock_code}).fetchone()
                    if not position:
                        # No position row (e.g. fully sold): the cost basis
                        # is unknown, so the sell is not classified.
                        continue
                    avg_cost = float(position[0])
                    # Convert to float first: the driver may return Decimal,
                    # and Decimal cannot be mixed with float in arithmetic.
                    profit = (float(sell_price) - avg_cost) * float(sell_qty)
                    if profit > 0:
                        win_count += 1
                        max_profit = max(max_profit, profit)
                    else:
                        # Break-even sells are counted as losses, as before.
                        loss_count += 1
                        max_loss = min(max_loss, profit)

            session.execute(text("""
                INSERT INTO simulation_daily_stats
                (account_id, stat_date, opening_assets, closing_assets, daily_profit, daily_profit_rate,
                 total_profit, total_profit_rate, trade_count, win_count, loss_count, max_profit, max_loss, created_at)
                VALUES (:account_id, :stat_date, :opening_assets, :closing_assets, :daily_profit, :daily_profit_rate,
                        :total_profit, :total_profit_rate, :trade_count, :win_count, :loss_count, :max_profit, :max_loss, NOW())
            """), {
                'account_id': account_id,
                'stat_date': today,
                'opening_assets': opening_assets,
                'closing_assets': closing_assets,
                'daily_profit': daily_profit,
                'daily_profit_rate': daily_profit_rate,
                'total_profit': current_total_profit,
                'total_profit_rate': current_total_profit_rate,
                'trade_count': trade_count,
                'win_count': win_count,
                'loss_count': loss_count,
                'max_profit': max_profit,
                'max_loss': max_loss,
            })
            logger.info(f"账户 {account_id} 日统计完成: 期初 {opening_assets:.2f}, 期末 {closing_assets:.2f}, 日盈亏 {daily_profit:.2f}")

        session.commit()
        logger.info(f"日统计数据生成完成,共处理 {len(accounts)} 个账户")
    except Exception as e:
        logger.error(f"生成日统计数据时出错: {e}")
        session.rollback()
    finally:
        session.close()
def main():
    """Run the background processing loop until interrupted.

    Each cycle, roughly once a minute:

    1. Pending orders are processed.
    2. Position market values and account assets are refreshed: every cycle
       during trading hours, otherwise once at least ten minutes have passed
       since the previous refresh.
    3. Once per calendar day, outside trading hours and either after 15:00 or
       on a weekend, T+1 settlement runs followed by the daily statistics.

    The trading-hours check is evaluated once per cycle so steps 2 and 3
    always agree with each other. The off-hours refresh is based on elapsed
    time rather than on the minute of the hour, because the cycle length is
    60 seconds plus processing time and can step over a given minute.

    Unexpected errors are logged and the loop resumes after 30 seconds.
    Ctrl-C stops the loop cleanly, including during that 30-second wait.
    """
    logger.info("模拟盘后台处理器启动")
    last_daily_stats_date = None  # date for which the daily settlement last ran
    last_valuation_time = None    # when market values were last refreshed
    off_hours_interval = timedelta(minutes=10)
    sleep_time = 60  # pending orders are checked every minute
    try:
        while True:
            try:
                current_time = beijing_now()
                logger.info(f"开始处理循环 - {current_time}")

                # 1. Pending orders.
                process_pending_orders()

                # 2. Valuation refresh.
                trading = is_trading_time()
                valuation_due = (
                    trading
                    or last_valuation_time is None
                    or current_time - last_valuation_time >= off_hours_interval
                )
                if valuation_due:
                    update_all_positions_market_value()
                    update_all_accounts_assets()
                    last_valuation_time = current_time

                # 3. Daily settlement, at most once per calendar day.
                today = current_time.date()
                if last_daily_stats_date != today:
                    after_close = current_time.hour >= 15 or current_time.weekday() >= 5
                    if not trading and after_close:
                        logger.info("开始执行日结算...")
                        # T+1 settlement first, then the statistics.
                        process_t1_settlement()
                        generate_daily_stats()
                        last_daily_stats_date = today
                        logger.info("日结算完成")

                logger.info(f"处理完成,等待 {sleep_time}")
                time.sleep(sleep_time)
            except Exception as e:
                logger.error(f"主循环出错: {e}")
                time.sleep(30)  # back off before retrying
    except KeyboardInterrupt:
        logger.info("收到停止信号,正在退出...")


if __name__ == "__main__":
    main()