diff --git a/app_vx.py b/app_vx.py index be0bd87b..ddff76e9 100644 --- a/app_vx.py +++ b/app_vx.py @@ -55,6 +55,9 @@ from datetime import datetime, timedelta, time as dt_time from werkzeug.security import generate_password_hash, check_password_hash import json from clickhouse_driver import Client as Cclient +from queue import Queue, Empty, Full +from threading import Lock, RLock +from contextlib import contextmanager import jwt from docx import Document from tencentcloud.common import credential @@ -69,6 +72,273 @@ engine_med = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060 engine_2 = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060/valuefrontier", echo=False) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) + + +# ===================== ClickHouse 连接池实现 ===================== +class ClickHouseConnectionPool: + """ + ClickHouse 连接池 + - 支持连接复用,避免频繁创建/销毁连接 + - 支持连接超时和健康检查 + - 支持自动重连 + - 线程安全 + """ + + def __init__(self, host, port, user, password, database, + pool_size=10, max_overflow=10, + connection_timeout=10, query_timeout=30, + health_check_interval=60): + """ + 初始化连接池 + + Args: + host: ClickHouse 主机地址 + port: ClickHouse 端口 + user: 用户名 + password: 密码 + database: 数据库名 + pool_size: 连接池核心大小(预创建连接数) + max_overflow: 最大溢出连接数(总连接数 = pool_size + max_overflow) + connection_timeout: 获取连接超时时间(秒) + query_timeout: 查询超时时间(秒) + health_check_interval: 健康检查间隔(秒) + """ + self.host = host + self.port = port + self.user = user + self.password = password + self.database = database + self.pool_size = pool_size + self.max_overflow = max_overflow + self.connection_timeout = connection_timeout + self.query_timeout = query_timeout + self.health_check_interval = health_check_interval + + # 连接池队列 + self._pool = Queue(maxsize=pool_size + max_overflow) + # 当前活跃连接数 + self._active_connections = 0 + # 锁 + self._lock = RLock() + # 连接最后使用时间记录 + self._last_used = {} + # 连接创建时间记录 + self._created_at = {} + + # 初始化核心连接 + self._init_pool() + logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}") + + def _init_pool(self): + """初始化连接池,预创建核心连接""" + for _ in range(self.pool_size): + try: + conn = self._create_connection() + if conn: + self._pool.put(conn) + except Exception as e: + logger.warning(f"预创建 ClickHouse 连接失败: {e}") + + def _create_connection(self): + """创建新的 ClickHouse 连接""" + try: + client = Cclient( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + connect_timeout=self.connection_timeout, + send_receive_timeout=self.query_timeout, + sync_request_timeout=self.query_timeout, + settings={ + 'max_execution_time': self.query_timeout, + 'connect_timeout': self.connection_timeout, + } + ) + conn_id = id(client) + self._created_at[conn_id] = time.time() + self._last_used[conn_id] = time.time() + with self._lock: + self._active_connections += 1 + logger.debug(f"创建新的 ClickHouse 连接: {conn_id}") + return client + except Exception as e: + logger.error(f"创建 ClickHouse 连接失败: {e}") + raise + + def _check_connection_health(self, conn): + """检查连接健康状态""" + try: + conn_id = id(conn) + last_used = self._last_used.get(conn_id, 0) + + # 如果连接长时间未使用,进行健康检查 + if time.time() - last_used > self.health_check_interval: + # 执行简单查询检查连接 + conn.execute("SELECT 1") + logger.debug(f"连接 {conn_id} 健康检查通过") + + return True + except Exception as e: + logger.warning(f"连接健康检查失败: {e}") + return False + + def _close_connection(self, conn): + """关闭连接""" + try: + conn_id = id(conn) + conn.disconnect() + self._last_used.pop(conn_id, None) + self._created_at.pop(conn_id, None) + with self._lock: + self._active_connections = max(0, self._active_connections - 1) + logger.debug(f"关闭 ClickHouse 连接: {conn_id}") + except Exception as e: + logger.warning(f"关闭连接时出错: {e}") + + def get_connection(self, timeout=None): + """ + 从连接池获取连接 + + Args: + timeout: 获取连接的超时时间,默认使用 connection_timeout + + Returns: + ClickHouse 客户端连接 + + Raises: + TimeoutError: 获取连接超时 + Exception: 创建连接失败 + """ + timeout = timeout or self.connection_timeout + + # 首先尝试从池中获取连接 + try: + conn = self._pool.get(block=True, timeout=timeout) + + # 检查连接健康状态 + if self._check_connection_health(conn): + self._last_used[id(conn)] = time.time() + return conn + else: + # 连接不健康,关闭并创建新连接 + self._close_connection(conn) + return self._create_connection() + + except Empty: + # 池中没有可用连接,检查是否可以创建新连接 + with self._lock: + if self._active_connections < self.pool_size + self.max_overflow: + try: + return self._create_connection() + except Exception as e: + logger.error(f"创建溢出连接失败: {e}") + raise + + # 已达到最大连接数,等待连接释放 + logger.warning(f"连接池已满,等待连接释放... (当前连接数: {self._active_connections})") + raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)") + + def release_connection(self, conn): + """ + 释放连接回连接池 + + Args: + conn: 要释放的连接 + """ + if conn is None: + return + + conn_id = id(conn) + self._last_used[conn_id] = time.time() + + try: + self._pool.put(conn, block=False) + logger.debug(f"连接 {conn_id} 已释放回连接池") + except Full: + # 池已满,关闭多余连接 + logger.debug(f"连接池已满,关闭多余连接: {conn_id}") + self._close_connection(conn) + + @contextmanager + def connection(self, timeout=None): + """ + 上下文管理器方式获取连接 + + Usage: + with pool.connection() as conn: + result = conn.execute("SELECT * FROM table") + """ + conn = None + try: + conn = self.get_connection(timeout) + yield conn + except Exception as e: + # 发生异常时,检查连接是否需要重建 + if conn: + try: + # 尝试简单查询检测连接状态 + conn.execute("SELECT 1") + except: + # 连接已损坏,关闭它 + self._close_connection(conn) + conn = None + raise + finally: + if conn: + self.release_connection(conn) + + def execute(self, query, params=None, timeout=None): + """ + 执行查询(自动管理连接) + + Args: + query: SQL 查询语句 + params: 查询参数 + timeout: 查询超时时间 + + Returns: + 查询结果 + """ + with self.connection(timeout) as conn: + return conn.execute(query, params) + + def get_pool_status(self): + """获取连接池状态""" + return { + 'pool_size': self.pool_size, + 'max_overflow': self.max_overflow, + 'active_connections': self._active_connections, + 'available_connections': self._pool.qsize(), + 'max_connections': self.pool_size + self.max_overflow + } + + def close_all(self): + """关闭所有连接""" + while not self._pool.empty(): + try: + conn = self._pool.get_nowait() + self._close_connection(conn) + except Empty: + break + logger.info("ClickHouse 连接池已关闭所有连接") + + +# 初始化全局 ClickHouse 连接池 +clickhouse_pool = ClickHouseConnectionPool( + host='222.128.1.157', + port=18000, + user='default', + password='Zzl33818!', + database='stock', + pool_size=10, # 核心连接数 + max_overflow=15, # 最大溢出连接数,总共支持 25 并发 + connection_timeout=10, # 连接超时 10 秒 + query_timeout=30, # 查询超时 30 秒 + health_check_interval=60 # 60 秒未使用的连接进行健康检查 +) +# ===================== ClickHouse 连接池实现结束 ===================== app = Flask(__name__) Compress(app) UPLOAD_FOLDER = 'static/uploads/avatars' @@ -1150,13 +1420,39 @@ def update_investment_preferences(): def get_clickhouse_client(): - return Cclient( - host='222.128.1.157', - port=18000, - user='default', - password='Zzl33818!', - database='stock' - ) + """ + 获取 ClickHouse 客户端(使用连接池) + + 返回连接池对象,支持两种使用方式: + + 方式1(推荐)- 直接调用 execute: + client = get_clickhouse_client() + result = client.execute("SELECT * FROM table", {'param': value}) + + 方式2 - 使用上下文管理器: + client = get_clickhouse_client() + with client.connection() as conn: + result = conn.execute("SELECT * FROM table") + """ + return clickhouse_pool + + +@app.route('/api/system/clickhouse-pool-status', methods=['GET']) +def api_clickhouse_pool_status(): + """获取 ClickHouse 连接池状态(仅供监控使用)""" + try: + status = clickhouse_pool.get_pool_status() + return jsonify({ + 'code': 200, + 'message': 'success', + 'data': status + }) + except Exception as e: + return jsonify({ + 'code': 500, + 'message': str(e), + 'data': None + }), 500 @app.route('/api/stock//kline')