From 734591d677e2df9a1fd056000acde19cc7fc11cd Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Wed, 26 Nov 2025 08:44:22 +0800 Subject: [PATCH 1/3] update pay function --- app_vx.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app_vx.py b/app_vx.py index 9429a92a..be0bd87b 100644 --- a/app_vx.py +++ b/app_vx.py @@ -1292,6 +1292,7 @@ def get_daily_kline(stock_code, event_datetime, stock_name): 'name': stock_name, 'data': kline_data, 'trade_date': event_datetime.date().strftime('%Y-%m-%d'), + 'event_time': event_datetime.isoformat(), 'type': 'daily', 'is_history': True, 'data_count': len(kline_data) @@ -4120,7 +4121,9 @@ def api_stock_detail(event_id, stock_code): 'event_info': { 'event_id': event.id, 'event_title': event.title, - 'event_description': event.description + 'event_description': event.description, + 'event_start_time': event.start_time.isoformat() if event.start_time else None, + 'event_created_at': event.created_at.strftime("%Y-%m-%d %H:%M:%S") if event.created_at else None }, 'basic_info': { 'stock_code': basic_info.SECCODE, From 1b93644f1dbc0350bcf3023e3004975c6b9eb8bb Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Wed, 26 Nov 2025 09:00:38 +0800 Subject: [PATCH 2/3] update pay function --- app_vx.py | 310 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 303 insertions(+), 7 deletions(-) diff --git a/app_vx.py b/app_vx.py index be0bd87b..ddff76e9 100644 --- a/app_vx.py +++ b/app_vx.py @@ -55,6 +55,9 @@ from datetime import datetime, timedelta, time as dt_time from werkzeug.security import generate_password_hash, check_password_hash import json from clickhouse_driver import Client as Cclient +from queue import Queue, Empty, Full +from threading import Lock, RLock +from contextlib import contextmanager import jwt from docx import Document from tencentcloud.common import credential @@ -69,6 +72,273 @@ engine_med = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060 engine_2 = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060/valuefrontier", echo=False) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) + + +# ===================== ClickHouse 连接池实现 ===================== +class ClickHouseConnectionPool: + """ + ClickHouse 连接池 + - 支持连接复用,避免频繁创建/销毁连接 + - 支持连接超时和健康检查 + - 支持自动重连 + - 线程安全 + """ + + def __init__(self, host, port, user, password, database, + pool_size=10, max_overflow=10, + connection_timeout=10, query_timeout=30, + health_check_interval=60): + """ + 初始化连接池 + + Args: + host: ClickHouse 主机地址 + port: ClickHouse 端口 + user: 用户名 + password: 密码 + database: 数据库名 + pool_size: 连接池核心大小(预创建连接数) + max_overflow: 最大溢出连接数(总连接数 = pool_size + max_overflow) + connection_timeout: 获取连接超时时间(秒) + query_timeout: 查询超时时间(秒) + health_check_interval: 健康检查间隔(秒) + """ + self.host = host + self.port = port + self.user = user + self.password = password + self.database = database + self.pool_size = pool_size + self.max_overflow = max_overflow + self.connection_timeout = connection_timeout + self.query_timeout = query_timeout + self.health_check_interval = health_check_interval + + # 连接池队列 + self._pool = Queue(maxsize=pool_size + max_overflow) + # 当前活跃连接数 + self._active_connections = 0 + # 锁 + self._lock = RLock() + # 连接最后使用时间记录 + self._last_used = {} + # 连接创建时间记录 + self._created_at = {} + + # 初始化核心连接 + self._init_pool() + logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}") + + def _init_pool(self): + """初始化连接池,预创建核心连接""" + for _ in range(self.pool_size): + try: + conn = self._create_connection() + if conn: + self._pool.put(conn) + except Exception as e: + logger.warning(f"预创建 ClickHouse 连接失败: {e}") + + def _create_connection(self): + """创建新的 ClickHouse 连接""" + try: + client = Cclient( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + connect_timeout=self.connection_timeout, + send_receive_timeout=self.query_timeout, + sync_request_timeout=self.query_timeout, + settings={ + 'max_execution_time': self.query_timeout, + 'connect_timeout': self.connection_timeout, + } + ) + conn_id = id(client) + self._created_at[conn_id] = time.time() + self._last_used[conn_id] = time.time() + with self._lock: + self._active_connections += 1 + logger.debug(f"创建新的 ClickHouse 连接: {conn_id}") + return client + except Exception as e: + logger.error(f"创建 ClickHouse 连接失败: {e}") + raise + + def _check_connection_health(self, conn): + """检查连接健康状态""" + try: + conn_id = id(conn) + last_used = self._last_used.get(conn_id, 0) + + # 如果连接长时间未使用,进行健康检查 + if time.time() - last_used > self.health_check_interval: + # 执行简单查询检查连接 + conn.execute("SELECT 1") + logger.debug(f"连接 {conn_id} 健康检查通过") + + return True + except Exception as e: + logger.warning(f"连接健康检查失败: {e}") + return False + + def _close_connection(self, conn): + """关闭连接""" + try: + conn_id = id(conn) + conn.disconnect() + self._last_used.pop(conn_id, None) + self._created_at.pop(conn_id, None) + with self._lock: + self._active_connections = max(0, self._active_connections - 1) + logger.debug(f"关闭 ClickHouse 连接: {conn_id}") + except Exception as e: + logger.warning(f"关闭连接时出错: {e}") + + def get_connection(self, timeout=None): + """ + 从连接池获取连接 + + Args: + timeout: 获取连接的超时时间,默认使用 connection_timeout + + Returns: + ClickHouse 客户端连接 + + Raises: + TimeoutError: 获取连接超时 + Exception: 创建连接失败 + """ + timeout = timeout or self.connection_timeout + + # 首先尝试从池中获取连接 + try: + conn = self._pool.get(block=True, timeout=timeout) + + # 检查连接健康状态 + if self._check_connection_health(conn): + self._last_used[id(conn)] = time.time() + return conn + else: + # 连接不健康,关闭并创建新连接 + self._close_connection(conn) + return self._create_connection() + + except Empty: + # 池中没有可用连接,检查是否可以创建新连接 + with self._lock: + if self._active_connections < self.pool_size + self.max_overflow: + try: + return self._create_connection() + except Exception as e: + logger.error(f"创建溢出连接失败: {e}") + raise + + # 已达到最大连接数,等待连接释放 + logger.warning(f"连接池已满,等待连接释放... (当前连接数: {self._active_connections})") + raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)") + + def release_connection(self, conn): + """ + 释放连接回连接池 + + Args: + conn: 要释放的连接 + """ + if conn is None: + return + + conn_id = id(conn) + self._last_used[conn_id] = time.time() + + try: + self._pool.put(conn, block=False) + logger.debug(f"连接 {conn_id} 已释放回连接池") + except Full: + # 池已满,关闭多余连接 + logger.debug(f"连接池已满,关闭多余连接: {conn_id}") + self._close_connection(conn) + + @contextmanager + def connection(self, timeout=None): + """ + 上下文管理器方式获取连接 + + Usage: + with pool.connection() as conn: + result = conn.execute("SELECT * FROM table") + """ + conn = None + try: + conn = self.get_connection(timeout) + yield conn + except Exception as e: + # 发生异常时,检查连接是否需要重建 + if conn: + try: + # 尝试简单查询检测连接状态 + conn.execute("SELECT 1") + except: + # 连接已损坏,关闭它 + self._close_connection(conn) + conn = None + raise + finally: + if conn: + self.release_connection(conn) + + def execute(self, query, params=None, timeout=None): + """ + 执行查询(自动管理连接) + + Args: + query: SQL 查询语句 + params: 查询参数 + timeout: 查询超时时间 + + Returns: + 查询结果 + """ + with self.connection(timeout) as conn: + return conn.execute(query, params) + + def get_pool_status(self): + """获取连接池状态""" + return { + 'pool_size': self.pool_size, + 'max_overflow': self.max_overflow, + 'active_connections': self._active_connections, + 'available_connections': self._pool.qsize(), + 'max_connections': self.pool_size + self.max_overflow + } + + def close_all(self): + """关闭所有连接""" + while not self._pool.empty(): + try: + conn = self._pool.get_nowait() + self._close_connection(conn) + except Empty: + break + logger.info("ClickHouse 连接池已关闭所有连接") + + +# 初始化全局 ClickHouse 连接池 +clickhouse_pool = ClickHouseConnectionPool( + host='222.128.1.157', + port=18000, + user='default', + password='Zzl33818!', + database='stock', + pool_size=10, # 核心连接数 + max_overflow=15, # 最大溢出连接数,总共支持 25 并发 + connection_timeout=10, # 连接超时 10 秒 + query_timeout=30, # 查询超时 30 秒 + health_check_interval=60 # 60 秒未使用的连接进行健康检查 +) +# ===================== ClickHouse 连接池实现结束 ===================== app = Flask(__name__) Compress(app) UPLOAD_FOLDER = 'static/uploads/avatars' @@ -1150,13 +1420,39 @@ def update_investment_preferences(): def get_clickhouse_client(): - return Cclient( - host='222.128.1.157', - port=18000, - user='default', - password='Zzl33818!', - database='stock' - ) + """ + 获取 ClickHouse 客户端(使用连接池) + + 返回连接池对象,支持两种使用方式: + + 方式1(推荐)- 直接调用 execute: + client = get_clickhouse_client() + result = client.execute("SELECT * FROM table", {'param': value}) + + 方式2 - 使用上下文管理器: + client = get_clickhouse_client() + with client.connection() as conn: + result = conn.execute("SELECT * FROM table") + """ + return clickhouse_pool + + +@app.route('/api/system/clickhouse-pool-status', methods=['GET']) +def api_clickhouse_pool_status(): + """获取 ClickHouse 连接池状态(仅供监控使用)""" + try: + status = clickhouse_pool.get_pool_status() + return jsonify({ + 'code': 200, + 'message': 'success', + 'data': status + }) + except Exception as e: + return jsonify({ + 'code': 500, + 'message': str(e), + 'data': None + }), 500 @app.route('/api/stock//kline') From 63e4deb04f3f55d4ccba3b6a866dae83a01f5922 Mon Sep 17 00:00:00 2001 From: zzlgreat Date: Wed, 26 Nov 2025 09:06:02 +0800 Subject: [PATCH 3/3] update pay function --- app_vx.py | 55 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/app_vx.py b/app_vx.py index ddff76e9..190a3045 100644 --- a/app_vx.py +++ b/app_vx.py @@ -130,14 +130,19 @@ class ClickHouseConnectionPool: logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}") def _init_pool(self): - """初始化连接池,预创建核心连接""" - for _ in range(self.pool_size): + """初始化连接池,预创建部分核心连接(非阻塞)""" + # 只预创建 2 个连接,其余按需创建 + init_count = min(2, self.pool_size) + for i in range(init_count): try: conn = self._create_connection() if conn: self._pool.put(conn) + logger.info(f"预创建 ClickHouse 连接 {i+1}/{init_count} 成功") except Exception as e: - logger.warning(f"预创建 ClickHouse 连接失败: {e}") + logger.warning(f"预创建 ClickHouse 连接失败 ({i+1}/{init_count}): {e}") + # 预创建失败不阻塞启动,后续按需创建 + break def _create_connection(self): """创建新的 ClickHouse 连接""" @@ -325,19 +330,30 @@ class ClickHouseConnectionPool: logger.info("ClickHouse 连接池已关闭所有连接") -# 初始化全局 ClickHouse 连接池 -clickhouse_pool = ClickHouseConnectionPool( - host='222.128.1.157', - port=18000, - user='default', - password='Zzl33818!', - database='stock', - pool_size=10, # 核心连接数 - max_overflow=15, # 最大溢出连接数,总共支持 25 并发 - connection_timeout=10, # 连接超时 10 秒 - query_timeout=30, # 查询超时 30 秒 - health_check_interval=60 # 60 秒未使用的连接进行健康检查 -) +# 初始化全局 ClickHouse 连接池(懒加载模式) +clickhouse_pool = None +_pool_lock = Lock() + + +def _init_clickhouse_pool(): + """懒加载初始化 ClickHouse 连接池""" + global clickhouse_pool + if clickhouse_pool is None: + with _pool_lock: + if clickhouse_pool is None: + clickhouse_pool = ClickHouseConnectionPool( + host='222.128.1.157', + port=18000, + user='default', + password='Zzl33818!', + database='stock', + pool_size=5, # 减少预创建连接数 + max_overflow=20, # 增加溢出连接数,总共支持 25 并发 + connection_timeout=10, # 连接超时 10 秒 + query_timeout=30, # 查询超时 30 秒 + health_check_interval=60 # 60 秒未使用的连接进行健康检查 + ) + return clickhouse_pool # ===================== ClickHouse 连接池实现结束 ===================== app = Flask(__name__) Compress(app) @@ -1421,7 +1437,7 @@ def update_investment_preferences(): def get_clickhouse_client(): """ - 获取 ClickHouse 客户端(使用连接池) + 获取 ClickHouse 客户端(使用连接池,懒加载) 返回连接池对象,支持两种使用方式: @@ -1434,14 +1450,15 @@ def get_clickhouse_client(): with client.connection() as conn: result = conn.execute("SELECT * FROM table") """ - return clickhouse_pool + return _init_clickhouse_pool() @app.route('/api/system/clickhouse-pool-status', methods=['GET']) def api_clickhouse_pool_status(): """获取 ClickHouse 连接池状态(仅供监控使用)""" try: - status = clickhouse_pool.get_pool_status() + pool = _init_clickhouse_pool() + status = pool.get_pool_status() return jsonify({ 'code': 200, 'message': 'success',