update pay function

2025-11-28 09:17:44 +08:00
parent bc6e993dec
commit 49656e6e88
2 changed files with 441 additions and 84 deletions

app_vx.py

@@ -1,9 +1,24 @@
# ===================== Gevent Monkey Patch (must be at the very top) =====================
# Detect whether the app runs under gevent/gunicorn; if so, apply the monkey patch
import os
import sys
# Check the environment variable (or an already-loaded gevent module) to decide whether gevent is needed
_USE_GEVENT = os.environ.get('USE_GEVENT', 'false').lower() == 'true'
if _USE_GEVENT or 'gevent' in sys.modules:
try:
from gevent import monkey
monkey.patch_all()
print("✅ Gevent monkey patch 已应用")
except ImportError:
print("⚠️ Gevent 未安装,跳过 monkey patch")
# ===================== Gevent Monkey Patch 结束 =====================
import csv
import logging
import random
import re
import math
import os
import secrets
import string
@@ -74,20 +89,28 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
logger = logging.getLogger(__name__)
# ===================== ClickHouse connection pool implementation =====================
# ===================== ClickHouse connection pool implementation (enhanced) =====================
import threading as _threading
import atexit
class ClickHouseConnectionPool:
"""
ClickHouse connection pool
ClickHouse connection pool (enhanced)
- Supports connection reuse, avoiding frequent create/destroy cycles
- Supports connection timeouts and health checks
- Supports automatic reconnection
- Supports a maximum connection lifetime (guards against zombie connections)
- Supports background cleanup of expired connections
- Supports automatic reconnection and retry
- Thread-safe
"""
def __init__(self, host, port, user, password, database,
pool_size=10, max_overflow=10,
connection_timeout=10, query_timeout=30,
health_check_interval=60):
health_check_interval=60,
max_connection_lifetime=300, # New: maximum connection lifetime (seconds)
cleanup_interval=60, # New: cleanup interval (seconds)
max_retries=3): # New: maximum number of retries
"""
Initialize the connection pool
@@ -102,6 +125,9 @@ class ClickHouseConnectionPool:
connection_timeout: timeout for acquiring a connection (seconds)
query_timeout: query timeout (seconds)
health_check_interval: health check interval (seconds)
max_connection_lifetime: maximum connection lifetime (seconds); older connections are closed and recreated
cleanup_interval: interval of the background cleanup thread (seconds)
max_retries: maximum number of retries when a query fails
"""
self.host = host
self.port = port
@@ -113,6 +139,9 @@ class ClickHouseConnectionPool:
self.connection_timeout = connection_timeout
self.query_timeout = query_timeout
self.health_check_interval = health_check_interval
self.max_connection_lifetime = max_connection_lifetime
self.cleanup_interval = cleanup_interval
self.max_retries = max_retries
# Connection pool queue
self._pool = Queue(maxsize=pool_size + max_overflow)
@@ -124,25 +153,108 @@ class ClickHouseConnectionPool:
self._last_used = {}
# Records connection creation times
self._created_at = {}
# Whether the pool has been closed
self._closed = False
# Cleanup thread
self._cleanup_thread = None
self._cleanup_stop_event = _threading.Event()
# Initialize core connections
self._init_pool()
logger.info(f"ClickHouse connection pool initialized: pool_size={pool_size}, max_overflow={max_overflow}")
# Core connections are initialized lazily, on first use
self._initialized = False
logger.info(f"ClickHouse connection pool configured: pool_size={pool_size}, max_overflow={max_overflow}, "
f"max_lifetime={max_connection_lifetime}s")
# Start the background cleanup thread
self._start_cleanup_thread()
# Register cleanup on process exit
atexit.register(self.close_all)
def _start_cleanup_thread(self):
"""启动后台清理线程"""
def cleanup_worker():
while not self._cleanup_stop_event.wait(self.cleanup_interval):
if self._closed:
break
try:
self._cleanup_expired_connections()
except Exception as e:
logger.error(f"Error while cleaning up connections: {e}")
self._cleanup_thread = _threading.Thread(target=cleanup_worker, daemon=True)
self._cleanup_thread.start()
logger.debug("Background cleanup thread started")
def _cleanup_expired_connections(self):
"""清理过期连接"""
current_time = time.time()
cleaned_count = 0
# Temporary list for connections that should be kept
valid_connections = []
# Drain the pool and inspect every connection
while True:
try:
conn = self._pool.get_nowait()
conn_id = id(conn)
created_at = self._created_at.get(conn_id, current_time)
last_used = self._last_used.get(conn_id, current_time)
# Check whether the connection has expired
lifetime = current_time - created_at
idle_time = current_time - last_used
if lifetime > self.max_connection_lifetime:
# Connection has lived too long, close it
logger.debug(f"Connection {conn_id} has been alive for {lifetime:.0f}s, exceeding the limit; closing")
self._close_connection(conn)
cleaned_count += 1
elif idle_time > self.health_check_interval * 3:
# Idle for a long time, run a health check
if not self._check_connection_health(conn):
self._close_connection(conn)
cleaned_count += 1
else:
valid_connections.append(conn)
else:
valid_connections.append(conn)
except Empty:
break
# Put the valid connections back into the pool
for conn in valid_connections:
try:
self._pool.put_nowait(conn)
except Full:
self._close_connection(conn)
if cleaned_count > 0:
logger.info(f"Cleaned up {cleaned_count} expired connections, current active connections: {self._active_connections}")
def _init_pool(self):
"""初始化连接池,预创建部分核心连接(非阻塞)"""
# 只预创建 2 个连接,其余按需创建
init_count = min(2, self.pool_size)
for i in range(init_count):
try:
conn = self._create_connection()
if conn:
self._pool.put(conn)
logger.info(f"Pre-created ClickHouse connection {i+1}/{init_count} successfully")
except Exception as e:
logger.warning(f"Failed to pre-create ClickHouse connection ({i+1}/{init_count}): {e}")
# A pre-creation failure does not block startup; connections are created on demand later
break
if self._initialized:
return
with self._lock:
if self._initialized:
return
# Pre-create only 1 connection; the rest are created on demand
init_count = min(1, self.pool_size)
for i in range(init_count):
try:
conn = self._create_connection()
if conn:
self._pool.put(conn)
logger.info(f"Pre-created ClickHouse connection {i+1}/{init_count} successfully")
except Exception as e:
logger.warning(f"Failed to pre-create ClickHouse connection ({i+1}/{init_count}): {e}")
# A pre-creation failure does not block startup; connections are created on demand later
break
self._initialized = True
def _create_connection(self):
"""创建新的 ClickHouse 连接"""
@@ -162,8 +274,9 @@ class ClickHouseConnectionPool:
}
)
conn_id = id(client)
self._created_at[conn_id] = time.time()
self._last_used[conn_id] = time.time()
current_time = time.time()
self._created_at[conn_id] = current_time
self._last_used[conn_id] = current_time
with self._lock:
self._active_connections += 1
logger.debug(f"Created new ClickHouse connection: {conn_id}")
@@ -173,15 +286,23 @@ class ClickHouseConnectionPool:
raise
def _check_connection_health(self, conn):
"""检查连接健康状态"""
"""检查连接健康状态(带超时保护)"""
try:
conn_id = id(conn)
last_used = self._last_used.get(conn_id, 0)
created_at = self._created_at.get(conn_id, 0)
current_time = time.time()
# Check whether the connection has exceeded its maximum lifetime
if current_time - created_at > self.max_connection_lifetime:
logger.debug(f"Connection {conn_id} exceeded its maximum lifetime, marking as unhealthy")
return False
# If the connection has been unused for a while, run a health check
if time.time() - last_used > self.health_check_interval:
if current_time - last_used > self.health_check_interval:
# Run a trivial query to verify the connection
conn.execute("SELECT 1")
self._last_used[conn_id] = current_time
logger.debug(f"Connection {conn_id} passed the health check")
return True
@@ -191,9 +312,14 @@ class ClickHouseConnectionPool:
def _close_connection(self, conn):
"""关闭连接"""
if conn is None:
return
try:
conn_id = id(conn)
conn.disconnect()
try:
conn.disconnect()
except:
pass # Ignore errors while disconnecting
self._last_used.pop(conn_id, None)
self._created_at.pop(conn_id, None)
with self._lock:
@@ -216,34 +342,48 @@ class ClickHouseConnectionPool:
TimeoutError: timed out while acquiring a connection
Exception: failed to create a connection
"""
if self._closed:
raise RuntimeError("Connection pool is closed")
# Lazy initialization
if not self._initialized:
self._init_pool()
timeout = timeout or self.connection_timeout
start_time = time.time()
# First, try to get a connection from the pool
try:
conn = self._pool.get(block=True, timeout=timeout)
while True:
elapsed = time.time() - start_time
if elapsed >= timeout:
raise TimeoutError(f"Timed out acquiring a ClickHouse connection (timeout={timeout}s)")
# Check connection health
if self._check_connection_health(conn):
self._last_used[id(conn)] = time.time()
return conn
else:
# Connection is unhealthy: close it and create a new one
self._close_connection(conn)
return self._create_connection()
remaining_timeout = timeout - elapsed
except Empty:
# No connection available in the pool; check whether a new one can be created
with self._lock:
if self._active_connections < self.pool_size + self.max_overflow:
try:
return self._create_connection()
except Exception as e:
logger.error(f"Failed to create overflow connection: {e}")
raise
# First, try to get a connection from the pool
try:
conn = self._pool.get(block=True, timeout=min(remaining_timeout, 1.0))
# Maximum number of connections reached, wait for one to be released
logger.warning(f"Connection pool is full, waiting for a connection to be released... (current connections: {self._active_connections})")
raise TimeoutError(f"Timed out acquiring a ClickHouse connection (timeout={timeout}s)")
# Check connection health
if self._check_connection_health(conn):
self._last_used[id(conn)] = time.time()
return conn
else:
# Connection is unhealthy: close it and try to obtain another one
self._close_connection(conn)
continue
except Empty:
# No connection available in the pool; check whether a new one can be created
with self._lock:
if self._active_connections < self.pool_size + self.max_overflow:
try:
return self._create_connection()
except Exception as e:
logger.error(f"Failed to create overflow connection: {e}")
# Do not raise immediately; keep waiting
# Retry after a short pause
time.sleep(0.1)
def release_connection(self, conn):
"""
@@ -255,7 +395,19 @@ class ClickHouseConnectionPool:
if conn is None:
return
if self._closed:
self._close_connection(conn)
return
conn_id = id(conn)
created_at = self._created_at.get(conn_id, 0)
# If the connection has exceeded its maximum lifetime, close it instead of returning it to the pool
if time.time() - created_at > self.max_connection_lifetime:
logger.debug(f"Connection {conn_id} exceeded its maximum lifetime; closing instead of returning it to the pool")
self._close_connection(conn)
return
self._last_used[conn_id] = time.time()
try:
@@ -296,7 +448,7 @@ class ClickHouseConnectionPool:
def execute(self, query, params=None, timeout=None):
"""
Execute a query (connection managed automatically)
Execute a query (connection managed automatically, with retries)
Args:
query: SQL query statement
@@ -306,8 +458,21 @@ class ClickHouseConnectionPool:
Returns:
Query result
"""
with self.connection(timeout) as conn:
return conn.execute(query, params)
last_error = None
for retry in range(self.max_retries):
try:
with self.connection(timeout) as conn:
return conn.execute(query, params)
except (TimeoutError, RuntimeError) as e:
# These errors should not be retried
raise
except Exception as e:
last_error = e
logger.warning(f"Query failed (retry {retry + 1}/{self.max_retries}): {e}")
if retry < self.max_retries - 1:
time.sleep(0.5 * (retry + 1)) # Back off with an increasing wait
raise last_error
def get_pool_status(self):
"""获取连接池状态"""
@@ -316,11 +481,22 @@ class ClickHouseConnectionPool:
'max_overflow': self.max_overflow,
'active_connections': self._active_connections,
'available_connections': self._pool.qsize(),
'max_connections': self.pool_size + self.max_overflow
'max_connections': self.pool_size + self.max_overflow,
'max_connection_lifetime': self.max_connection_lifetime,
'initialized': self._initialized,
'closed': self._closed
}
def close_all(self):
"""关闭所有连接"""
self._closed = True
self._cleanup_stop_event.set() # 停止清理线程
# 等待清理线程结束
if self._cleanup_thread and self._cleanup_thread.is_alive():
self._cleanup_thread.join(timeout=2)
# 关闭所有池中的连接
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
@@ -347,11 +523,14 @@ def _init_clickhouse_pool():
user='default',
password='Zzl33818!',
database='stock',
pool_size=5, # Fewer pre-created connections
max_overflow=20, # More overflow connections, 25 concurrent in total
connection_timeout=10, # Connection timeout: 10 seconds
query_timeout=30, # Query timeout: 30 seconds
health_check_interval=60 # Health-check connections unused for 60 seconds
pool_size=5, # Core connections
max_overflow=20, # Overflow connections, 25 concurrent in total
connection_timeout=15, # Connection timeout: 15 seconds (more tolerance)
query_timeout=60, # Query timeout: 60 seconds (more time for complex queries)
health_check_interval=30, # Health check every 30 seconds
max_connection_lifetime=300, # Maximum connection lifetime: 5 minutes (guards against zombie connections)
cleanup_interval=60, # Clean up expired connections every 60 seconds
max_retries=3 # Retry a failed query up to 3 times
)
return clickhouse_pool
# ===================== End of ClickHouse connection pool implementation =====================
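For reference, a minimal usage sketch of the pool API defined above, assuming the instance returned by _init_clickhouse_pool() and the clickhouse_driver client it wraps; the queries shown are illustrative only:

# Usage sketch (illustrative)
pool = _init_clickhouse_pool()

# Simple query: execute() acquires a connection, retries transient failures,
# and returns the connection to the pool afterwards.
rows = pool.execute("SELECT 1")

# Parameterized query; params are passed straight through to clickhouse_driver's execute().
rows = pool.execute("SELECT %(x)s", params={"x": 1})

# Manual connection handling via the same context manager that execute() uses internally.
with pool.connection(timeout=15) as conn:
    conn.execute("SELECT 1")

# Pool introspection, e.g. for logging or a health check.
logger.info(pool.get_pool_status())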
@@ -6427,18 +6606,61 @@ def before_request_init():
ensure_sywg_cache_initialized()
# ===================== Application startup configuration =====================
# In production, starting via Gunicorn is recommended (see the commands below)
#
# [Option 1] Gunicorn + gevent (recommended):
# USE_GEVENT=true gunicorn -w 4 -k gevent --worker-connections 1000 \
# -b 0.0.0.0:5002 --timeout 120 --graceful-timeout 30 \
# --certfile=/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem \
# --keyfile=/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem \
# app_vx:app
#
# [Option 2] Gunicorn with multiple processes (no gevent):
# gunicorn -w 4 -b 0.0.0.0:5002 --timeout 120 --graceful-timeout 30 \
# --certfile=/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem \
# --keyfile=/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem \
# app_vx:app
#
# [Option 3] Run directly in development (local debugging only):
# python app_vx.py
#
# ===================== End of application startup configuration =====================
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Start the Flask application')
parser.add_argument('--debug', action='store_true', help='Enable debug mode (development only)')
parser.add_argument('--port', type=int, default=5002, help='Port to listen on (default: 5002)')
parser.add_argument('--no-ssl', action='store_true', help='Disable SSL (development only)')
args = parser.parse_args()
# When run directly, initialize the cache immediately
with app.app_context():
init_sywg_industry_cache()
_sywg_cache_initialized = True
# Warning: debug=True must not be used in production
if args.debug:
logger.warning("⚠️ Debug mode enabled; use in development environments only!")
# SSL configuration
ssl_context = None
if not args.no_ssl:
cert_file = '/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem'
key_file = '/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem'
if os.path.exists(cert_file) and os.path.exists(key_file):
ssl_context = (cert_file, key_file)
else:
logger.warning("⚠️ SSL certificate files not found, falling back to HTTP mode")
logger.info(f"🚀 Starting Flask application: port={args.port}, debug={args.debug}, ssl={'enabled' if ssl_context else 'disabled'}")
app.run(
host='0.0.0.0',
port=5002,
debug=True,
ssl_context=(
'/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem',
'/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem'
)
port=args.port,
debug=args.debug,
ssl_context=ssl_context,
threaded=True # Handle requests with multiple threads
)
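As a hedged illustration of the new status fields (initialized, closed, max_connection_lifetime), the pool status could be exposed through the Flask app roughly as follows; the route name and handler are hypothetical and not confirmed by this diff:

# Hypothetical monitoring endpoint (route name is illustrative)
from flask import jsonify

@app.route('/internal/clickhouse-pool-status')
def clickhouse_pool_status():
    pool = _init_clickhouse_pool()          # returns the shared pool instance
    return jsonify(pool.get_pool_status())  # active/available connections, lifetime, flags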

gunicorn_config.py

@@ -1,33 +1,102 @@
# Gunicorn configuration file
# -*- coding: utf-8 -*-
"""
Gunicorn configuration file - production configuration for app_vx.py
# Basic configuration
bind = "0.0.0.0:5002"
workers = 4
threads = 4
Usage:
# Option 1: gevent async mode (recommended, supports high concurrency)
USE_GEVENT=true gunicorn -c gunicorn_config.py app_vx:app
# Option 2: synchronous multi-process mode
gunicorn -c gunicorn_config.py app_vx:app
# Option 3: manage with systemd (see the systemd example at the end of this file)
"""
import os
import multiprocessing
# ==================== Basic configuration ====================
# Bind address and port
bind = '0.0.0.0:5002'
# Number of worker processes (recommended: CPU cores * 2 + 1)
workers = min(multiprocessing.cpu_count() * 2 + 1, 8) # At most 8 workers
# Worker type
# - 'sync': synchronous mode (default)
# - 'gevent': asynchronous mode (recommended for I/O-bound applications)
# - 'gthread': multi-threaded mode
if os.environ.get('USE_GEVENT', 'false').lower() == 'true':
worker_class = 'gevent'
worker_connections = 1000 # Maximum concurrent connections per worker in gevent mode
else:
worker_class = 'gthread'
threads = 4 # Number of threads per worker in gthread mode
# Maximum requests a worker handles before being restarted (guards against memory leaks)
max_requests = 10000
max_requests_jitter = 1000 # Random jitter so workers do not all restart at once
# ==================== Timeout configuration ====================
# Worker timeout (seconds); a worker exceeding it is killed and restarted
timeout = 120
worker_class = "gthread"
# SSL configuration (if needed)
# certfile = "/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem"
# keyfile = "/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem"
# Graceful shutdown timeout (seconds)
graceful_timeout = 30
# Logging configuration
loglevel = "info"
accesslog = "-"
errorlog = "-"
# Keep-alive timeout (seconds)
keepalive = 5
# Preload the application (load before fork to speed up worker startup)
# ==================== SSL configuration ====================
# SSL certificate paths (required in production)
cert_file = '/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem'
key_file = '/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem'
if os.path.exists(cert_file) and os.path.exists(key_file):
certfile = cert_file
keyfile = key_file
# ==================== Logging configuration ====================
# Access log path ('-' means stdout)
accesslog = '-'
# Error log path ('-' means stderr)
errorlog = '-'
# Log level (debug, info, warning, error, critical)
loglevel = 'info'
# Access log format
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# ==================== Process management ====================
# Whether to run in the background (daemon mode)
daemon = False
# PID file path
pidfile = '/tmp/gunicorn_app_vx.pid'
# Process name
proc_name = 'app_vx'
# ==================== Preload configuration ====================
# Whether to preload the application code (reduces memory usage but increases startup time)
preload_app = True
# ==================== Hook functions ====================
def on_starting(server):
"""Called when the master process starts"""
print("Gunicorn master process starting...")
def post_fork(server, worker):
"""Called after a worker process is forked"""
print(f"Worker {worker.pid} started")
"""Called when the server starts"""
print("Gunicorn server starting...")
print(f" Workers: {server.app.cfg.workers}")
print(f" Worker Class: {server.app.cfg.worker_class}")
print(f" Bind: {server.app.cfg.bind}")
def when_ready(server):
@@ -37,3 +106,69 @@ def when_ready(server):
with app.app_context():
init_sywg_industry_cache()
print("初始化完成!")
def on_reload(server):
"""服务器重载时调用"""
print("Gunicorn 服务器正在重载...")
def worker_int(worker):
"""Worker 收到 INT 或 QUIT 信号时调用"""
print(f"Worker {worker.pid} 收到中断信号")
def worker_abort(worker):
"""Worker 收到 SIGABRT 信号时调用(超时)"""
print(f"Worker {worker.pid} 超时被终止")
def post_fork(server, worker):
"""Worker 进程 fork 之后调用"""
print(f"Worker {worker.pid} 已启动")
def worker_exit(server, worker):
"""Worker 退出时调用"""
print(f"Worker {worker.pid} 已退出")
def on_exit(server):
"""服务器退出时调用"""
print("Gunicorn 服务器已关闭")
# ==================== systemd configuration example ====================
"""
Save the following as /etc/systemd/system/app_vx.service:
[Unit]
Description=Gunicorn instance to serve app_vx
After=network.target
[Service]
User=www-data
Group=www-data
WorkingDirectory=/path/to/vf_react
Environment="PATH=/path/to/venv/bin"
Environment="USE_GEVENT=true"
ExecStart=/path/to/venv/bin/gunicorn -c gunicorn_config.py app_vx:app
ExecReload=/bin/kill -s HUP $MAINPID
KillMode=mixed
TimeoutStopSec=5
PrivateTmp=true
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Enable the service:
sudo systemctl daemon-reload
sudo systemctl enable app_vx
sudo systemctl start app_vx
sudo systemctl status app_vx
View logs:
sudo journalctl -u app_vx -f
"""