update pay function

This commit is contained in:
2025-11-30 09:15:24 +08:00
parent fc738dc639
commit 14ab2f62f3
5 changed files with 419 additions and 6 deletions

View File

@@ -781,3 +781,246 @@ async def remove_favorite_event(user_id: str, event_id: int) -> Dict[str, Any]:
return {"success": True, "message": "删除自选事件成功"}
else:
return {"success": False, "message": "未找到该自选事件"}
# ==================== ClickHouse 分钟频数据查询 ====================
from clickhouse_driver import Client as ClickHouseClient
# ClickHouse 连接配置
CLICKHOUSE_CONFIG = {
'host': '222.128.1.157',
'port': 18000,
'user': 'default',
'password': 'Zzl33818!',
'database': 'stock'
}
# ClickHouse 客户端(懒加载)
_clickhouse_client = None
def get_clickhouse_client():
"""获取 ClickHouse 客户端(单例模式)"""
global _clickhouse_client
if _clickhouse_client is None:
_clickhouse_client = ClickHouseClient(
host=CLICKHOUSE_CONFIG['host'],
port=CLICKHOUSE_CONFIG['port'],
user=CLICKHOUSE_CONFIG['user'],
password=CLICKHOUSE_CONFIG['password'],
database=CLICKHOUSE_CONFIG['database']
)
logger.info("ClickHouse client created")
return _clickhouse_client
async def get_stock_minute_data(
code: str,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
limit: int = 240
) -> List[Dict[str, Any]]:
"""
获取股票分钟频数据
Args:
code: 股票代码(例如:'600519''600519.SH'
start_time: 开始时间格式YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD
end_time: 结束时间格式YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD
limit: 返回条数默认240一个交易日的分钟数据
Returns:
分钟频数据列表
"""
try:
client = get_clickhouse_client()
# 标准化股票代码(去除后缀)
stock_code = code.split('.')[0] if '.' in code else code
# 构建查询
query = """
SELECT
code,
timestamp,
open,
high,
low,
close,
volume,
amt
FROM stock_minute
WHERE code = %(code)s
"""
params = {'code': stock_code}
if start_time:
query += " AND timestamp >= %(start_time)s"
params['start_time'] = start_time
if end_time:
query += " AND timestamp <= %(end_time)s"
params['end_time'] = end_time
query += " ORDER BY timestamp DESC LIMIT %(limit)s"
params['limit'] = limit
# 执行查询
result = client.execute(query, params, with_column_types=True)
rows = result[0]
columns = [col[0] for col in result[1]]
# 转换为字典列表
data = []
for row in rows:
record = {}
for i, col in enumerate(columns):
value = row[i]
# 处理 datetime 类型
if hasattr(value, 'isoformat'):
record[col] = value.isoformat()
else:
record[col] = value
data.append(record)
logger.info(f"[ClickHouse] 查询分钟数据: code={stock_code}, 返回 {len(data)} 条记录")
return data
except Exception as e:
logger.error(f"[ClickHouse] 查询分钟数据失败: {e}", exc_info=True)
return []
async def get_stock_minute_aggregation(
code: str,
date: str,
interval: int = 5
) -> List[Dict[str, Any]]:
"""
获取股票分钟频数据的聚合(按指定分钟间隔)
Args:
code: 股票代码
date: 日期格式YYYY-MM-DD
interval: 聚合间隔分钟默认5分钟
Returns:
聚合后的K线数据
"""
try:
client = get_clickhouse_client()
# 标准化股票代码
stock_code = code.split('.')[0] if '.' in code else code
# 使用 ClickHouse 的时间函数进行聚合
query = f"""
SELECT
code,
toStartOfInterval(timestamp, INTERVAL {interval} MINUTE) as interval_start,
argMin(open, timestamp) as open,
max(high) as high,
min(low) as low,
argMax(close, timestamp) as close,
sum(volume) as volume,
sum(amt) as amt
FROM stock_minute
WHERE code = %(code)s
AND toDate(timestamp) = %(date)s
GROUP BY code, interval_start
ORDER BY interval_start
"""
params = {'code': stock_code, 'date': date}
result = client.execute(query, params, with_column_types=True)
rows = result[0]
columns = [col[0] for col in result[1]]
data = []
for row in rows:
record = {}
for i, col in enumerate(columns):
value = row[i]
if hasattr(value, 'isoformat'):
record[col] = value.isoformat()
else:
record[col] = value
data.append(record)
logger.info(f"[ClickHouse] 聚合分钟数据: code={stock_code}, date={date}, interval={interval}min, 返回 {len(data)} 条记录")
return data
except Exception as e:
logger.error(f"[ClickHouse] 聚合分钟数据失败: {e}", exc_info=True)
return []
async def get_stock_intraday_statistics(
code: str,
date: str
) -> Dict[str, Any]:
"""
获取股票日内统计数据
Args:
code: 股票代码
date: 日期格式YYYY-MM-DD
Returns:
日内统计数据(开盘价、最高价、最低价、收盘价、成交量、成交额、波动率等)
"""
try:
client = get_clickhouse_client()
stock_code = code.split('.')[0] if '.' in code else code
query = """
SELECT
code,
toDate(timestamp) as trade_date,
argMin(open, timestamp) as open,
max(high) as high,
min(low) as low,
argMax(close, timestamp) as close,
sum(volume) as total_volume,
sum(amt) as total_amount,
count(*) as data_points,
min(timestamp) as first_time,
max(timestamp) as last_time,
(max(high) - min(low)) / min(low) * 100 as intraday_range_pct,
stddevPop(close) as price_volatility
FROM stock_minute
WHERE code = %(code)s
AND toDate(timestamp) = %(date)s
GROUP BY code, trade_date
"""
params = {'code': stock_code, 'date': date}
result = client.execute(query, params, with_column_types=True)
if not result[0]:
return {"success": False, "error": f"未找到 {stock_code}{date} 的分钟数据"}
row = result[0][0]
columns = [col[0] for col in result[1]]
data = {}
for i, col in enumerate(columns):
value = row[i]
if hasattr(value, 'isoformat'):
data[col] = value.isoformat()
else:
data[col] = float(value) if isinstance(value, (int, float)) else value
logger.info(f"[ClickHouse] 日内统计: code={stock_code}, date={date}")
return {"success": True, "data": data}
except Exception as e:
logger.error(f"[ClickHouse] 日内统计失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}