update pay function

This commit is contained in:
2025-11-26 09:00:38 +08:00
parent 65deea43e2
commit bef3e86f60

310
app_vx.py
View File

@@ -55,6 +55,9 @@ from datetime import datetime, timedelta, time as dt_time
from werkzeug.security import generate_password_hash, check_password_hash
import json
from clickhouse_driver import Client as Cclient
from queue import Queue, Empty, Full
from threading import Lock, RLock
from contextlib import contextmanager
import jwt
from docx import Document
from tencentcloud.common import credential
@@ -69,6 +72,273 @@ engine_med = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060
engine_2 = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060/valuefrontier", echo=False)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ===================== ClickHouse 连接池实现 =====================
class ClickHouseConnectionPool:
"""
ClickHouse 连接池
- 支持连接复用,避免频繁创建/销毁连接
- 支持连接超时和健康检查
- 支持自动重连
- 线程安全
"""
def __init__(self, host, port, user, password, database,
pool_size=10, max_overflow=10,
connection_timeout=10, query_timeout=30,
health_check_interval=60):
"""
初始化连接池
Args:
host: ClickHouse 主机地址
port: ClickHouse 端口
user: 用户名
password: 密码
database: 数据库名
pool_size: 连接池核心大小(预创建连接数)
max_overflow: 最大溢出连接数(总连接数 = pool_size + max_overflow
connection_timeout: 获取连接超时时间(秒)
query_timeout: 查询超时时间(秒)
health_check_interval: 健康检查间隔(秒)
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.pool_size = pool_size
self.max_overflow = max_overflow
self.connection_timeout = connection_timeout
self.query_timeout = query_timeout
self.health_check_interval = health_check_interval
# 连接池队列
self._pool = Queue(maxsize=pool_size + max_overflow)
# 当前活跃连接数
self._active_connections = 0
# 锁
self._lock = RLock()
# 连接最后使用时间记录
self._last_used = {}
# 连接创建时间记录
self._created_at = {}
# 初始化核心连接
self._init_pool()
logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}")
def _init_pool(self):
"""初始化连接池,预创建核心连接"""
for _ in range(self.pool_size):
try:
conn = self._create_connection()
if conn:
self._pool.put(conn)
except Exception as e:
logger.warning(f"预创建 ClickHouse 连接失败: {e}")
def _create_connection(self):
"""创建新的 ClickHouse 连接"""
try:
client = Cclient(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
connect_timeout=self.connection_timeout,
send_receive_timeout=self.query_timeout,
sync_request_timeout=self.query_timeout,
settings={
'max_execution_time': self.query_timeout,
'connect_timeout': self.connection_timeout,
}
)
conn_id = id(client)
self._created_at[conn_id] = time.time()
self._last_used[conn_id] = time.time()
with self._lock:
self._active_connections += 1
logger.debug(f"创建新的 ClickHouse 连接: {conn_id}")
return client
except Exception as e:
logger.error(f"创建 ClickHouse 连接失败: {e}")
raise
def _check_connection_health(self, conn):
"""检查连接健康状态"""
try:
conn_id = id(conn)
last_used = self._last_used.get(conn_id, 0)
# 如果连接长时间未使用,进行健康检查
if time.time() - last_used > self.health_check_interval:
# 执行简单查询检查连接
conn.execute("SELECT 1")
logger.debug(f"连接 {conn_id} 健康检查通过")
return True
except Exception as e:
logger.warning(f"连接健康检查失败: {e}")
return False
def _close_connection(self, conn):
"""关闭连接"""
try:
conn_id = id(conn)
conn.disconnect()
self._last_used.pop(conn_id, None)
self._created_at.pop(conn_id, None)
with self._lock:
self._active_connections = max(0, self._active_connections - 1)
logger.debug(f"关闭 ClickHouse 连接: {conn_id}")
except Exception as e:
logger.warning(f"关闭连接时出错: {e}")
def get_connection(self, timeout=None):
"""
从连接池获取连接
Args:
timeout: 获取连接的超时时间,默认使用 connection_timeout
Returns:
ClickHouse 客户端连接
Raises:
TimeoutError: 获取连接超时
Exception: 创建连接失败
"""
timeout = timeout or self.connection_timeout
# 首先尝试从池中获取连接
try:
conn = self._pool.get(block=True, timeout=timeout)
# 检查连接健康状态
if self._check_connection_health(conn):
self._last_used[id(conn)] = time.time()
return conn
else:
# 连接不健康,关闭并创建新连接
self._close_connection(conn)
return self._create_connection()
except Empty:
# 池中没有可用连接,检查是否可以创建新连接
with self._lock:
if self._active_connections < self.pool_size + self.max_overflow:
try:
return self._create_connection()
except Exception as e:
logger.error(f"创建溢出连接失败: {e}")
raise
# 已达到最大连接数,等待连接释放
logger.warning(f"连接池已满,等待连接释放... (当前连接数: {self._active_connections})")
raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)")
def release_connection(self, conn):
"""
释放连接回连接池
Args:
conn: 要释放的连接
"""
if conn is None:
return
conn_id = id(conn)
self._last_used[conn_id] = time.time()
try:
self._pool.put(conn, block=False)
logger.debug(f"连接 {conn_id} 已释放回连接池")
except Full:
# 池已满,关闭多余连接
logger.debug(f"连接池已满,关闭多余连接: {conn_id}")
self._close_connection(conn)
@contextmanager
def connection(self, timeout=None):
"""
上下文管理器方式获取连接
Usage:
with pool.connection() as conn:
result = conn.execute("SELECT * FROM table")
"""
conn = None
try:
conn = self.get_connection(timeout)
yield conn
except Exception as e:
# 发生异常时,检查连接是否需要重建
if conn:
try:
# 尝试简单查询检测连接状态
conn.execute("SELECT 1")
except:
# 连接已损坏,关闭它
self._close_connection(conn)
conn = None
raise
finally:
if conn:
self.release_connection(conn)
def execute(self, query, params=None, timeout=None):
"""
执行查询(自动管理连接)
Args:
query: SQL 查询语句
params: 查询参数
timeout: 查询超时时间
Returns:
查询结果
"""
with self.connection(timeout) as conn:
return conn.execute(query, params)
def get_pool_status(self):
"""获取连接池状态"""
return {
'pool_size': self.pool_size,
'max_overflow': self.max_overflow,
'active_connections': self._active_connections,
'available_connections': self._pool.qsize(),
'max_connections': self.pool_size + self.max_overflow
}
def close_all(self):
"""关闭所有连接"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
self._close_connection(conn)
except Empty:
break
logger.info("ClickHouse 连接池已关闭所有连接")
# 初始化全局 ClickHouse 连接池
clickhouse_pool = ClickHouseConnectionPool(
host='222.128.1.157',
port=18000,
user='default',
password='Zzl33818!',
database='stock',
pool_size=10, # 核心连接数
max_overflow=15, # 最大溢出连接数,总共支持 25 并发
connection_timeout=10, # 连接超时 10 秒
query_timeout=30, # 查询超时 30 秒
health_check_interval=60 # 60 秒未使用的连接进行健康检查
)
# ===================== ClickHouse 连接池实现结束 =====================
app = Flask(__name__)
Compress(app)
UPLOAD_FOLDER = 'static/uploads/avatars'
@@ -1150,13 +1420,39 @@ def update_investment_preferences():
def get_clickhouse_client():
return Cclient(
host='222.128.1.157',
port=18000,
user='default',
password='Zzl33818!',
database='stock'
)
"""
获取 ClickHouse 客户端(使用连接池)
返回连接池对象,支持两种使用方式:
方式1推荐- 直接调用 execute
client = get_clickhouse_client()
result = client.execute("SELECT * FROM table", {'param': value})
方式2 - 使用上下文管理器:
client = get_clickhouse_client()
with client.connection() as conn:
result = conn.execute("SELECT * FROM table")
"""
return clickhouse_pool
@app.route('/api/system/clickhouse-pool-status', methods=['GET'])
def api_clickhouse_pool_status():
"""获取 ClickHouse 连接池状态(仅供监控使用)"""
try:
status = clickhouse_pool.get_pool_status()
return jsonify({
'code': 200,
'message': 'success',
'data': status
})
except Exception as e:
return jsonify({
'code': 500,
'message': str(e),
'data': None
}), 500
@app.route('/api/stock/<stock_code>/kline')