Merge branch 'feature_2025/251117_pref' into feature_2025/251121_h5UI

* feature_2025/251117_pref:
  update pay function
  update pay function
  update pay function
This commit is contained in:
zdl
2025-11-26 09:59:01 +08:00
2 changed files with 857 additions and 559 deletions

332
app_vx.py
View File

@@ -55,6 +55,9 @@ from datetime import datetime, timedelta, time as dt_time
from werkzeug.security import generate_password_hash, check_password_hash
import json
from clickhouse_driver import Client as Cclient
from queue import Queue, Empty, Full
from threading import Lock, RLock
from contextlib import contextmanager
import jwt
from docx import Document
from tencentcloud.common import credential
@@ -69,6 +72,289 @@ engine_med = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060
engine_2 = create_engine("mysql+pymysql://root:Zzl5588161!@222.128.1.157:33060/valuefrontier", echo=False)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ===================== ClickHouse 连接池实现 =====================
class ClickHouseConnectionPool:
"""
ClickHouse 连接池
- 支持连接复用,避免频繁创建/销毁连接
- 支持连接超时和健康检查
- 支持自动重连
- 线程安全
"""
def __init__(self, host, port, user, password, database,
pool_size=10, max_overflow=10,
connection_timeout=10, query_timeout=30,
health_check_interval=60):
"""
初始化连接池
Args:
host: ClickHouse 主机地址
port: ClickHouse 端口
user: 用户名
password: 密码
database: 数据库名
pool_size: 连接池核心大小(预创建连接数)
max_overflow: 最大溢出连接数(总连接数 = pool_size + max_overflow
connection_timeout: 获取连接超时时间(秒)
query_timeout: 查询超时时间(秒)
health_check_interval: 健康检查间隔(秒)
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.pool_size = pool_size
self.max_overflow = max_overflow
self.connection_timeout = connection_timeout
self.query_timeout = query_timeout
self.health_check_interval = health_check_interval
# 连接池队列
self._pool = Queue(maxsize=pool_size + max_overflow)
# 当前活跃连接数
self._active_connections = 0
# 锁
self._lock = RLock()
# 连接最后使用时间记录
self._last_used = {}
# 连接创建时间记录
self._created_at = {}
# 初始化核心连接
self._init_pool()
logger.info(f"ClickHouse 连接池初始化完成: pool_size={pool_size}, max_overflow={max_overflow}")
def _init_pool(self):
"""初始化连接池,预创建部分核心连接(非阻塞)"""
# 只预创建 2 个连接,其余按需创建
init_count = min(2, self.pool_size)
for i in range(init_count):
try:
conn = self._create_connection()
if conn:
self._pool.put(conn)
logger.info(f"预创建 ClickHouse 连接 {i+1}/{init_count} 成功")
except Exception as e:
logger.warning(f"预创建 ClickHouse 连接失败 ({i+1}/{init_count}): {e}")
# 预创建失败不阻塞启动,后续按需创建
break
def _create_connection(self):
"""创建新的 ClickHouse 连接"""
try:
client = Cclient(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
connect_timeout=self.connection_timeout,
send_receive_timeout=self.query_timeout,
sync_request_timeout=self.query_timeout,
settings={
'max_execution_time': self.query_timeout,
'connect_timeout': self.connection_timeout,
}
)
conn_id = id(client)
self._created_at[conn_id] = time.time()
self._last_used[conn_id] = time.time()
with self._lock:
self._active_connections += 1
logger.debug(f"创建新的 ClickHouse 连接: {conn_id}")
return client
except Exception as e:
logger.error(f"创建 ClickHouse 连接失败: {e}")
raise
def _check_connection_health(self, conn):
"""检查连接健康状态"""
try:
conn_id = id(conn)
last_used = self._last_used.get(conn_id, 0)
# 如果连接长时间未使用,进行健康检查
if time.time() - last_used > self.health_check_interval:
# 执行简单查询检查连接
conn.execute("SELECT 1")
logger.debug(f"连接 {conn_id} 健康检查通过")
return True
except Exception as e:
logger.warning(f"连接健康检查失败: {e}")
return False
def _close_connection(self, conn):
"""关闭连接"""
try:
conn_id = id(conn)
conn.disconnect()
self._last_used.pop(conn_id, None)
self._created_at.pop(conn_id, None)
with self._lock:
self._active_connections = max(0, self._active_connections - 1)
logger.debug(f"关闭 ClickHouse 连接: {conn_id}")
except Exception as e:
logger.warning(f"关闭连接时出错: {e}")
def get_connection(self, timeout=None):
"""
从连接池获取连接
Args:
timeout: 获取连接的超时时间,默认使用 connection_timeout
Returns:
ClickHouse 客户端连接
Raises:
TimeoutError: 获取连接超时
Exception: 创建连接失败
"""
timeout = timeout or self.connection_timeout
# 首先尝试从池中获取连接
try:
conn = self._pool.get(block=True, timeout=timeout)
# 检查连接健康状态
if self._check_connection_health(conn):
self._last_used[id(conn)] = time.time()
return conn
else:
# 连接不健康,关闭并创建新连接
self._close_connection(conn)
return self._create_connection()
except Empty:
# 池中没有可用连接,检查是否可以创建新连接
with self._lock:
if self._active_connections < self.pool_size + self.max_overflow:
try:
return self._create_connection()
except Exception as e:
logger.error(f"创建溢出连接失败: {e}")
raise
# 已达到最大连接数,等待连接释放
logger.warning(f"连接池已满,等待连接释放... (当前连接数: {self._active_connections})")
raise TimeoutError(f"获取 ClickHouse 连接超时 (timeout={timeout}s)")
def release_connection(self, conn):
"""
释放连接回连接池
Args:
conn: 要释放的连接
"""
if conn is None:
return
conn_id = id(conn)
self._last_used[conn_id] = time.time()
try:
self._pool.put(conn, block=False)
logger.debug(f"连接 {conn_id} 已释放回连接池")
except Full:
# 池已满,关闭多余连接
logger.debug(f"连接池已满,关闭多余连接: {conn_id}")
self._close_connection(conn)
@contextmanager
def connection(self, timeout=None):
"""
上下文管理器方式获取连接
Usage:
with pool.connection() as conn:
result = conn.execute("SELECT * FROM table")
"""
conn = None
try:
conn = self.get_connection(timeout)
yield conn
except Exception as e:
# 发生异常时,检查连接是否需要重建
if conn:
try:
# 尝试简单查询检测连接状态
conn.execute("SELECT 1")
except:
# 连接已损坏,关闭它
self._close_connection(conn)
conn = None
raise
finally:
if conn:
self.release_connection(conn)
def execute(self, query, params=None, timeout=None):
"""
执行查询(自动管理连接)
Args:
query: SQL 查询语句
params: 查询参数
timeout: 查询超时时间
Returns:
查询结果
"""
with self.connection(timeout) as conn:
return conn.execute(query, params)
def get_pool_status(self):
"""获取连接池状态"""
return {
'pool_size': self.pool_size,
'max_overflow': self.max_overflow,
'active_connections': self._active_connections,
'available_connections': self._pool.qsize(),
'max_connections': self.pool_size + self.max_overflow
}
def close_all(self):
"""关闭所有连接"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
self._close_connection(conn)
except Empty:
break
logger.info("ClickHouse 连接池已关闭所有连接")
# 初始化全局 ClickHouse 连接池(懒加载模式)
clickhouse_pool = None
_pool_lock = Lock()
def _init_clickhouse_pool():
"""懒加载初始化 ClickHouse 连接池"""
global clickhouse_pool
if clickhouse_pool is None:
with _pool_lock:
if clickhouse_pool is None:
clickhouse_pool = ClickHouseConnectionPool(
host='222.128.1.157',
port=18000,
user='default',
password='Zzl33818!',
database='stock',
pool_size=5, # 减少预创建连接数
max_overflow=20, # 增加溢出连接数,总共支持 25 并发
connection_timeout=10, # 连接超时 10 秒
query_timeout=30, # 查询超时 30 秒
health_check_interval=60 # 60 秒未使用的连接进行健康检查
)
return clickhouse_pool
# ===================== ClickHouse 连接池实现结束 =====================
app = Flask(__name__)
Compress(app)
UPLOAD_FOLDER = 'static/uploads/avatars'
@@ -1150,13 +1436,40 @@ def update_investment_preferences():
def get_clickhouse_client():
return Cclient(
host='222.128.1.157',
port=18000,
user='default',
password='Zzl33818!',
database='stock'
)
"""
获取 ClickHouse 客户端(使用连接池,懒加载)
返回连接池对象,支持两种使用方式:
方式1推荐- 直接调用 execute
client = get_clickhouse_client()
result = client.execute("SELECT * FROM table", {'param': value})
方式2 - 使用上下文管理器:
client = get_clickhouse_client()
with client.connection() as conn:
result = conn.execute("SELECT * FROM table")
"""
return _init_clickhouse_pool()
@app.route('/api/system/clickhouse-pool-status', methods=['GET'])
def api_clickhouse_pool_status():
"""获取 ClickHouse 连接池状态(仅供监控使用)"""
try:
pool = _init_clickhouse_pool()
status = pool.get_pool_status()
return jsonify({
'code': 200,
'message': 'success',
'data': status
})
except Exception as e:
return jsonify({
'code': 500,
'message': str(e),
'data': None
}), 500
@app.route('/api/stock/<stock_code>/kline')
@@ -1292,6 +1605,7 @@ def get_daily_kline(stock_code, event_datetime, stock_name):
'name': stock_name,
'data': kline_data,
'trade_date': event_datetime.date().strftime('%Y-%m-%d'),
'event_time': event_datetime.isoformat(),
'type': 'daily',
'is_history': True,
'data_count': len(kline_data)
@@ -4120,7 +4434,9 @@ def api_stock_detail(event_id, stock_code):
'event_info': {
'event_id': event.id,
'event_title': event.title,
'event_description': event.description
'event_description': event.description,
'event_start_time': event.start_time.isoformat() if event.start_time else None,
'event_created_at': event.created_at.strftime("%Y-%m-%d %H:%M:%S") if event.created_at else None
},
'basic_info': {
'stock_code': basic_info.SECCODE,

File diff suppressed because it is too large Load Diff