diff --git a/app_vx.py b/app_vx.py index b1491541..abeedc03 100644 --- a/app_vx.py +++ b/app_vx.py @@ -1,9 +1,24 @@ +# ===================== Gevent Monkey Patch (必须在最开头) ===================== +# 检测是否通过 gevent/gunicorn 运行,如果是则打 monkey patch +import os +import sys + +# 检查环境变量或命令行参数判断是否需要 gevent +_USE_GEVENT = os.environ.get('USE_GEVENT', 'false').lower() == 'true' +if _USE_GEVENT or 'gevent' in sys.modules: + try: + from gevent import monkey + monkey.patch_all() + print("✅ Gevent monkey patch 已应用") + except ImportError: + print("⚠️ Gevent 未安装,跳过 monkey patch") +# ===================== Gevent Monkey Patch 结束 ===================== + import csv import logging import random import re import math -import os import secrets import string @@ -74,20 +89,28 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level logger = logging.getLogger(__name__) -# ===================== ClickHouse 连接池实现 ===================== +# ===================== ClickHouse 连接池实现(增强版)===================== +import threading as _threading +import atexit + class ClickHouseConnectionPool: """ - ClickHouse 连接池 + ClickHouse 连接池(增强版) - 支持连接复用,避免频繁创建/销毁连接 - 支持连接超时和健康检查 - - 支持自动重连 + - 支持连接最大存活时间(防止僵尸连接) + - 支持后台自动清理过期连接 + - 支持自动重连和重试 - 线程安全 """ def __init__(self, host, port, user, password, database, pool_size=10, max_overflow=10, connection_timeout=10, query_timeout=30, - health_check_interval=60): + health_check_interval=60, + max_connection_lifetime=300, # 新增:连接最大存活时间(秒) + cleanup_interval=60, # 新增:清理间隔(秒) + max_retries=3): # 新增:最大重试次数 """ 初始化连接池 @@ -102,6 +125,9 @@ class ClickHouseConnectionPool: connection_timeout: 获取连接超时时间(秒) query_timeout: 查询超时时间(秒) health_check_interval: 健康检查间隔(秒) + max_connection_lifetime: 连接最大存活时间(秒),超过后自动关闭重建 + cleanup_interval: 后台清理线程间隔(秒) + max_retries: 查询失败时的最大重试次数 """ self.host = host self.port = port @@ -113,6 +139,9 @@ class ClickHouseConnectionPool: self.connection_timeout = connection_timeout self.query_timeout = query_timeout self.health_check_interval = health_check_interval + self.max_connection_lifetime = max_connection_lifetime + self.cleanup_interval = cleanup_interval + self.max_retries = max_retries # 连接池队列 self._pool = Queue(maxsize=pool_size + max_overflow) @@ -124,25 +153,108 @@ class ClickHouseConnectionPool: self._last_used = {} # 连接创建时间记录 self._created_at = {} + # 是否已关闭 + self._closed = False + # 清理线程 + self._cleanup_thread = None + self._cleanup_stop_event = _threading.Event() - # 初始化核心连接 - self._init_pool() - logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}") + # 初始化核心连接(延迟初始化,首次使用时创建) + self._initialized = False + logger.info(f"ClickHouse 连接池配置完成: pool_size={pool_size}, max_overflow={max_overflow}, " + f"max_lifetime={max_connection_lifetime}s") + + # 启动后台清理线程 + self._start_cleanup_thread() + + # 注册退出时清理 + atexit.register(self.close_all) + + def _start_cleanup_thread(self): + """启动后台清理线程""" + def cleanup_worker(): + while not self._cleanup_stop_event.wait(self.cleanup_interval): + if self._closed: + break + try: + self._cleanup_expired_connections() + except Exception as e: + logger.error(f"清理连接时出错: {e}") + + self._cleanup_thread = _threading.Thread(target=cleanup_worker, daemon=True) + self._cleanup_thread.start() + logger.debug("后台清理线程已启动") + + def _cleanup_expired_connections(self): + """清理过期连接""" + current_time = time.time() + cleaned_count = 0 + + # 创建临时列表存放需要保留的连接 + valid_connections = [] + + # 从池中取出所有连接进行检查 + while True: + try: + conn = self._pool.get_nowait() + conn_id = id(conn) + created_at = self._created_at.get(conn_id, current_time) + last_used = self._last_used.get(conn_id, current_time) + + # 检查是否过期 + lifetime = current_time - created_at + idle_time = current_time - last_used + + if lifetime > self.max_connection_lifetime: + # 连接存活时间过长,关闭 + logger.debug(f"连接 {conn_id} 存活时间 {lifetime:.0f}s 超过限制,关闭") + self._close_connection(conn) + cleaned_count += 1 + elif idle_time > self.health_check_interval * 3: + # 长时间空闲,进行健康检查 + if not self._check_connection_health(conn): + self._close_connection(conn) + cleaned_count += 1 + else: + valid_connections.append(conn) + else: + valid_connections.append(conn) + except Empty: + break + + # 将有效连接放回池中 + for conn in valid_connections: + try: + self._pool.put_nowait(conn) + except Full: + self._close_connection(conn) + + if cleaned_count > 0: + logger.info(f"清理了 {cleaned_count} 个过期连接,当前活跃连接数: {self._active_connections}") def _init_pool(self): """初始化连接池,预创建部分核心连接(非阻塞)""" - # 只预创建 2 个连接,其余按需创建 - init_count = min(2, self.pool_size) - for i in range(init_count): - try: - conn = self._create_connection() - if conn: - self._pool.put(conn) - logger.info(f"预创建 ClickHouse 连接 {i+1}/{init_count} 成功") - except Exception as e: - logger.warning(f"预创建 ClickHouse 连接失败 ({i+1}/{init_count}): {e}") - # 预创建失败不阻塞启动,后续按需创建 - break + if self._initialized: + return + + with self._lock: + if self._initialized: + return + + # 只预创建 1 个连接,其余按需创建 + init_count = min(1, self.pool_size) + for i in range(init_count): + try: + conn = self._create_connection() + if conn: + self._pool.put(conn) + logger.info(f"预创建 ClickHouse 连接 {i+1}/{init_count} 成功") + except Exception as e: + logger.warning(f"预创建 ClickHouse 连接失败 ({i+1}/{init_count}): {e}") + # 预创建失败不阻塞启动,后续按需创建 + break + + self._initialized = True def _create_connection(self): """创建新的 ClickHouse 连接""" @@ -162,8 +274,9 @@ class ClickHouseConnectionPool: } ) conn_id = id(client) - self._created_at[conn_id] = time.time() - self._last_used[conn_id] = time.time() + current_time = time.time() + self._created_at[conn_id] = current_time + self._last_used[conn_id] = current_time with self._lock: self._active_connections += 1 logger.debug(f"创建新的 ClickHouse 连接: {conn_id}") @@ -173,15 +286,23 @@ class ClickHouseConnectionPool: raise def _check_connection_health(self, conn): - """检查连接健康状态""" + """检查连接健康状态(带超时保护)""" try: conn_id = id(conn) last_used = self._last_used.get(conn_id, 0) + created_at = self._created_at.get(conn_id, 0) + current_time = time.time() + + # 检查连接是否存活时间过长 + if current_time - created_at > self.max_connection_lifetime: + logger.debug(f"连接 {conn_id} 超过最大存活时间,标记为不健康") + return False # 如果连接长时间未使用,进行健康检查 - if time.time() - last_used > self.health_check_interval: + if current_time - last_used > self.health_check_interval: # 执行简单查询检查连接 conn.execute("SELECT 1") + self._last_used[conn_id] = current_time logger.debug(f"连接 {conn_id} 健康检查通过") return True @@ -191,9 +312,14 @@ class ClickHouseConnectionPool: def _close_connection(self, conn): """关闭连接""" + if conn is None: + return try: conn_id = id(conn) - conn.disconnect() + try: + conn.disconnect() + except: + pass # 忽略断开连接时的错误 self._last_used.pop(conn_id, None) self._created_at.pop(conn_id, None) with self._lock: @@ -216,34 +342,48 @@ class ClickHouseConnectionPool: TimeoutError: 获取连接超时 Exception: 创建连接失败 """ + if self._closed: + raise RuntimeError("连接池已关闭") + + # 延迟初始化 + if not self._initialized: + self._init_pool() + timeout = timeout or self.connection_timeout + start_time = time.time() - # 首先尝试从池中获取连接 - try: - conn = self._pool.get(block=True, timeout=timeout) + while True: + elapsed = time.time() - start_time + if elapsed >= timeout: + raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)") - # 检查连接健康状态 - if self._check_connection_health(conn): - self._last_used[id(conn)] = time.time() - return conn - else: - # 连接不健康,关闭并创建新连接 - self._close_connection(conn) - return self._create_connection() + remaining_timeout = timeout - elapsed - except Empty: - # 池中没有可用连接,检查是否可以创建新连接 - with self._lock: - if self._active_connections < self.pool_size + self.max_overflow: - try: - return self._create_connection() - except Exception as e: - logger.error(f"创建溢出连接失败: {e}") - raise + # 首先尝试从池中获取连接 + try: + conn = self._pool.get(block=True, timeout=min(remaining_timeout, 1.0)) - # 已达到最大连接数,等待连接释放 - logger.warning(f"连接池已满,等待连接释放... (当前连接数: {self._active_connections})") - raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)") + # 检查连接健康状态 + if self._check_connection_health(conn): + self._last_used[id(conn)] = time.time() + return conn + else: + # 连接不健康,关闭并尝试获取新连接 + self._close_connection(conn) + continue + + except Empty: + # 池中没有可用连接,检查是否可以创建新连接 + with self._lock: + if self._active_connections < self.pool_size + self.max_overflow: + try: + return self._create_connection() + except Exception as e: + logger.error(f"创建溢出连接失败: {e}") + # 不立即抛出异常,继续等待 + + # 短暂等待后重试 + time.sleep(0.1) def release_connection(self, conn): """ @@ -255,7 +395,19 @@ class ClickHouseConnectionPool: if conn is None: return + if self._closed: + self._close_connection(conn) + return + conn_id = id(conn) + created_at = self._created_at.get(conn_id, 0) + + # 如果连接存活时间过长,直接关闭而不放回池中 + if time.time() - created_at > self.max_connection_lifetime: + logger.debug(f"连接 {conn_id} 超过最大存活时间,关闭而不放回池中") + self._close_connection(conn) + return + self._last_used[conn_id] = time.time() try: @@ -296,7 +448,7 @@ class ClickHouseConnectionPool: def execute(self, query, params=None, timeout=None): """ - 执行查询(自动管理连接) + 执行查询(自动管理连接,带重试机制) Args: query: SQL 查询语句 @@ -306,8 +458,21 @@ class ClickHouseConnectionPool: Returns: 查询结果 """ - with self.connection(timeout) as conn: - return conn.execute(query, params) + last_error = None + for retry in range(self.max_retries): + try: + with self.connection(timeout) as conn: + return conn.execute(query, params) + except (TimeoutError, RuntimeError) as e: + # 这些错误不应该重试 + raise + except Exception as e: + last_error = e + logger.warning(f"查询执行失败 (重试 {retry + 1}/{self.max_retries}): {e}") + if retry < self.max_retries - 1: + time.sleep(0.5 * (retry + 1)) # 递增等待时间 + + raise last_error def get_pool_status(self): """获取连接池状态""" @@ -316,11 +481,22 @@ class ClickHouseConnectionPool: 'max_overflow': self.max_overflow, 'active_connections': self._active_connections, 'available_connections': self._pool.qsize(), - 'max_connections': self.pool_size + self.max_overflow + 'max_connections': self.pool_size + self.max_overflow, + 'max_connection_lifetime': self.max_connection_lifetime, + 'initialized': self._initialized, + 'closed': self._closed } def close_all(self): """关闭所有连接""" + self._closed = True + self._cleanup_stop_event.set() # 停止清理线程 + + # 等待清理线程结束 + if self._cleanup_thread and self._cleanup_thread.is_alive(): + self._cleanup_thread.join(timeout=2) + + # 关闭所有池中的连接 while not self._pool.empty(): try: conn = self._pool.get_nowait() @@ -347,11 +523,14 @@ def _init_clickhouse_pool(): user='default', password='Zzl33818!', database='stock', - pool_size=5, # 减少预创建连接数 - max_overflow=20, # 增加溢出连接数,总共支持 25 并发 - connection_timeout=10, # 连接超时 10 秒 - query_timeout=30, # 查询超时 30 秒 - health_check_interval=60 # 60 秒未使用的连接进行健康检查 + pool_size=5, # 核心连接数 + max_overflow=20, # 溢出连接数,总共支持 25 并发 + connection_timeout=15, # 连接超时 15 秒(增加容忍度) + query_timeout=60, # 查询超时 60 秒(给复杂查询更多时间) + health_check_interval=30, # 30 秒进行健康检查 + max_connection_lifetime=300, # 连接最大存活 5 分钟(防止僵尸连接) + cleanup_interval=60, # 每 60 秒清理一次过期连接 + max_retries=3 # 查询失败最多重试 3 次 ) return clickhouse_pool # ===================== ClickHouse 连接池实现结束 ===================== @@ -6427,18 +6606,61 @@ def before_request_init(): ensure_sywg_cache_initialized() +# ===================== 应用启动配置 ===================== +# 生产环境推荐使用 Gunicorn 启动(见下方命令) +# +# 【启动方式 1】使用 Gunicorn + gevent(推荐): +# USE_GEVENT=true gunicorn -w 4 -k gevent --worker-connections 1000 \ +# -b 0.0.0.0:5002 --timeout 120 --graceful-timeout 30 \ +# --certfile=/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem \ +# --keyfile=/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem \ +# app_vx:app +# +# 【启动方式 2】使用 Gunicorn + 多进程(不使用 gevent): +# gunicorn -w 4 -b 0.0.0.0:5002 --timeout 120 --graceful-timeout 30 \ +# --certfile=/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem \ +# --keyfile=/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem \ +# app_vx:app +# +# 【启动方式 3】开发环境直接运行(仅限本地调试): +# python app_vx.py +# +# ===================== 应用启动配置结束 ===================== + if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='启动 Flask 应用') + parser.add_argument('--debug', action='store_true', help='启用调试模式(仅限开发环境)') + parser.add_argument('--port', type=int, default=5002, help='监听端口(默认 5002)') + parser.add_argument('--no-ssl', action='store_true', help='禁用 SSL(仅限开发环境)') + args = parser.parse_args() + # 直接运行时,立即初始化缓存 with app.app_context(): init_sywg_industry_cache() _sywg_cache_initialized = True + # 警告:生产环境不应使用 debug=True + if args.debug: + logger.warning("⚠️ 调试模式已启用,仅限开发环境使用!") + + # SSL 配置 + ssl_context = None + if not args.no_ssl: + cert_file = '/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem' + key_file = '/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem' + if os.path.exists(cert_file) and os.path.exists(key_file): + ssl_context = (cert_file, key_file) + else: + logger.warning("⚠️ SSL 证书文件不存在,将使用 HTTP 模式") + + logger.info(f"🚀 启动 Flask 应用: port={args.port}, debug={args.debug}, ssl={'enabled' if ssl_context else 'disabled'}") + app.run( host='0.0.0.0', - port=5002, - debug=True, - ssl_context=( - '/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem', - '/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem' - ) + port=args.port, + debug=args.debug, + ssl_context=ssl_context, + threaded=True # 启用多线程处理请求 ) diff --git a/gunicorn_config.py b/gunicorn_config.py index 13738889..2bf3e1ea 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -1,33 +1,102 @@ -# Gunicorn 配置文件 +# -*- coding: utf-8 -*- +""" +Gunicorn 配置文件 - app_vx.py 生产环境配置 -# 基本配置 -bind = "0.0.0.0:5002" -workers = 4 -threads = 4 +使用方式: + # 方式1: 使用 gevent 异步模式(推荐,支持高并发) + USE_GEVENT=true gunicorn -c gunicorn_config.py app_vx:app + + # 方式2: 使用同步多进程模式 + gunicorn -c gunicorn_config.py app_vx:app + + # 方式3: 使用 systemd 管理(见文件末尾 systemd 配置示例) +""" + +import os +import multiprocessing + +# ==================== 基础配置 ==================== + +# 绑定地址和端口 +bind = '0.0.0.0:5002' + +# Worker 进程数(建议:CPU 核心数 * 2 + 1) +workers = min(multiprocessing.cpu_count() * 2 + 1, 8) # 最多 8 个 worker + +# Worker 类型 +# - 'sync': 同步模式(默认) +# - 'gevent': 异步模式(推荐用于 I/O 密集型应用) +# - 'gthread': 多线程模式 +if os.environ.get('USE_GEVENT', 'false').lower() == 'true': + worker_class = 'gevent' + worker_connections = 1000 # gevent 模式下每个 worker 的最大并发连接数 +else: + worker_class = 'gthread' + threads = 4 # gthread 模式下每个 worker 的线程数 + +# 每个 worker 处理的最大请求数,超过后重启(防止内存泄漏) +max_requests = 10000 +max_requests_jitter = 1000 # 随机抖动,避免所有 worker 同时重启 + +# ==================== 超时配置 ==================== + +# Worker 超时时间(秒),超过后 worker 会被杀死重启 timeout = 120 -worker_class = "gthread" -# SSL 配置(如需要) -# certfile = "/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem" -# keyfile = "/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem" +# 优雅关闭超时时间(秒) +graceful_timeout = 30 -# 日志配置 -loglevel = "info" -accesslog = "-" -errorlog = "-" +# 保持连接超时时间(秒) +keepalive = 5 -# 预加载应用(在 fork 前加载,加快 worker 启动) +# ==================== SSL 配置 ==================== + +# SSL 证书路径(生产环境需要配置) +cert_file = '/etc/letsencrypt/live/api.valuefrontier.cn/fullchain.pem' +key_file = '/etc/letsencrypt/live/api.valuefrontier.cn/privkey.pem' + +if os.path.exists(cert_file) and os.path.exists(key_file): + certfile = cert_file + keyfile = key_file + +# ==================== 日志配置 ==================== + +# 访问日志文件路径(- 表示输出到 stdout) +accesslog = '-' + +# 错误日志文件路径(- 表示输出到 stderr) +errorlog = '-' + +# 日志级别:debug, info, warning, error, critical +loglevel = 'info' + +# 访问日志格式 +access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s' + +# ==================== 进程管理 ==================== + +# 是否在后台运行(daemon 模式) +daemon = False + +# PID 文件路径 +pidfile = '/tmp/gunicorn_app_vx.pid' + +# 进程名称 +proc_name = 'app_vx' + +# ==================== 预加载配置 ==================== + +# 是否预加载应用代码(可以减少内存占用,但会增加启动时间) preload_app = True +# ==================== Hook 函数 ==================== def on_starting(server): - """主进程启动时调用""" - print("Gunicorn 主进程启动...") - - -def post_fork(server, worker): - """Worker 进程 fork 后调用""" - print(f"Worker {worker.pid} 已启动") + """服务器启动时调用""" + print(f"Gunicorn 服务器正在启动...") + print(f" Workers: {server.app.cfg.workers}") + print(f" Worker Class: {server.app.cfg.worker_class}") + print(f" Bind: {server.app.cfg.bind}") def when_ready(server): @@ -37,3 +106,69 @@ def when_ready(server): with app.app_context(): init_sywg_industry_cache() print("初始化完成!") + + +def on_reload(server): + """服务器重载时调用""" + print("Gunicorn 服务器正在重载...") + + +def worker_int(worker): + """Worker 收到 INT 或 QUIT 信号时调用""" + print(f"Worker {worker.pid} 收到中断信号") + + +def worker_abort(worker): + """Worker 收到 SIGABRT 信号时调用(超时)""" + print(f"Worker {worker.pid} 超时被终止") + + +def post_fork(server, worker): + """Worker 进程 fork 之后调用""" + print(f"Worker {worker.pid} 已启动") + + +def worker_exit(server, worker): + """Worker 退出时调用""" + print(f"Worker {worker.pid} 已退出") + + +def on_exit(server): + """服务器退出时调用""" + print("Gunicorn 服务器已关闭") + + +# ==================== systemd 配置示例 ==================== +""" +将以下内容保存为 /etc/systemd/system/app_vx.service: + +[Unit] +Description=Gunicorn instance to serve app_vx +After=network.target + +[Service] +User=www-data +Group=www-data +WorkingDirectory=/path/to/vf_react +Environment="PATH=/path/to/venv/bin" +Environment="USE_GEVENT=true" +ExecStart=/path/to/venv/bin/gunicorn -c gunicorn_config.py app_vx:app +ExecReload=/bin/kill -s HUP $MAINPID +KillMode=mixed +TimeoutStopSec=5 +PrivateTmp=true +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target + +启用服务: + sudo systemctl daemon-reload + sudo systemctl enable app_vx + sudo systemctl start app_vx + sudo systemctl status app_vx + +查看日志: + sudo journalctl -u app_vx -f +"""