606 lines
24 KiB
Python
606 lines
24 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
模拟盘后台处理脚本
|
||
负责处理待成交订单、更新持仓收益等后台计算任务
|
||
与主应用分离,可以独立运行
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import logging
|
||
from datetime import datetime, timedelta, time as dt_time
|
||
from decimal import Decimal
|
||
from typing import List, Optional
|
||
import pytz
|
||
|
||
# 添加项目根目录到Python路径
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
# 导入主应用的数据库连接和模型
|
||
from sqlalchemy import create_engine, text
|
||
from sqlalchemy.orm import sessionmaker
|
||
from clickhouse_driver import Client as ClickhouseClient
|
||
|
||
# Logging: every record goes both to a log file in the working directory
# and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('simulation_processor.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Database configuration.
#
# SECURITY NOTE(review): the defaults below embed real credentials in source
# control. They are kept only so existing deployments keep working; every
# value can now be overridden through an environment variable, and the
# hard-coded secrets should be rotated and removed once the environment
# variables are provisioned.
DATABASE_URL = os.environ.get(
    "SIMULATION_DATABASE_URL",
    "mysql+pymysql://root:Zzl5588161!@localhost:3306/stock?charset=utf8mb4",
)
CLICKHOUSE_HOST = os.environ.get("SIMULATION_CLICKHOUSE_HOST", "192.168.1.58")
CLICKHOUSE_PORT = int(os.environ.get("SIMULATION_CLICKHOUSE_PORT", "9000"))
CLICKHOUSE_USER = os.environ.get("SIMULATION_CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.environ.get("SIMULATION_CLICKHOUSE_PASSWORD", "Zzl33818!")
CLICKHOUSE_DATABASE = os.environ.get("SIMULATION_CLICKHOUSE_DATABASE", "stock")

# SQLAlchemy engine and session factory for the MySQL database.
# pool_pre_ping is enabled on the engine (see SQLAlchemy docs for details).
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine)

# ClickHouse client, used for market-price lookups.
clickhouse_client = ClickhouseClient(
    host=CLICKHOUSE_HOST,
    port=CLICKHOUSE_PORT,
    user=CLICKHOUSE_USER,
    password=CLICKHOUSE_PASSWORD,
    database=CLICKHOUSE_DATABASE,
)

# Beijing time zone (Asia/Shanghai).
beijing_tz = pytz.timezone('Asia/Shanghai')
|
||
|
||
def beijing_now():
    """Return the current moment as a timezone-aware datetime in ``beijing_tz``."""
    return datetime.now(tz=beijing_tz)
|
||
|
||
def is_trading_time(now: Optional[datetime] = None):
    """Return True when ``now`` falls inside a regular trading session.

    The sessions checked are 09:30-11:30 and 13:00-15:00 (both ends
    inclusive), Monday through Friday. Only the weekday is checked, so
    exchange holidays that fall on a weekday are still reported as trading
    time.

    Args:
        now: Moment to evaluate. When omitted, the current Beijing time is
            used. A naive datetime is interpreted as Beijing wall-clock
            time; a timezone-aware one is converted to Beijing time first.

    Returns:
        bool: True inside a trading session, False otherwise.
    """
    if now is None:
        now = beijing_now()
    elif now.tzinfo is not None:
        now = now.astimezone(beijing_tz)

    # Saturday (5) and Sunday (6) never trade.
    if now.weekday() >= 5:
        return False

    current_time = now.time()
    in_morning_session = dt_time(9, 30) <= current_time <= dt_time(11, 30)
    in_afternoon_session = dt_time(13, 0) <= current_time <= dt_time(15, 0)
    return in_morning_session or in_afternoon_session
|
||
|
||
def get_latest_price_from_clickhouse(stock_code: str) -> Optional[float]:
    """Look up the most recent close price for a stock in ClickHouse.

    The minute-bar table is tried first (restricted to the last 7 days);
    when it yields nothing, the latest daily bar is used instead.

    Args:
        stock_code: Stock code with or without an exchange suffix. A code
            without a '.' gets ".SH" when it starts with '6' and ".SZ"
            otherwise.

    Returns:
        The close price as a float, or None when neither table has a row or
        when any error occurs (the error is logged, not raised).
    """
    minute_sql = """
        SELECT close, timestamp
        FROM stock_minute
        WHERE code = %(code)s
        AND timestamp >= today() - 7
        ORDER BY timestamp DESC
        LIMIT 1
    """
    daily_sql = """
        SELECT close
        FROM stock_daily
        WHERE code = %(code)s
        ORDER BY date DESC
        LIMIT 1
    """

    try:
        # Make sure the code carries an exchange suffix.
        if '.' not in stock_code:
            suffix = "SH" if stock_code.startswith('6') else "SZ"
            stock_code = f"{stock_code}.{suffix}"

        # Minute data first, daily data as the fallback.
        for sql in (minute_sql, daily_sql):
            rows = clickhouse_client.execute(sql, {'code': stock_code})
            if rows:
                return float(rows[0][0])

        return None

    except Exception as e:
        logger.error(f"获取股票 {stock_code} 价格失败: {e}")
        return None
|
||
|
||
def _calculate_order_fees(filled_amount: float, order_type: str):
    """Return ``(commission, stamp_tax, transfer_fee, total_fee)`` for a fill."""
    commission = max(filled_amount * 0.00025, 5.0)  # 0.025% of the amount, minimum 5
    stamp_tax = filled_amount * 0.001 if order_type == 'SELL' else 0  # charged on sells only
    transfer_fee = filled_amount * 0.00002  # 0.002% of the amount
    total_fee = commission + stamp_tax + transfer_fee
    return commission, stamp_tax, transfer_fee, total_fee


def process_pending_orders():
    """Fill pending orders that were placed at least one minute ago.

    For each such order the latest price is fetched; when a price is
    available the order row is marked FILLED with the computed fees and the
    account/position are settled through ``process_buy_order_settlement`` or
    ``process_sell_order_settlement``. Orders without a price are left
    pending for a later run.

    Each order is committed on its own. A failure while handling one order
    rolls back only that order's changes; previously filled orders in the
    same run stay committed. All errors are logged and swallowed.
    """
    session = SessionLocal()
    try:
        # Fetch every pending order that is at least one minute old.
        pending_orders = session.execute(text("""
            SELECT id, order_no, stock_code, stock_name, order_type, order_qty,
                   order_time, account_id
            FROM simulation_orders
            WHERE status = 'PENDING'
              AND order_time <= NOW() - INTERVAL 1 MINUTE
            ORDER BY order_time ASC
        """)).fetchall()

        logger.info(f"找到 {len(pending_orders)} 个待处理订单")

        for order_data in pending_orders:
            try:
                order_id = order_data[0]
                stock_code = order_data[2]
                order_type = order_data[4]
                order_qty = order_data[5]

                latest_price = get_latest_price_from_clickhouse(stock_code)
                if not latest_price:
                    logger.warning(f"订单 {order_data[1]} 无法获取价格数据,继续等待")
                    continue

                # A price is available: fill the whole order at that price.
                filled_amount = latest_price * order_qty
                commission, stamp_tax, transfer_fee, total_fee = _calculate_order_fees(
                    filled_amount, order_type
                )

                # Mark the order as filled. The status guard keeps an order
                # whose status changed after the SELECT above from being
                # filled anyway.
                update_result = session.execute(text("""
                    UPDATE simulation_orders
                    SET status = 'FILLED',
                        filled_qty = :filled_qty,
                        filled_price = :filled_price,
                        filled_amount = :filled_amount,
                        commission = :commission,
                        stamp_tax = :stamp_tax,
                        transfer_fee = :transfer_fee,
                        total_fee = :total_fee,
                        filled_time = NOW()
                    WHERE id = :order_id
                      AND status = 'PENDING'
                """), {
                    'order_id': order_id,
                    'filled_qty': order_qty,
                    'filled_price': latest_price,
                    'filled_amount': filled_amount,
                    'commission': commission,
                    'stamp_tax': stamp_tax,
                    'transfer_fee': transfer_fee,
                    'total_fee': total_fee,
                })

                if update_result.rowcount == 0:
                    # Nothing was updated, so the order is no longer pending.
                    session.rollback()
                    logger.warning(f"订单 {order_data[1]} 状态已变更,跳过成交")
                    continue

                # Settle the account cash and the position.
                if order_type == 'BUY':
                    process_buy_order_settlement(session, order_data, latest_price, total_fee)
                else:
                    process_sell_order_settlement(session, order_data, latest_price, total_fee)

                # Commit per order so a later failure cannot undo this fill.
                session.commit()
                logger.info(f"订单 {order_data[1]} 成交,价格: {latest_price}")

            except Exception as e:
                logger.error(f"处理订单 {order_data[1]} 时出错: {e}")
                # Only the current order's uncommitted changes are discarded.
                session.rollback()
                continue

    except Exception as e:
        logger.error(f"处理待成交订单时出错: {e}")
        session.rollback()
    finally:
        session.close()
|
||
|
||
def process_buy_order_settlement(session, order_data, price: float, total_fee: float):
    """Settle a filled BUY order against the account and its position.

    Deducts the purchase cost (price * quantity + fees) from the account's
    available cash, then either merges the shares into the existing position
    (recomputing the fee-inclusive average cost) or creates a new position.

    The caller owns the transaction: nothing is committed here.

    Args:
        session: Open SQLAlchemy session to execute the statements on.
        order_data: Order row; index 2 is the stock code, 3 the stock name,
            5 the order quantity and 7 the account id.
        price: Fill price per share.
        total_fee: Sum of all fees charged for this fill.
    """
    account_id = order_data[7]
    stock_code = order_data[2]
    stock_name = order_data[3]
    order_qty = order_data[5]

    total_cost = price * order_qty + total_fee

    # Deduct the cost from the account's available cash.
    session.execute(text("""
        UPDATE simulation_accounts
        SET available_cash = available_cash - :total_cost
        WHERE id = :account_id
    """), {'total_cost': total_cost, 'account_id': account_id})

    existing_position = session.execute(text("""
        SELECT id, position_qty, avg_cost FROM simulation_positions
        WHERE account_id = :account_id AND stock_code = :stock_code
    """), {'account_id': account_id, 'stock_code': stock_code}).fetchone()

    if existing_position is not None:
        # Merge into the existing position.
        old_qty = existing_position[1]
        old_cost = float(existing_position[2])
        new_qty = old_qty + order_qty
        new_avg_cost = (old_qty * old_cost + total_cost) / new_qty

        # available_qty is incremented rather than overwritten with new_qty:
        # overwriting would also mark as available any shares currently
        # excluded from it (for example a non-zero frozen_qty, which
        # process_t1_settlement later adds to available_qty again).
        # NOTE(review): newly bought shares become available immediately,
        # as before; confirm whether they should go to frozen_qty instead.
        session.execute(text("""
            UPDATE simulation_positions
            SET position_qty = :new_qty,
                available_qty = COALESCE(available_qty, 0) + :order_qty,
                avg_cost = :new_avg_cost,
                updated_at = NOW()
            WHERE id = :position_id
        """), {
            'position_id': existing_position[0],
            'new_qty': new_qty,
            'order_qty': order_qty,
            'new_avg_cost': new_avg_cost,
        })
    else:
        # First purchase of this stock for the account: create the position.
        session.execute(text("""
            INSERT INTO simulation_positions
            (account_id, stock_code, stock_name, position_qty, available_qty, avg_cost,
             current_price, market_value, profit, profit_rate, created_at, updated_at)
            VALUES (:account_id, :stock_code, :stock_name, :qty, :qty, :avg_cost,
                    :price, :market_value, 0, 0, NOW(), NOW())
        """), {
            'account_id': account_id,
            'stock_code': stock_code,
            'stock_name': stock_name,
            'qty': order_qty,
            'avg_cost': total_cost / order_qty,
            'price': price,
            'market_value': price * order_qty,
        })
|
||
|
||
def process_sell_order_settlement(session, order_data, price: float, total_fee: float):
    """Settle a filled SELL order against the account and its position.

    Credits the net proceeds (price * quantity - fees) to the account's
    available cash, reduces both position_qty and available_qty by the sold
    quantity, and deletes the position row once its quantity is zero or
    below. Nothing is committed here; the caller owns the transaction.

    Args:
        session: Open SQLAlchemy session to execute the statements on.
        order_data: Order row; index 2 is the stock code, 5 the order
            quantity and 7 the account id.
        price: Fill price per share.
        total_fee: Sum of all fees charged for this fill.
    """
    account_id = order_data[7]
    stock_code = order_data[2]
    order_qty = order_data[5]

    gross_proceeds = price * order_qty
    net_amount = gross_proceeds - total_fee

    # Credit the proceeds to the account.
    credit_cash = text("""
        UPDATE simulation_accounts
        SET available_cash = available_cash + :net_amount
        WHERE id = :account_id
    """)
    session.execute(credit_cash, {'net_amount': net_amount, 'account_id': account_id})

    # Reduce the position by the sold quantity.
    reduce_position = text("""
        UPDATE simulation_positions
        SET position_qty = position_qty - :qty,
            available_qty = available_qty - :qty,
            updated_at = NOW()
        WHERE account_id = :account_id AND stock_code = :stock_code
    """)
    session.execute(
        reduce_position,
        {'qty': order_qty, 'account_id': account_id, 'stock_code': stock_code},
    )

    # Remove the position once it is fully sold.
    drop_closed_position = text("""
        DELETE FROM simulation_positions
        WHERE account_id = :account_id AND stock_code = :stock_code AND position_qty <= 0
    """)
    session.execute(
        drop_closed_position,
        {'account_id': account_id, 'stock_code': stock_code},
    )
|
||
|
||
def update_all_positions_market_value():
    """Refresh price, market value and profit for every open position.

    Each position with a positive quantity is re-priced with the latest
    price; positions for which no price is available are left unchanged.
    All updates are committed together, and any error is logged and rolls
    the whole batch back.
    """
    refresh_sql = text("""
        UPDATE simulation_positions
        SET current_price = :price,
            market_value = :market_value,
            profit = :profit,
            profit_rate = :profit_rate,
            updated_at = NOW()
        WHERE id = :position_id
    """)

    session = SessionLocal()
    try:
        open_positions = session.execute(text("""
            SELECT id, stock_code, position_qty, avg_cost
            FROM simulation_positions
            WHERE position_qty > 0
        """)).fetchall()

        logger.info(f"更新 {len(open_positions)} 个持仓的市值")

        for position_id, stock_code, qty, avg_cost in open_positions:
            latest_price = get_latest_price_from_clickhouse(stock_code)
            if not latest_price:
                # No price available: keep the stored values.
                continue

            market_value = latest_price * qty
            cost_value = float(avg_cost) * qty
            profit = market_value - cost_value
            if cost_value > 0:
                profit_rate = (profit / cost_value) * 100
            else:
                profit_rate = 0

            session.execute(refresh_sql, {
                'position_id': position_id,
                'price': latest_price,
                'market_value': market_value,
                'profit': profit,
                'profit_rate': profit_rate,
            })

        session.commit()
        logger.info("持仓市值更新完成")

    except Exception as e:
        logger.error(f"更新持仓市值时出错: {e}")
        session.rollback()
    finally:
        session.close()
|
||
|
||
def update_all_accounts_assets():
    """Recompute position value, total assets and total profit per account.

    Total assets are available cash + frozen cash + the summed market value
    of the account's positions; profit is measured against the initial
    capital. All accounts are committed together, and any error is logged
    and rolls the whole batch back.
    """
    position_value_sql = text("""
        SELECT COALESCE(SUM(market_value), 0)
        FROM simulation_positions
        WHERE account_id = :account_id
    """)
    account_update_sql = text("""
        UPDATE simulation_accounts
        SET position_value = :position_value,
            total_assets = :total_assets,
            total_profit = :total_profit,
            total_profit_rate = :total_profit_rate,
            updated_at = NOW()
        WHERE id = :account_id
    """)

    session = SessionLocal()
    try:
        accounts = session.execute(text("""
            SELECT id, initial_capital, available_cash, frozen_cash
            FROM simulation_accounts
        """)).fetchall()

        for account_id, initial_capital, available_cash, frozen_cash in accounts:
            # Summed market value of all positions held by this account.
            holdings_value = session.execute(
                position_value_sql, {'account_id': account_id}
            ).scalar()

            total_assets = (
                float(available_cash) + float(frozen_cash) + float(holdings_value or 0)
            )
            total_profit = total_assets - float(initial_capital)
            if initial_capital > 0:
                total_profit_rate = (total_profit / float(initial_capital)) * 100
            else:
                total_profit_rate = 0

            session.execute(account_update_sql, {
                'account_id': account_id,
                'position_value': holdings_value or 0,
                'total_assets': total_assets,
                'total_profit': total_profit,
                'total_profit_rate': total_profit_rate,
            })

        session.commit()
        logger.info(f"更新了 {len(accounts)} 个账户的资产")

    except Exception as e:
        logger.error(f"更新账户资产时出错: {e}")
        session.rollback()
    finally:
        session.close()
|
||
|
||
def process_t1_settlement():
    """Run the T+1 settlement: release frozen shares into the available pool.

    For every position with a positive frozen_qty, that quantity is added to
    available_qty and frozen_qty is reset to zero. All positions are
    committed together, and any error is logged and rolls the batch back.
    """
    release_sql = text("""
        UPDATE simulation_positions
        SET available_qty = available_qty + :frozen_qty,
            frozen_qty = 0,
            updated_at = NOW()
        WHERE id = :position_id
    """)

    session = SessionLocal()
    try:
        frozen_positions = session.execute(text("""
            SELECT id, account_id, stock_code, frozen_qty
            FROM simulation_positions
            WHERE frozen_qty > 0
        """)).fetchall()

        logger.info(f"开始T+1结算,共 {len(frozen_positions)} 个持仓需要结算")

        for position_id, account_id, stock_code, frozen_qty in frozen_positions:
            # Move the frozen quantity over to the available quantity.
            session.execute(
                release_sql,
                {'frozen_qty': frozen_qty, 'position_id': position_id},
            )
            logger.info(f"持仓 {stock_code} T+1结算完成: {frozen_qty} 股从冻结转为可用")

        session.commit()
        logger.info(f"T+1结算完成,共处理 {len(frozen_positions)} 个持仓")

    except Exception as e:
        logger.error(f"T+1结算失败: {e}")
        session.rollback()
    finally:
        session.close()
|
||
|
||
def _summarize_sell_trades(session, account_id, stat_date):
    """Return ``(win_count, loss_count, max_profit, max_loss)`` for one day's sells.

    Profit per sell is measured against the avg_cost of the position that is
    still held; sells whose position row no longer exists are not counted.
    A sell with zero profit is counted as a loss.
    """
    win_count = 0
    loss_count = 0
    max_profit = 0
    max_loss = 0

    sell_transactions = session.execute(text("""
        SELECT st.transaction_price, st.transaction_qty, st.stock_code
        FROM simulation_transactions st
        WHERE st.account_id = :account_id
          AND st.transaction_type = 'SELL'
          AND DATE(st.transaction_time) = :stat_date
    """), {'account_id': account_id, 'stat_date': stat_date}).fetchall()

    for sell_price, sell_qty, stock_code in sell_transactions:
        position = session.execute(text("""
            SELECT avg_cost FROM simulation_positions
            WHERE account_id = :account_id AND stock_code = :stock_code
        """), {'account_id': account_id, 'stock_code': stock_code}).fetchone()

        if position is None:
            continue

        # Convert every operand to float: mixing Decimal values returned by
        # the database with floats raises TypeError.
        profit = (float(sell_price) - float(position[0])) * float(sell_qty)

        if profit > 0:
            win_count += 1
            max_profit = max(max_profit, profit)
        else:
            loss_count += 1
            max_loss = min(max_loss, profit)

    return win_count, loss_count, max_profit, max_loss


def generate_daily_stats():
    """Write one simulation_daily_stats row per account for today (Beijing date).

    Accounts that already have a row for today are skipped. Opening assets
    are the closing assets of the account's most recent earlier stats row,
    or the initial capital when there is none; closing assets and cumulative
    profit are read from the account row as currently stored. All rows are
    committed together, and any error is logged and rolls the batch back.
    """
    session = SessionLocal()
    try:
        today = beijing_now().date()

        accounts = session.execute(text("""
            SELECT id, initial_capital, available_cash, frozen_cash, position_value, total_assets, total_profit, total_profit_rate
            FROM simulation_accounts
        """)).fetchall()

        logger.info(f"开始生成 {today} 的日统计数据,共 {len(accounts)} 个账户")

        for account in accounts:
            account_id = account[0]
            initial_capital = account[1]
            total_assets = account[5]
            total_profit = account[6]
            total_profit_rate = account[7]

            # Skip accounts that already have today's row.
            existing_stat = session.execute(text("""
                SELECT id FROM simulation_daily_stats
                WHERE account_id = :account_id AND stat_date = :stat_date
            """), {'account_id': account_id, 'stat_date': today}).fetchone()

            if existing_stat:
                logger.info(f"账户 {account_id} 的 {today} 统计数据已存在,跳过")
                continue

            # Opening assets come from the most recent earlier stats row.
            # Looking only at exactly yesterday would, after any gap in the
            # stats, fall back to the initial capital and report the whole
            # cumulative profit as today's profit.
            previous_stat = session.execute(text("""
                SELECT closing_assets
                FROM simulation_daily_stats
                WHERE account_id = :account_id AND stat_date < :stat_date
                ORDER BY stat_date DESC LIMIT 1
            """), {'account_id': account_id, 'stat_date': today}).fetchone()

            if previous_stat:
                opening_assets = float(previous_stat[0])
            else:
                # No earlier stats: start from the initial capital.
                opening_assets = float(initial_capital)

            # Daily profit and loss.
            closing_assets = float(total_assets)
            daily_profit = closing_assets - opening_assets
            daily_profit_rate = (daily_profit / opening_assets) * 100 if opening_assets > 0 else 0

            # Cumulative profit and loss, as stored on the account.
            current_total_profit = float(total_profit)
            current_total_profit_rate = float(total_profit_rate)

            # Number of transactions recorded today.
            trade_count = session.execute(text("""
                SELECT COUNT(*) FROM simulation_transactions
                WHERE account_id = :account_id
                  AND DATE(transaction_time) = :stat_date
            """), {'account_id': account_id, 'stat_date': today}).scalar()

            if trade_count:
                win_count, loss_count, max_profit, max_loss = _summarize_sell_trades(
                    session, account_id, today
                )
            else:
                win_count = loss_count = 0
                max_profit = max_loss = 0

            session.execute(text("""
                INSERT INTO simulation_daily_stats
                (account_id, stat_date, opening_assets, closing_assets, daily_profit, daily_profit_rate,
                 total_profit, total_profit_rate, trade_count, win_count, loss_count, max_profit, max_loss, created_at)
                VALUES (:account_id, :stat_date, :opening_assets, :closing_assets, :daily_profit, :daily_profit_rate,
                        :total_profit, :total_profit_rate, :trade_count, :win_count, :loss_count, :max_profit, :max_loss, NOW())
            """), {
                'account_id': account_id,
                'stat_date': today,
                'opening_assets': opening_assets,
                'closing_assets': closing_assets,
                'daily_profit': daily_profit,
                'daily_profit_rate': daily_profit_rate,
                'total_profit': current_total_profit,
                'total_profit_rate': current_total_profit_rate,
                'trade_count': trade_count,
                'win_count': win_count,
                'loss_count': loss_count,
                'max_profit': max_profit,
                'max_loss': max_loss,
            })

            logger.info(f"账户 {account_id} 日统计完成: 期初 {opening_assets:.2f}, 期末 {closing_assets:.2f}, 日盈亏 {daily_profit:.2f}")

        session.commit()
        logger.info(f"日统计数据生成完成,共处理 {len(accounts)} 个账户")

    except Exception as e:
        logger.error(f"生成日统计数据时出错: {e}")
        session.rollback()
    finally:
        session.close()
|
||
|
||
def main():
    """Run the processing loop until interrupted from the keyboard.

    Every pass processes pending orders, refreshes position and account
    valuations (on every pass during trading time, otherwise only when the
    current minute is a multiple of 10), and runs the T+1 settlement plus
    daily statistics once per calendar day outside trading time, either
    from 15:00 onward or at any hour on a weekend. The loop then sleeps
    for 60 seconds; after an unexpected error it sleeps for 30 seconds and
    retries.
    """
    logger.info("模拟盘后台处理器启动")
    last_daily_stats_date = None  # date on which the daily settlement last ran

    while True:
        try:
            current_time = beijing_now()
            logger.info(f"开始处理循环 - {current_time}")

            # 1. Pending orders are checked on every pass.
            process_pending_orders()

            # 2. Valuations: every pass while trading, otherwise only on
            #    minutes that are a multiple of 10.
            if is_trading_time() or current_time.minute % 10 == 0:
                update_all_positions_market_value()
                update_all_accounts_assets()
            sleep_time = 60  # pending orders are still checked every minute

            # 3. T+1 settlement and daily statistics, once per day.
            today = current_time.date()
            settlement_due = (
                last_daily_stats_date != today
                and not is_trading_time()
                and (current_time.hour >= 15 or current_time.weekday() >= 5)
            )
            if settlement_due:
                logger.info("开始执行日结算...")

                # Release frozen shares first, then take the daily snapshot.
                process_t1_settlement()
                generate_daily_stats()

                last_daily_stats_date = today
                logger.info("日结算完成")

            logger.info(f"处理完成,等待 {sleep_time} 秒")
            time.sleep(sleep_time)

        except KeyboardInterrupt:
            logger.info("收到停止信号,正在退出...")
            break
        except Exception as e:
            logger.error(f"主循环出错: {e}")
            time.sleep(30)  # wait 30 seconds before retrying after an error


if __name__ == "__main__":
    main()
|