From 007de2d76d7b67e3a986f7a100295b2dc63f6bff Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Fri, 28 Nov 2025 09:45:36 +0800 Subject: [PATCH] update pay function --- app_vx.py | 40 +++++++++++++++++++++++++--------------- gunicorn_config.py | 40 ++++++++++++++++------------------------ 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/app_vx.py b/app_vx.py index abeedc03..027cc618 100644 --- a/app_vx.py +++ b/app_vx.py @@ -161,29 +161,36 @@ class ClickHouseConnectionPool: # 初始化核心连接(延迟初始化,首次使用时创建) self._initialized = False + # 清理线程也延迟启动(避免 fork 前启动线程导致问题) + self._cleanup_thread_started = False logger.info(f"ClickHouse 连接池配置完成: pool_size={pool_size}, max_overflow={max_overflow}, " f"max_lifetime={max_connection_lifetime}s") - # 启动后台清理线程 - self._start_cleanup_thread() - # 注册退出时清理 atexit.register(self.close_all) def _start_cleanup_thread(self): - """启动后台清理线程""" - def cleanup_worker(): - while not self._cleanup_stop_event.wait(self.cleanup_interval): - if self._closed: - break - try: - self._cleanup_expired_connections() - except Exception as e: - logger.error(f"清理连接时出错: {e}") + """启动后台清理线程(延迟启动,首次使用连接池时调用)""" + if self._cleanup_thread_started or self._closed: + return - self._cleanup_thread = _threading.Thread(target=cleanup_worker, daemon=True) - self._cleanup_thread.start() - logger.debug("后台清理线程已启动") + with self._lock: + if self._cleanup_thread_started or self._closed: + return + + def cleanup_worker(): + while not self._cleanup_stop_event.wait(self.cleanup_interval): + if self._closed: + break + try: + self._cleanup_expired_connections() + except Exception as e: + logger.error(f"清理连接时出错: {e}") + + self._cleanup_thread = _threading.Thread(target=cleanup_worker, daemon=True, name="CH-Pool-Cleanup") + self._cleanup_thread.start() + self._cleanup_thread_started = True + logger.debug("后台清理线程已启动") def _cleanup_expired_connections(self): """清理过期连接""" @@ -241,6 +248,9 @@ class ClickHouseConnectionPool: if self._initialized: return + # 启动清理线程(延迟到首次使用时启动) + self._start_cleanup_thread() + # 只预创建 1 个连接,其余按需创建 init_count = min(1, self.pool_size) for i in range(init_count): diff --git a/gunicorn_config.py b/gunicorn_config.py index 2bf3e1ea..0bb20055 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -4,9 +4,9 @@ Gunicorn 配置文件 - app_vx.py 生产环境配置 使用方式: # 方式1: 使用 gevent 异步模式(推荐,支持高并发) - USE_GEVENT=true gunicorn -c gunicorn_config.py app_vx:app + gunicorn -c gunicorn_config.py -k gevent app_vx:app - # 方式2: 使用同步多进程模式 + # 方式2: 使用同步多进程模式(更稳定) gunicorn -c gunicorn_config.py app_vx:app # 方式3: 使用 systemd 管理(见文件末尾 systemd 配置示例) @@ -20,23 +20,16 @@ import multiprocessing # 绑定地址和端口 bind = '0.0.0.0:5002' -# Worker 进程数(建议:CPU 核心数 * 2 + 1) -workers = min(multiprocessing.cpu_count() * 2 + 1, 8) # 最多 8 个 worker +# Worker 进程数(建议 2-4 个,不要太多以避免连接池竞争) +workers = 4 -# Worker 类型 -# - 'sync': 同步模式(默认) -# - 'gevent': 异步模式(推荐用于 I/O 密集型应用) -# - 'gthread': 多线程模式 -if os.environ.get('USE_GEVENT', 'false').lower() == 'true': - worker_class = 'gevent' - worker_connections = 1000 # gevent 模式下每个 worker 的最大并发连接数 -else: - worker_class = 'gthread' - threads = 4 # gthread 模式下每个 worker 的线程数 +# Worker 类型 - 默认使用 sync 模式,更稳定 +# 如果需要 gevent,在命令行添加 -k gevent +worker_class = 'sync' # 每个 worker 处理的最大请求数,超过后重启(防止内存泄漏) -max_requests = 10000 -max_requests_jitter = 1000 # 随机抖动,避免所有 worker 同时重启 +max_requests = 5000 +max_requests_jitter = 500 # 随机抖动,避免所有 worker 同时重启 # ==================== 超时配置 ==================== @@ -86,8 +79,10 @@ proc_name = 'app_vx' # ==================== 预加载配置 ==================== -# 是否预加载应用代码(可以减少内存占用,但会增加启动时间) -preload_app = True +# 是否预加载应用代码 +# 重要:设为 False 以确保每个 worker 有独立的连接池实例 +# 否则多个 worker 共享同一个连接池会导致竞争和超时 +preload_app = False # ==================== Hook 函数 ==================== @@ -100,12 +95,9 @@ def on_starting(server): def when_ready(server): - """服务准备就绪时调用,初始化缓存""" - print("Gunicorn 服务准备就绪,开始初始化...") - from app_vx import app, init_sywg_industry_cache - with app.app_context(): - init_sywg_industry_cache() - print("初始化完成!") + """服务准备就绪时调用""" + print("Gunicorn 服务准备就绪!") + print("注意: 缓存将在首次请求时懒加载初始化") def on_reload(server):