Initial commit

This commit is contained in:
2025-10-11 11:55:25 +08:00
parent 467dad8449
commit 8107dee8d3
2879 changed files with 610575 additions and 0 deletions

View File

@@ -0,0 +1,605 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模拟盘后台处理脚本
负责处理待成交订单、更新持仓收益等后台计算任务
与主应用分离,可以独立运行
"""
import os
import sys
import time
import logging
from datetime import datetime, timedelta, time as dt_time
from decimal import Decimal
from typing import List, Optional
import pytz
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入主应用的数据库连接和模型
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from clickhouse_driver import Client as ClickhouseClient
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('simulation_processor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 数据库配置
DATABASE_URL = "mysql+pymysql://root:Zzl5588161!@localhost:3306/stock?charset=utf8mb4"
CLICKHOUSE_HOST = "192.168.1.58"
CLICKHOUSE_PORT = 9000
# 创建数据库连接
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine)
# ClickHouse客户端
clickhouse_client = ClickhouseClient(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT,user='default',
password='Zzl33818!',
database='stock')
# 北京时区
beijing_tz = pytz.timezone('Asia/Shanghai')
def beijing_now():
"""获取北京时间"""
return datetime.now(beijing_tz)
def is_trading_time():
"""判断是否为交易时间"""
now = beijing_now()
if now.weekday() >= 5: # 周六日
return False
current_time = now.time()
morning_start = dt_time(9, 30)
morning_end = dt_time(11, 30)
afternoon_start = dt_time(13, 0)
afternoon_end = dt_time(15, 0)
return (morning_start <= current_time <= morning_end) or \
(afternoon_start <= current_time <= afternoon_end)
def get_latest_price_from_clickhouse(stock_code: str) -> Optional[float]:
"""从ClickHouse获取最新价格"""
try:
# 确保股票代码包含后缀
if '.' not in stock_code:
stock_code = f"{stock_code}.SH" if stock_code.startswith('6') else f"{stock_code}.SZ"
query = """
SELECT close, timestamp
FROM stock_minute
WHERE code = %(code)s
AND timestamp >= today() - 7
ORDER BY timestamp DESC
LIMIT 1
"""
result = clickhouse_client.execute(query, {'code': stock_code})
if result:
return float(result[0][0])
# 如果没有分钟数据,尝试获取日线数据
daily_query = """
SELECT close
FROM stock_daily
WHERE code = %(code)s
ORDER BY date DESC
LIMIT 1
"""
daily_result = clickhouse_client.execute(daily_query, {'code': stock_code})
if daily_result:
return float(daily_result[0][0])
return None
except Exception as e:
logger.error(f"获取股票 {stock_code} 价格失败: {e}")
return None
def process_pending_orders():
"""处理待成交订单"""
session = SessionLocal()
try:
# 查询所有待成交的订单
pending_orders = session.execute(text("""
SELECT id, order_no, stock_code, stock_name, order_type, order_qty,
order_time, account_id
FROM simulation_orders
WHERE status = 'PENDING'
AND order_time <= NOW() - INTERVAL 1 MINUTE
ORDER BY order_time ASC
""")).fetchall()
logger.info(f"找到 {len(pending_orders)} 个待处理订单")
for order_data in pending_orders:
try:
order_id = order_data[0]
stock_code = order_data[2]
order_type = order_data[4]
order_qty = order_data[5]
account_id = order_data[7]
# 获取最新价格
latest_price = get_latest_price_from_clickhouse(stock_code)
if latest_price:
# 有价格数据,执行成交
filled_amount = latest_price * order_qty
# 计算费用
commission = max(filled_amount * 0.00025, 5.0) # 万分之2.5最低5元
stamp_tax = filled_amount * 0.001 if order_type == 'SELL' else 0 # 卖出印花税
transfer_fee = filled_amount * 0.00002 # 万分之0.2
total_fee = commission + stamp_tax + transfer_fee
# 更新订单状态
session.execute(text("""
UPDATE simulation_orders
SET status = 'FILLED',
filled_qty = :filled_qty,
filled_price = :filled_price,
filled_amount = :filled_amount,
commission = :commission,
stamp_tax = :stamp_tax,
transfer_fee = :transfer_fee,
total_fee = :total_fee,
filled_time = NOW()
WHERE id = :order_id
"""), {
'order_id': order_id,
'filled_qty': order_qty,
'filled_price': latest_price,
'filled_amount': filled_amount,
'commission': commission,
'stamp_tax': stamp_tax,
'transfer_fee': transfer_fee,
'total_fee': total_fee
})
# 处理账户和持仓更新
if order_type == 'BUY':
process_buy_order_settlement(session, order_data, latest_price, total_fee)
else:
process_sell_order_settlement(session, order_data, latest_price, total_fee)
logger.info(f"订单 {order_data[1]} 成交,价格: {latest_price}")
else:
logger.warning(f"订单 {order_data[1]} 无法获取价格数据,继续等待")
except Exception as e:
logger.error(f"处理订单 {order_data[1]} 时出错: {e}")
session.rollback()
continue
session.commit()
except Exception as e:
logger.error(f"处理待成交订单时出错: {e}")
session.rollback()
finally:
session.close()
def process_buy_order_settlement(session, order_data, price: float, total_fee: float):
"""处理买入订单结算"""
account_id = order_data[7]
stock_code = order_data[2]
stock_name = order_data[3]
order_qty = order_data[5]
total_cost = price * order_qty + total_fee
# 扣除账户资金
session.execute(text("""
UPDATE simulation_accounts
SET available_cash = available_cash - :total_cost
WHERE id = :account_id
"""), {'total_cost': total_cost, 'account_id': account_id})
# 更新或创建持仓
existing_position = session.execute(text("""
SELECT id, position_qty, avg_cost FROM simulation_positions
WHERE account_id = :account_id AND stock_code = :stock_code
"""), {'account_id': account_id, 'stock_code': stock_code}).fetchone()
if existing_position:
# 更新现有持仓
old_qty = existing_position[1]
old_cost = float(existing_position[2])
new_qty = old_qty + order_qty
new_avg_cost = (old_qty * old_cost + total_cost) / new_qty
session.execute(text("""
UPDATE simulation_positions
SET position_qty = :new_qty,
available_qty = :new_qty,
avg_cost = :new_avg_cost,
updated_at = NOW()
WHERE id = :position_id
"""), {
'position_id': existing_position[0],
'new_qty': new_qty,
'new_avg_cost': new_avg_cost
})
else:
# 创建新持仓
session.execute(text("""
INSERT INTO simulation_positions
(account_id, stock_code, stock_name, position_qty, available_qty, avg_cost,
current_price, market_value, profit, profit_rate, created_at, updated_at)
VALUES (:account_id, :stock_code, :stock_name, :qty, :qty, :avg_cost,
:price, :market_value, 0, 0, NOW(), NOW())
"""), {
'account_id': account_id,
'stock_code': stock_code,
'stock_name': stock_name,
'qty': order_qty,
'avg_cost': total_cost / order_qty,
'price': price,
'market_value': price * order_qty
})
def process_sell_order_settlement(session, order_data, price: float, total_fee: float):
"""处理卖出订单结算"""
account_id = order_data[7]
stock_code = order_data[2]
order_qty = order_data[5]
sell_amount = price * order_qty
net_amount = sell_amount - total_fee
# 增加账户资金
session.execute(text("""
UPDATE simulation_accounts
SET available_cash = available_cash + :net_amount
WHERE id = :account_id
"""), {'net_amount': net_amount, 'account_id': account_id})
# 更新持仓
session.execute(text("""
UPDATE simulation_positions
SET position_qty = position_qty - :qty,
available_qty = available_qty - :qty,
updated_at = NOW()
WHERE account_id = :account_id AND stock_code = :stock_code
"""), {'qty': order_qty, 'account_id': account_id, 'stock_code': stock_code})
# 删除清仓的持仓
session.execute(text("""
DELETE FROM simulation_positions
WHERE account_id = :account_id AND stock_code = :stock_code AND position_qty <= 0
"""), {'account_id': account_id, 'stock_code': stock_code})
def update_all_positions_market_value():
"""更新所有持仓的市值和收益"""
session = SessionLocal()
try:
positions = session.execute(text("""
SELECT id, stock_code, position_qty, avg_cost
FROM simulation_positions
WHERE position_qty > 0
""")).fetchall()
logger.info(f"更新 {len(positions)} 个持仓的市值")
for position in positions:
position_id, stock_code, qty, avg_cost = position
# 获取最新价格
latest_price = get_latest_price_from_clickhouse(stock_code)
if latest_price:
market_value = latest_price * qty
cost_value = float(avg_cost) * qty
profit = market_value - cost_value
profit_rate = (profit / cost_value) * 100 if cost_value > 0 else 0
session.execute(text("""
UPDATE simulation_positions
SET current_price = :price,
market_value = :market_value,
profit = :profit,
profit_rate = :profit_rate,
updated_at = NOW()
WHERE id = :position_id
"""), {
'position_id': position_id,
'price': latest_price,
'market_value': market_value,
'profit': profit,
'profit_rate': profit_rate
})
session.commit()
logger.info("持仓市值更新完成")
except Exception as e:
logger.error(f"更新持仓市值时出错: {e}")
session.rollback()
finally:
session.close()
def update_all_accounts_assets():
"""更新所有账户的总资产"""
session = SessionLocal()
try:
# 获取所有账户
accounts = session.execute(text("""
SELECT id, initial_capital, available_cash, frozen_cash
FROM simulation_accounts
""")).fetchall()
for account in accounts:
account_id, initial_capital, available_cash, frozen_cash = account
# 计算持仓总市值
total_position_value = session.execute(text("""
SELECT COALESCE(SUM(market_value), 0)
FROM simulation_positions
WHERE account_id = :account_id
"""), {'account_id': account_id}).scalar()
# 计算总资产和收益
total_assets = float(available_cash) + float(frozen_cash) + float(total_position_value or 0)
total_profit = total_assets - float(initial_capital)
total_profit_rate = (total_profit / float(initial_capital)) * 100 if initial_capital > 0 else 0
# 更新账户
session.execute(text("""
UPDATE simulation_accounts
SET position_value = :position_value,
total_assets = :total_assets,
total_profit = :total_profit,
total_profit_rate = :total_profit_rate,
updated_at = NOW()
WHERE id = :account_id
"""), {
'account_id': account_id,
'position_value': total_position_value or 0,
'total_assets': total_assets,
'total_profit': total_profit,
'total_profit_rate': total_profit_rate
})
session.commit()
logger.info(f"更新了 {len(accounts)} 个账户的资产")
except Exception as e:
logger.error(f"更新账户资产时出错: {e}")
session.rollback()
finally:
session.close()
def process_t1_settlement():
"""处理T+1结算每日收盘后运行"""
session = SessionLocal()
try:
# 获取所有需要结算的持仓
positions = session.execute(text("""
SELECT id, account_id, stock_code, frozen_qty
FROM simulation_positions
WHERE frozen_qty > 0
""")).fetchall()
logger.info(f"开始T+1结算{len(positions)} 个持仓需要结算")
for position in positions:
position_id, account_id, stock_code, frozen_qty = position
# 将冻结数量转为可用数量
session.execute(text("""
UPDATE simulation_positions
SET available_qty = available_qty + :frozen_qty,
frozen_qty = 0,
updated_at = NOW()
WHERE id = :position_id
"""), {'frozen_qty': frozen_qty, 'position_id': position_id})
logger.info(f"持仓 {stock_code} T+1结算完成: {frozen_qty} 股从冻结转为可用")
session.commit()
logger.info(f"T+1结算完成共处理 {len(positions)} 个持仓")
except Exception as e:
logger.error(f"T+1结算失败: {e}")
session.rollback()
finally:
session.close()
def generate_daily_stats():
"""生成日统计数据(每日收盘后调用)"""
session = SessionLocal()
try:
today = beijing_now().date()
# 获取所有账户
accounts = session.execute(text("""
SELECT id, initial_capital, available_cash, frozen_cash, position_value, total_assets, total_profit, total_profit_rate
FROM simulation_accounts
""")).fetchall()
logger.info(f"开始生成 {today} 的日统计数据,共 {len(accounts)} 个账户")
for account in accounts:
account_id, initial_capital, available_cash, frozen_cash, position_value, total_assets, total_profit, total_profit_rate = account
# 检查是否已存在今日统计
existing_stat = session.execute(text("""
SELECT id FROM simulation_daily_stats
WHERE account_id = :account_id AND stat_date = :stat_date
"""), {'account_id': account_id, 'stat_date': today}).fetchone()
if existing_stat:
logger.info(f"账户 {account_id}{today} 统计数据已存在,跳过")
continue
# 获取昨日统计数据作为期初资产
yesterday = today - timedelta(days=1)
yesterday_stat = session.execute(text("""
SELECT closing_assets, total_profit, total_profit_rate
FROM simulation_daily_stats
WHERE account_id = :account_id AND stat_date = :stat_date
ORDER BY stat_date DESC LIMIT 1
"""), {'account_id': account_id, 'stat_date': yesterday}).fetchone()
if yesterday_stat:
opening_assets = float(yesterday_stat[0])
yesterday_total_profit = float(yesterday_stat[1])
yesterday_total_profit_rate = float(yesterday_stat[2])
else:
# 如果没有昨日数据,使用初始资金作为期初资产
opening_assets = float(initial_capital)
yesterday_total_profit = 0
yesterday_total_profit_rate = 0
# 计算日盈亏
closing_assets = float(total_assets)
daily_profit = closing_assets - opening_assets
daily_profit_rate = (daily_profit / opening_assets) * 100 if opening_assets > 0 else 0
# 计算累计盈亏
current_total_profit = float(total_profit)
current_total_profit_rate = float(total_profit_rate)
# 统计当日交易次数
trade_count = session.execute(text("""
SELECT COUNT(*) FROM simulation_transactions
WHERE account_id = :account_id
AND DATE(transaction_time) = :stat_date
"""), {'account_id': account_id, 'stat_date': today}).scalar()
# 统计当日盈利和亏损次数
win_count = 0
loss_count = 0
max_profit = 0
max_loss = 0
if trade_count > 0:
# 获取当日所有卖出交易
sell_transactions = session.execute(text("""
SELECT st.transaction_price, st.transaction_qty, st.stock_code
FROM simulation_transactions st
WHERE st.account_id = :account_id
AND st.transaction_type = 'SELL'
AND DATE(st.transaction_time) = :stat_date
"""), {'account_id': account_id, 'stat_date': today}).fetchall()
for sell_trans in sell_transactions:
sell_price, sell_qty, stock_code = sell_trans
# 查找对应的持仓成本
position = session.execute(text("""
SELECT avg_cost FROM simulation_positions
WHERE account_id = :account_id AND stock_code = :stock_code
"""), {'account_id': account_id, 'stock_code': stock_code}).fetchone()
if position:
avg_cost = float(position[0])
profit = (sell_price - avg_cost) * sell_qty
if profit > 0:
win_count += 1
max_profit = max(max_profit, profit)
else:
loss_count += 1
max_loss = min(max_loss, profit)
# 插入日统计数据
session.execute(text("""
INSERT INTO simulation_daily_stats
(account_id, stat_date, opening_assets, closing_assets, daily_profit, daily_profit_rate,
total_profit, total_profit_rate, trade_count, win_count, loss_count, max_profit, max_loss, created_at)
VALUES (:account_id, :stat_date, :opening_assets, :closing_assets, :daily_profit, :daily_profit_rate,
:total_profit, :total_profit_rate, :trade_count, :win_count, :loss_count, :max_profit, :max_loss, NOW())
"""), {
'account_id': account_id,
'stat_date': today,
'opening_assets': opening_assets,
'closing_assets': closing_assets,
'daily_profit': daily_profit,
'daily_profit_rate': daily_profit_rate,
'total_profit': current_total_profit,
'total_profit_rate': current_total_profit_rate,
'trade_count': trade_count,
'win_count': win_count,
'loss_count': loss_count,
'max_profit': max_profit,
'max_loss': max_loss
})
logger.info(f"账户 {account_id} 日统计完成: 期初 {opening_assets:.2f}, 期末 {closing_assets:.2f}, 日盈亏 {daily_profit:.2f}")
session.commit()
logger.info(f"日统计数据生成完成,共处理 {len(accounts)} 个账户")
except Exception as e:
logger.error(f"生成日统计数据时出错: {e}")
session.rollback()
finally:
session.close()
def main():
"""主循环"""
logger.info("模拟盘后台处理器启动")
last_daily_stats_date = None # 记录上次生成日统计的日期
while True:
try:
current_time = beijing_now()
logger.info(f"开始处理循环 - {current_time}")
# 1. 处理待成交订单(每分钟检查)
process_pending_orders()
# 2. 更新持仓市值交易时间每分钟更新非交易时间每10分钟更新
if is_trading_time():
update_all_positions_market_value()
update_all_accounts_assets()
sleep_time = 60 # 交易时间每分钟更新
else:
# 非交易时间每10分钟更新一次
if current_time.minute % 10 == 0:
update_all_positions_market_value()
update_all_accounts_assets()
sleep_time = 60 # 但仍然每分钟检查待成交订单
# 3. T+1结算和日结算逻辑每日收盘后执行一次
today = current_time.date()
if last_daily_stats_date != today:
# 检查是否已过收盘时间15:00或者是非交易日
if not is_trading_time() and (current_time.hour >= 15 or current_time.weekday() >= 5):
logger.info("开始执行日结算...")
# 先执行T+1结算
process_t1_settlement()
# 再生成日统计数据
generate_daily_stats()
last_daily_stats_date = today
logger.info("日结算完成")
logger.info(f"处理完成,等待 {sleep_time}")
time.sleep(sleep_time)
except KeyboardInterrupt:
logger.info("收到停止信号,正在退出...")
break
except Exception as e:
logger.error(f"主循环出错: {e}")
time.sleep(30) # 出错后等待30秒再重试
if __name__ == "__main__":
main()