update app_vx

This commit is contained in:
2025-11-13 10:20:03 +08:00
parent d64349b606
commit 1c49ddf42c
7 changed files with 3578 additions and 8772 deletions

View File

@@ -1,49 +0,0 @@
# Bytedesk 客服系统集成文件
以下文件和目录属于客服系统集成功能,未提交到当前分支:
## 1. Dify 机器人控制逻辑
**位置**: public/index.html
**状态**: 已存入 stash
**Stash ID**: stash@{0}
**说明**: 根据路径控制 Dify 机器人显示(已设置为完全不显示,只使用 Bytedesk 客服)
## 2. Bytedesk 集成代码
**位置**: src/bytedesk-integration/
**状态**: 未跟踪文件(需要手动管理)
**内容**:
- .env.bytedesk.example - Bytedesk 环境变量配置示例
- App.jsx.example - 集成 Bytedesk 的示例代码
- components/ - Bytedesk 相关组件
- config/ - Bytedesk 配置文件
- 前端工程师集成手册.md - 详细集成文档
## 恢复方法
### 恢复 public/index.html 的改动:
```bash
git stash apply stash@{0}
```
### 使用 Bytedesk 集成代码:
```bash
# 查看集成手册
cat src/bytedesk-integration/前端工程师集成手册.md
# 复制示例配置
cp src/bytedesk-integration/.env.bytedesk.example .env.bytedesk
cp src/bytedesk-integration/App.jsx.example src/App.jsx
```
## 注意事项
⚠️ **重要提示:**
- `src/bytedesk-integration/` 目录中的文件是未跟踪的(untracked)
- 如果需要提交客服功能,需要先添加到 git:
```bash
git add src/bytedesk-integration/
git commit -m "feat: 集成 Bytedesk 客服系统"
```
- 当前分支(feature_bugfix/251110_event)专注于非客服功能
- 建议在单独的分支中开发客服功能

295
app_vx.py
View File

@@ -16,6 +16,28 @@ import time
from sqlalchemy import create_engine, text, func, or_, case, event, desc, asc
from flask import Flask, has_request_context, render_template, request, jsonify, redirect, url_for, flash, session, render_template_string, current_app, send_from_directory
# Flask 3.x compatibility shim: older flask-sqlalchemy releases still expect
# `flask._app_ctx_stack` to exist.
try:
    from flask import _app_ctx_stack
except ImportError:
    import flask
    from werkzeug.local import LocalStack
    import threading

    # LocalStack subclass that additionally exposes `__ident_func__`.
    class CompatLocalStack(LocalStack):
        @property
        def __ident_func__(self):
            # Return the callable that identifies the current execution
            # context: greenlet's `getcurrent` when greenlet is importable,
            # otherwise `threading.get_ident`.
            try:
                from greenlet import getcurrent
                return getcurrent
            except ImportError:
                return threading.get_ident

    # Install the shim under the name that could not be imported above.
    flask._app_ctx_stack = CompatLocalStack()
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from flask_mail import Mail, Message
@@ -1518,9 +1540,6 @@ def like_post(post_id):
post.likes_count += 1
message = '已点赞'
# 可以在这里添加点赞通知
if post.user_id != request.user.id:
notify_user_post_liked(post)
db.session.commit()
return jsonify({
@@ -1597,15 +1616,6 @@ def add_comment(post_id):
db.session.add(comment)
post.comments_count += 1
# 如果是回复评论,可以添加通知
if parent_id:
parent_comment = Comment.query.get(parent_id)
if parent_comment and parent_comment.user_id != request.user.id:
notify_user_comment_replied(parent_comment)
# 如果是评论帖子,通知帖子作者
elif post.user_id != request.user.id:
notify_user_post_commented(post)
db.session.commit()
@@ -3853,17 +3863,171 @@ def api_event_related_stocks(event_id):
print(f"Error fetching minute data for {stock_code}: {e}")
return []
# ==================== 性能优化:批量查询所有股票数据 ====================
# 1. 收集所有股票代码
stock_codes = [stock.stock_code for stock in related_stocks]
# 2. 批量查询股票基本信息
stock_info_map = {}
if stock_codes:
stock_infos = StockBasicInfo.query.filter(StockBasicInfo.SECCODE.in_(stock_codes)).all()
for info in stock_infos:
stock_info_map[info.SECCODE] = info
# 处理不带后缀的股票代码
base_codes = [code.split('.')[0] for code in stock_codes if '.' in code and code not in stock_info_map]
if base_codes:
base_infos = StockBasicInfo.query.filter(StockBasicInfo.SECCODE.in_(base_codes)).all()
for info in base_infos:
# 将不带后缀的信息映射到带后缀的代码
for code in stock_codes:
if code.split('.')[0] == info.SECCODE and code not in stock_info_map:
stock_info_map[code] = info
# 3. 批量查询 ClickHouse 数据(价格、涨跌幅、分时图数据)
price_data_map = {} # 存储价格和涨跌幅数据
minute_chart_map = {} # 存储分时图数据
try:
if stock_codes:
print(f"批量查询 {len(stock_codes)} 只股票的价格数据...")
# 3.1 批量查询价格和涨跌幅数据(使用子查询方式,避免窗口函数与 GROUP BY 冲突)
batch_price_query = """
WITH first_prices AS (
SELECT
code,
close as first_price,
ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp ASC) as rn
FROM stock_minute
WHERE code IN %(codes)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
),
last_prices AS (
SELECT
code,
close as last_price,
open as open_price,
high as high_price,
low as low_price,
volume,
amt as amount,
ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp DESC) as rn
FROM stock_minute
WHERE code IN %(codes)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
)
SELECT
fp.code,
fp.first_price,
lp.last_price,
(lp.last_price - fp.first_price) / fp.first_price * 100 as change_pct,
lp.open_price,
lp.high_price,
lp.low_price,
lp.volume,
lp.amount
FROM first_prices fp
INNER JOIN last_prices lp ON fp.code = lp.code
WHERE fp.rn = 1 AND lp.rn = 1
"""
price_data = client.execute(batch_price_query, {
'codes': stock_codes,
'start': start_datetime,
'end': end_datetime
})
print(f"批量查询返回 {len(price_data)} 条价格数据")
# 解析批量查询结果
for row in price_data:
code = row[0]
first_price = float(row[1]) if row[1] is not None else None
last_price = float(row[2]) if row[2] is not None else None
change_pct = float(row[3]) if row[3] is not None else None
open_price = float(row[4]) if row[4] is not None else None
high_price = float(row[5]) if row[5] is not None else None
low_price = float(row[6]) if row[6] is not None else None
volume = int(row[7]) if row[7] is not None else None
amount = float(row[8]) if row[8] is not None else None
change_amount = None
if last_price is not None and first_price is not None:
change_amount = last_price - first_price
price_data_map[code] = {
'latest_price': last_price,
'first_price': first_price,
'change_pct': change_pct,
'change_amount': change_amount,
'open_price': open_price,
'high_price': high_price,
'low_price': low_price,
'volume': volume,
'amount': amount,
}
# 3.2 批量查询分时图数据
print(f"批量查询分时图数据...")
minute_chart_query = """
SELECT
code,
timestamp,
open,
high,
low,
close,
volume,
amt
FROM stock_minute
WHERE code IN %(codes)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
ORDER BY code, timestamp
"""
minute_data = client.execute(minute_chart_query, {
'codes': stock_codes,
'start': start_datetime,
'end': end_datetime
})
print(f"批量查询返回 {len(minute_data)} 条分时数据")
# 按股票代码分组分时数据
for row in minute_data:
code = row[0]
if code not in minute_chart_map:
minute_chart_map[code] = []
minute_chart_map[code].append({
'time': row[1].strftime('%H:%M'),
'open': float(row[2]) if row[2] else None,
'high': float(row[3]) if row[3] else None,
'low': float(row[4]) if row[4] else None,
'close': float(row[5]) if row[5] else None,
'volume': float(row[6]) if row[6] else None,
'amount': float(row[7]) if row[7] else None
})
except Exception as e:
print(f"批量查询 ClickHouse 失败: {e}")
# 如果批量查询失败price_data_map 和 minute_chart_map 为空,后续会使用降级方案
# 4. 组装每个股票的数据(从批量查询结果中获取)
stocks_data = []
for stock in related_stocks:
print(f"正在处理股票 {stock.stock_code}价格数据...")
print(f"正在组装股票 {stock.stock_code} 的数据...")
# 获取股票基本信息
stock_info = StockBasicInfo.query.filter_by(SECCODE=stock.stock_code).first()
if not stock_info:
base_code = stock.stock_code.split('.')[0]
stock_info = StockBasicInfo.query.filter_by(SECCODE=base_code).first()
# 从批量查询结果中获取股票基本信息
stock_info = stock_info_map.get(stock.stock_code)
# 从批量查询结果中获取价格数据
price_info = price_data_map.get(stock.stock_code)
# 使用与 get_stock_quotes 完全相同的逻辑计算涨跌幅
latest_price = None
first_price = None
change_pct = None
@@ -3875,79 +4039,20 @@ def api_event_related_stocks(event_id):
amount = None
trade_date = trading_day
try:
# 使用与 get_stock_quotes 完全相同的 SQL 查询
# 获取事件时间点的第一个价格 (first_price) 和当前时间的最后一个价格 (last_price)
data = client.execute("""
WITH first_price AS (
SELECT close
FROM stock_minute
WHERE code = %(code)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
ORDER BY timestamp
LIMIT 1
),
last_price AS (
SELECT close
FROM stock_minute
WHERE code = %(code)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
ORDER BY timestamp DESC
LIMIT 1
)
SELECT
last_price.close as last_price,
(last_price.close - first_price.close) / first_price.close * 100 as change,
first_price.close as first_price
FROM last_price
CROSS JOIN first_price
WHERE EXISTS (SELECT 1 FROM first_price)
AND EXISTS (SELECT 1 FROM last_price)
""", {
'code': stock.stock_code,
'start': start_datetime,
'end': end_datetime
})
print(f"股票 {stock.stock_code} 查询结果: {data}")
if data and data[0] and data[0][0] is not None:
latest_price = float(data[0][0])
change_pct = float(data[0][1]) if data[0][1] is not None else None
first_price = float(data[0][2]) if len(data[0]) > 2 and data[0][2] is not None else None
# 计算涨跌额
if latest_price is not None and first_price is not None:
change_amount = latest_price - first_price
# 获取额外的价格信息(开盘价、最高价、最低价等)
extra_data = client.execute("""
SELECT
open, high, low, volume, amt
FROM stock_minute
WHERE code = %(code)s
AND timestamp >= %(start)s
AND timestamp <= %(end)s
ORDER BY timestamp DESC
LIMIT 1
""", {
'code': stock.stock_code,
'start': start_datetime,
'end': end_datetime
})
if extra_data and extra_data[0]:
open_price = float(extra_data[0][0]) if extra_data[0][0] else None
high_price = float(extra_data[0][1]) if extra_data[0][1] else None
low_price = float(extra_data[0][2]) if extra_data[0][2] else None
volume = int(extra_data[0][3]) if extra_data[0][3] else None
amount = float(extra_data[0][4]) if extra_data[0][4] else None
except Exception as e:
print(f"Error fetching price data for {stock.stock_code}: {e}")
# 如果 ClickHouse 查询失败,尝试使用 TradeData 作为降级方案
if price_info:
# 使用批量查询的结果
latest_price = price_info['latest_price']
first_price = price_info['first_price']
change_pct = price_info['change_pct']
change_amount = price_info['change_amount']
open_price = price_info['open_price']
high_price = price_info['high_price']
low_price = price_info['low_price']
volume = price_info['volume']
amount = price_info['amount']
else:
# 如果批量查询没有返回数据使用降级方案TradeData
print(f"股票 {stock.stock_code} 批量查询无数据,使用降级方案...")
try:
latest_trade = None
search_codes = [stock.stock_code, stock.stock_code.split('.')[0]]
@@ -3974,10 +4079,10 @@ def api_event_related_stocks(event_id):
if latest_trade.F009N:
change_amount = float(latest_trade.F009N)
except Exception as fallback_error:
print(f"Fallback query also failed for {stock.stock_code}: {fallback_error}")
print(f"降级查询也失败 {stock.stock_code}: {fallback_error}")
# 获取分时图数据
minute_chart_data = get_minute_chart_data(stock.stock_code)
# 从批量查询结果中获取分时图数据
minute_chart_data = minute_chart_map.get(stock.stock_code, [])
stock_data = {
'id': stock.id,

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,97 +0,0 @@
# 这是要替换的涨跌幅计算逻辑(第3729-3738行)
# 计算基于事件时间的涨跌幅(参考 app.py /api/stock/quotes 的实现)
change_pct = None
change_amount = None
current_price = None
try:
# 获取事件时间和当前时间
evt_time = event.start_time if event.start_time else event.created_at
cur_time = datetime.now()
# 获取交易日和时间范围
evt_date = evt_time.date()
evt_time_only = evt_time.time()
market_open = dt_time(9, 30)
market_close = dt_time(15, 0)
# 检查是否是交易日
is_trading_day_result = db.session.execute(text("""
SELECT 1 FROM trading_days WHERE EXCHANGE_DATE = :date
"""), {"date": evt_date}).fetchone()
trading_day = None
start_time = None
end_time = None
if is_trading_day_result:
# 是交易日
if evt_time_only < market_open:
# 盘前 - 使用当日开盘
trading_day, start_time, end_time = evt_date, market_open, market_close
elif evt_time_only > market_close:
# 盘后 - 使用下一交易日
next_day_result = db.session.execute(text("""
SELECT EXCHANGE_DATE FROM trading_days
WHERE EXCHANGE_DATE > :date ORDER BY EXCHANGE_DATE LIMIT 1
"""), {"date": evt_date}).fetchone()
if next_day_result:
trading_day, start_time, end_time = next_day_result[0].date(), market_open, market_close
else:
# 盘中 - 从事件时间到收盘
trading_day, start_time, end_time = evt_date, evt_time_only, market_close
else:
# 非交易日 - 获取下一交易日
next_day_result = db.session.execute(text("""
SELECT EXCHANGE_DATE FROM trading_days
WHERE EXCHANGE_DATE > :date ORDER BY EXCHANGE_DATE LIMIT 1
"""), {"date": evt_date}).fetchone()
if next_day_result:
trading_day, start_time, end_time = next_day_result[0].date(), market_open, market_close
# 如果有有效的交易日且不在未来,查询涨跌幅
if trading_day and trading_day <= cur_time.date():
start_dt = datetime.combine(trading_day, start_time)
end_dt = datetime.combine(trading_day, end_time)
# 查询第一个bar和最后一个bar的价格
price_data = client.execute("""
WITH first_price AS (
SELECT close FROM stock_minute
WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s
ORDER BY timestamp LIMIT 1
),
last_price AS (
SELECT close FROM stock_minute
WHERE code = %(code)s AND timestamp >= %(start)s AND timestamp <= %(end)s
ORDER BY timestamp DESC LIMIT 1
)
SELECT first_price.close as first_price,
last_price.close as last_price,
(last_price.close - first_price.close) / first_price.close * 100 as change_pct
FROM last_price CROSS JOIN first_price
WHERE EXISTS (SELECT 1 FROM first_price) AND EXISTS (SELECT 1 FROM last_price)
""", {'code': stock.stock_code, 'start': start_dt, 'end': end_dt})
if price_data and price_data[0] and price_data[0][0] is not None:
first_price = float(price_data[0][0])
current_price = float(price_data[0][1])
change_pct = float(price_data[0][2])
change_amount = current_price - first_price
except Exception as e:
print(f"计算事件涨跌幅失败 {stock.stock_code}: {e}")
# 如果ClickHouse没有数据fallback到原来的逻辑
if change_pct is None:
if latest_trade and prev_trade:
if prev_trade.F007N and prev_trade.F007N != 0:
change_amount = float(latest_trade.F007N) - float(prev_trade.F007N)
change_pct = (change_amount / float(prev_trade.F007N)) * 100
elif latest_trade and latest_trade.F010N:
change_pct = float(latest_trade.F010N)
change_amount = float(latest_trade.F009N) if latest_trade.F009N else None
# 如果还没有当前价格使用latest_trade
if current_price is None and latest_trade and latest_trade.F007N:
current_price = float(latest_trade.F007N)

View File

@@ -1,540 +0,0 @@
"""
ClickHouse 查询优化方案 - 针对 /api/event/<int:event_id>/related-stocks-detail
问题分析:
1. N+1 查询问题:每只股票执行 3 次独立查询(共 30+ 次)
2. 重复扫描:first_price 和 last_price 需要扫描表两次
3. 缺少批量查询优化
优化方案对比:
┌─────────────┬──────────────┬──────────────┬────────────┐
│ 方案 │ 查询次数 │ 性能提升 │ 实现难度 │
├─────────────┼──────────────┼──────────────┼────────────┤
│ 当前代码 │ N * 3 │ 基准 │ - │
│ 方案1 批量 │ 1 │ 80-90% │ 中等 │
│ 方案2 并行 │ N * 3 (并行)│ 40-60% │ 简单 │
│ 方案3 缓存 │ 减少重复 │ 20-40% │ 简单 │
└─────────────┴──────────────┴──────────────┴────────────┘
"""
# ============================================================================
# 方案 1: 批量查询(推荐)- 将所有股票的查询合并为一次
# ============================================================================
def get_batch_stock_prices_optimized(client, stock_codes, start_datetime, end_datetime):
    """Fetch price summaries for several stocks with one ClickHouse query.

    Replaces the per-stock first/last price lookups with a single ``IN``
    query over ``stock_minute``.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_codes: Stock codes, e.g. ``['600519.SH', '601088.SH']``.
        start_datetime: Inclusive lower bound for ``timestamp``.
        end_datetime: Inclusive upper bound for ``timestamp``.

    Returns:
        dict mapping each returned code to a dict with the keys
        ``first_price``, ``last_price``, ``change_pct``, ``change_amount``,
        ``open_price``, ``high_price``, ``low_price``, ``volume`` and
        ``amount``. NULL columns become ``None``; a legitimate zero (flat
        price, zero volume) is kept as zero. Returns ``{}`` when
        ``stock_codes`` is empty or when the query or conversion raises.
    """
    if not stock_codes:
        return {}

    def _as_float(value):
        # Compare against None explicitly: 0 / 0.0 are valid market values.
        return float(value) if value is not None else None

    def _as_int(value):
        return int(value) if value is not None else None

    # One query for every code (IN clause). nullIf() guards the percentage
    # against a zero first price instead of producing inf/nan.
    query = """
        SELECT
            code,
            -- 第一个价格(事件发生时)
            anyIf(close, rownum_asc = 1) as first_price,
            -- 最后一个价格(当前时间)
            anyIf(close, rownum_desc = 1) as last_price,
            -- 涨跌幅
            (last_price - first_price) / nullIf(first_price, 0) * 100 as change_pct,
            -- 涨跌额
            last_price - first_price as change_amount,
            -- 其他价格信息(取最后一条记录)
            anyIf(open, rownum_desc = 1) as open_price,
            anyIf(high, rownum_desc = 1) as high_price,
            anyIf(low, rownum_desc = 1) as low_price,
            anyIf(volume, rownum_desc = 1) as volume,
            anyIf(amt, rownum_desc = 1) as amount
        FROM (
            SELECT
                code,
                timestamp,
                close,
                open,
                high,
                low,
                volume,
                amt,
                -- 正序排名(用于获取第一个价格)
                ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp ASC) as rownum_asc,
                -- 倒序排名(用于获取最后一个价格)
                ROW_NUMBER() OVER (PARTITION BY code ORDER BY timestamp DESC) as rownum_desc
            FROM stock_minute
            WHERE code IN %(codes)s
                AND timestamp >= %(start)s
                AND timestamp <= %(end)s
        )
        GROUP BY code
    """

    try:
        data = client.execute(query, {
            'codes': tuple(stock_codes),  # passed as a tuple for the IN clause
            'start': start_datetime,
            'end': end_datetime,
        })

        result = {}
        for row in data:
            result[row[0]] = {
                'first_price': _as_float(row[1]),
                'last_price': _as_float(row[2]),
                'change_pct': _as_float(row[3]),
                'change_amount': _as_float(row[4]),
                'open_price': _as_float(row[5]),
                'high_price': _as_float(row[6]),
                'low_price': _as_float(row[7]),
                'volume': _as_int(row[8]),
                'amount': _as_float(row[9]),
            }

        print(f"批量查询完成,获取了 {len(result)} 只股票的数据")
        return result

    except Exception as e:
        # Best-effort helper: callers treat an empty dict as "no data".
        print(f"批量查询失败: {e}")
        return {}
def get_batch_minute_chart_data(client, stock_codes, start_datetime, end_datetime):
    """Fetch minute-bar chart data for several stocks with one query.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_codes: Stock codes to include in the ``IN`` clause.
        start_datetime: Inclusive lower bound for ``timestamp``.
        end_datetime: Inclusive upper bound for ``timestamp``.

    Returns:
        dict mapping each code to a list of bars in query order, each bar a
        dict with ``time`` (``'HH:MM'``), ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``amount``. NULL columns become ``None``;
        zero values (e.g. a minute with no volume) are kept as ``0.0``.
        Returns ``{}`` when ``stock_codes`` is empty or on any error.
    """
    if not stock_codes:
        return {}

    def _as_float(value):
        # Explicit None check so that a real 0 is not reported as missing.
        return float(value) if value is not None else None

    query = """
        SELECT
            code,
            timestamp,
            open,
            high,
            low,
            close,
            volume,
            amt
        FROM stock_minute
        WHERE code IN %(codes)s
            AND timestamp >= %(start)s
            AND timestamp <= %(end)s
        ORDER BY code, timestamp
    """

    try:
        data = client.execute(query, {
            'codes': tuple(stock_codes),
            'start': start_datetime,
            'end': end_datetime,
        })

        # Group the flat row list by stock code.
        result = {}
        for row in data:
            result.setdefault(row[0], []).append({
                'time': row[1].strftime('%H:%M'),
                'open': _as_float(row[2]),
                'high': _as_float(row[3]),
                'low': _as_float(row[4]),
                'close': _as_float(row[5]),
                'volume': _as_float(row[6]),
                'amount': _as_float(row[7]),
            })

        print(f"批量获取分时数据完成,获取了 {len(result)} 只股票的数据")
        return result

    except Exception as e:
        # Best-effort helper: callers treat an empty dict as "no data".
        print(f"批量获取分时数据失败: {e}")
        return {}
# ============================================================================
# 使用示例:替换原来的 for 循环
# ============================================================================
def api_event_related_stocks_optimized(event_id):
    """Example endpoint body that uses the batched ClickHouse helpers.

    Loads the event and its related stocks, resolves the trading window via
    ``get_trading_day_and_times``, then issues one batched price query and
    one batched minute-chart query instead of per-stock queries.

    Args:
        event_id: Primary key of the event.

    Returns:
        A ``jsonify`` response with ``code``/``message``/``data`` on success,
        or ``(response, 500)`` when an unexpected error occurs. HTTP errors
        raised by Flask helpers (e.g. the 404 from ``get_or_404``) propagate
        unchanged.
    """
    try:
        from datetime import datetime
        from werkzeug.exceptions import HTTPException
    except ImportError:  # pragma: no cover - werkzeug ships with Flask
        HTTPException = ()

    def _rounded(value):
        # Explicit None check: a change of exactly 0 must stay 0, not None.
        return round(value, 2) if value is not None else None

    try:
        event = Event.query.get_or_404(event_id)
        related_stocks = event.related_stocks.order_by(RelatedStock.correlation.desc()).all()

        if not related_stocks:
            return jsonify({'code': 200, 'data': {'related_stocks': []}})

        client = get_clickhouse_client()

        # Trading-day calculation is delegated to the existing helper.
        event_time = event.start_time if event.start_time else event.created_at
        trading_day, start_time, end_time = get_trading_day_and_times(event_time)

        # NOTE(review): the variant of get_trading_day_and_times shown in
        # app_vx.py returns None as the trading day when no later trading day
        # exists; datetime.combine(None, ...) would raise, so answer with an
        # empty list instead.
        if not trading_day:
            return jsonify({
                'code': 200,
                'message': 'success',
                'data': {
                    'event_id': event_id,
                    'event_title': event.title,
                    'related_stocks': [],
                    'total_count': 0
                }
            })

        start_datetime = datetime.combine(trading_day, start_time)
        end_datetime = datetime.combine(trading_day, end_time)

        # One query for all price summaries ...
        stock_codes = [stock.stock_code for stock in related_stocks]
        prices_data = get_batch_stock_prices_optimized(
            client, stock_codes, start_datetime, end_datetime
        )

        # ... and one query for all minute charts.
        minute_data = get_batch_minute_chart_data(
            client, stock_codes, start_datetime, end_datetime
        )

        stocks_data = []
        for stock in related_stocks:
            # Everything below is served from the batched results.
            price_info = prices_data.get(stock.stock_code, {})
            chart_data = minute_data.get(stock.stock_code, [])

            # Basic info is still looked up per stock (could also be batched);
            # fall back to the code without its exchange suffix.
            stock_info = StockBasicInfo.query.filter_by(SECCODE=stock.stock_code).first()
            if not stock_info:
                base_code = stock.stock_code.split('.')[0]
                stock_info = StockBasicInfo.query.filter_by(SECCODE=base_code).first()

            stock_data = {
                'id': stock.id,
                'stock_code': stock.stock_code,
                'stock_name': stock.stock_name,
                'sector': stock.sector,
                'relation_desc': stock.relation_desc,
                'correlation': stock.correlation,
                'momentum': stock.momentum,
                'listing_date': stock_info.F006D.isoformat() if stock_info and stock_info.F006D else None,
                'market': stock_info.F005V if stock_info else None,

                # Trade data taken from the batched price query.
                'trade_data': {
                    'latest_price': price_info.get('last_price'),
                    'first_price': price_info.get('first_price'),
                    'open_price': price_info.get('open_price'),
                    'high_price': price_info.get('high_price'),
                    'low_price': price_info.get('low_price'),
                    'change_amount': _rounded(price_info.get('change_amount')),
                    'change_pct': _rounded(price_info.get('change_pct')),
                    'volume': price_info.get('volume'),
                    'amount': price_info.get('amount'),
                    'trade_date': trading_day.isoformat(),
                },

                # Minute chart taken from the batched chart query.
                'minute_chart': chart_data
            }

            stocks_data.append(stock_data)

        return jsonify({
            'code': 200,
            'message': 'success',
            'data': {
                'event_id': event_id,
                'event_title': event.title,
                'related_stocks': stocks_data,
                'total_count': len(stocks_data)
            }
        })

    except HTTPException:
        # Let Flask render 404 etc. instead of masking them as a 500.
        raise
    except Exception as e:
        print(f"Error in api_event_related_stocks_optimized: {e}")
        return jsonify({'code': 500, 'message': str(e)}), 500
# ============================================================================
# 方案 2: 异步并行查询(适用于无法批量查询的场景)
# ============================================================================
import asyncio
from concurrent.futures import ThreadPoolExecutor
def get_stock_price_async(client, stock_code, start_datetime, end_datetime):
    """Run the single-stock price query and pair the result with its code.

    Intended as the unit of work submitted to a thread pool.
    NOTE(review): the original docstring calls this thread-safe; that depends
    on ``client`` being safe to share across threads - confirm.

    Returns:
        ``(stock_code, rows)`` on success, ``(stock_code, None)`` if the
        query raises.
    """
    # Same (abbreviated) query as the original per-stock code path.
    sql = """
        WITH first_price AS (
            SELECT close FROM stock_minute WHERE code = %(code)s ...
        )
        ...
    """
    params = {'code': stock_code, 'start': start_datetime, 'end': end_datetime}

    try:
        rows = client.execute(sql, params)
    except Exception:
        return stock_code, None
    return stock_code, rows
def get_all_stocks_parallel(client, stock_codes, start_datetime, end_datetime):
    """Query several stocks concurrently on a thread pool.

    Submits one ``get_stock_price_async`` call per code to a pool of up to
    10 worker threads and gathers the results.

    NOTE(review): every task receives the same ``client`` object; whether
    that is safe depends on the client implementation - confirm.

    Returns:
        dict mapping each stock code to the rows returned for it (``None``
        for a code whose query failed).
    """
    with ThreadPoolExecutor(max_workers=10) as pool:
        # Fire off every query first ...
        pending = [
            pool.submit(get_stock_price_async, client, code, start_datetime, end_datetime)
            for code in stock_codes
        ]
        # ... then block on each future in submission order; each result is
        # already a (stock_code, data) pair.
        return dict(task.result() for task in pending)
# ============================================================================
# 方案 3: 添加缓存层(Redis)
# ============================================================================
import redis
import json
# Module-level Redis connection used by get_stock_price_with_cache; the
# connection settings are hard-coded to a local instance (localhost:6379, db 0).
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_stock_price_with_cache(client, stock_code, start_datetime, end_datetime):
    """Single-stock price lookup backed by a Redis cache.

    Cache policy:
        - windows that start today are cached for 1 minute;
        - all other (historical) windows are cached for 24 hours.

    The cache key contains the full start/end timestamps, not only their
    dates: two events on the same day have different start times and must
    not share a cached first price.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_code: Stock code to query.
        start_datetime: Start of the window (``datetime``).
        end_datetime: End of the window (``datetime``).

    Returns:
        dict with ``first_price`` and ``last_price`` (``None`` when the query
        returns no row or a NULL column), either decoded from the cache or
        freshly queried.
    """
    from datetime import datetime

    cache_key = (
        f"stock_price:{stock_code}:"
        f"{start_datetime.isoformat()}:{end_datetime.isoformat()}"
    )

    # Serve from the cache when possible.
    cached_data = redis_client.get(cache_key)
    if cached_data:
        print(f"从缓存获取 {stock_code} 数据")
        return json.loads(cached_data)

    # Cache miss: query the database.
    print(f"从 ClickHouse 查询 {stock_code} 数据")
    data = client.execute("""...""", {
        'code': stock_code,
        'start': start_datetime,
        'end': end_datetime
    })

    def _column(index):
        # Tolerate both "no row" and a NULL column instead of float(None).
        if not data or data[0][index] is None:
            return None
        return float(data[0][index])

    result = {
        'first_price': _column(2),
        'last_price': _column(0),
        # ...
    }

    # Intraday data changes quickly; historical data does not.
    is_today = start_datetime.date() == datetime.now().date()
    ttl = 60 if is_today else 86400
    redis_client.setex(cache_key, ttl, json.dumps(result))

    return result
# ============================================================================
# 方案 4: ClickHouse 查询优化(索引提示)
# ============================================================================
def get_stock_price_with_hints(client, stock_code, start_datetime, end_datetime):
    """Single-stock price query tuned with ClickHouse-specific clauses.

    Uses ``PREWHERE`` on the stock code and caps ``max_threads`` at 2.
    First and last prices are selected with ``argMin`` / ``argMax`` over
    ``timestamp``: the plain ``any`` / ``anyLast`` aggregates return whichever
    row is read first/last, which is not tied to timestamp order. The
    percentage is guarded with ``nullIf`` so a zero first price yields NULL
    rather than inf/nan.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_code: Stock code to query.
        start_datetime: Inclusive lower bound for ``timestamp``.
        end_datetime: Inclusive upper bound for ``timestamp``.

    Returns:
        Whatever ``client.execute`` returns: rows of
        ``(code, last_price, first_price, change_pct)``.
    """
    query = """
        SELECT
            code,
            argMax(close, timestamp) as last_price,
            argMin(close, timestamp) as first_price,
            (last_price - first_price) / nullIf(first_price, 0) * 100 as change_pct
        FROM stock_minute
        PREWHERE code = %(code)s -- 使用 PREWHERE 提前过滤(比 WHERE 快)
        WHERE timestamp >= %(start)s
            AND timestamp <= %(end)s
        GROUP BY code
        SETTINGS max_threads = 2 -- 限制线程数(避免资源竞争)
    """

    data = client.execute(query, {
        'code': stock_code,
        'start': start_datetime,
        'end': end_datetime
    })

    return data
# ============================================================================
# 数据库层面优化建议
# ============================================================================
"""
1. 确保 stock_minute 表有以下索引:
- PRIMARY KEY (code, timestamp) -- 主键索引
- INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 3 -- 时间索引
2. 表分区策略(如果数据量大):
CREATE TABLE stock_minute (
code String,
timestamp DateTime,
...
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp) -- 按月分区
ORDER BY (code, timestamp)
SETTINGS index_granularity = 8192;
3. 使用物化视图预计算(适用于固定查询模式):
CREATE MATERIALIZED VIEW stock_minute_summary
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (code, timestamp)
AS SELECT
code,
toStartOfMinute(timestamp) as minute,
anyLast(close) as last_close,
any(close) as first_close,
...
FROM stock_minute
GROUP BY code, minute;
4. 检查表统计信息:
SELECT
table,
partition,
rows,
bytes_on_disk
FROM system.parts
WHERE table = 'stock_minute';
"""
# ============================================================================
# 性能对比测试
# ============================================================================
def benchmark_query_methods():
    """Time the batched, parallel and serial query strategies.

    Scenario: fetch price data for about 10 stocks.

    Expected results (estimates stated by the author, not measured here):
        - original serial N+1 queries: ~1000 ms
        - option 1, batched query: ~50 ms (about 20x faster)
        - option 2, parallel queries: ~200 ms (about 5x faster)
        - option 3, cached: ~10 ms (about 100x faster, on the second request)

    NOTE(review): ``client``, ``start_dt``, ``end_dt`` and
    ``get_stock_price_original`` are not defined in the visible source, and
    the code list ends with a literal ``...`` placeholder - this reads as a
    sketch; confirm before running it.
    """
    import time

    stock_codes = ['600519.SH', '601088.SH', '600276.SH', '000001.SZ', ...]

    # Option 1: batched query.
    start = time.time()
    result1 = get_batch_stock_prices_optimized(client, stock_codes, start_dt, end_dt)
    print(f"批量查询耗时: {(time.time() - start) * 1000:.2f}ms")

    # Option 2: parallel queries.
    start = time.time()
    result2 = get_all_stocks_parallel(client, stock_codes, start_dt, end_dt)
    print(f"并行查询耗时: {(time.time() - start) * 1000:.2f}ms")

    # Baseline: one query per stock, executed serially.
    start = time.time()
    result3 = {}
    for code in stock_codes:
        result3[code] = get_stock_price_original(client, code, start_dt, end_dt)
    print(f"串行查询耗时: {(time.time() - start) * 1000:.2f}ms")
# ============================================================================
# 总结与建议
# ============================================================================
"""
推荐实施顺序:
第一步(立即实施):方案 1 - 批量查询
- 实现难度:中等
- 性能提升:80-90%
- 风险:低
- 时间:1-2 小时
第二步(可选):方案 3 - 添加缓存
- 实现难度:简单
- 性能提升:额外 20-40%
- 风险:低
- 时间:30 分钟
第三步(长期):方案 4 - 数据库优化
- 实现难度:中等
- 性能提升:20-30%
- 风险:中(需要测试)
- 时间:2-4 小时
监控指标:
- 查询时间:目标 < 200ms(当前 > 1000ms)
- ClickHouse 查询次数:目标 1-2 次(当前 30+ 次)
- 缓存命中率:目标 > 80%(如果使用缓存)
"""

View File

@@ -1,465 +0,0 @@
"""
性能优化补丁 - 修复 /api/event/<int:event_id>/related-stocks-detail 的 N+1 查询问题
使用方法:
1. 将下面的两个函数复制到 app_vx.py 中
2. 替换原来的 api_event_related_stocks 函数
预期效果:
- 查询时间:从 1000-3000ms 降低到 100-300ms
- ClickHouse 查询次数:从 30+ 次降低到 2 次
- 性能提升:约 80-90%
"""
def get_batch_stock_prices(client, stock_codes, start_datetime, end_datetime):
    """Fetch price summaries for several stocks with a single ClickHouse query.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_codes: Stock codes, e.g. ``['600519.SH', '601088.SH']``.
        start_datetime: Inclusive lower bound for ``timestamp``.
        end_datetime: Inclusive upper bound for ``timestamp``.

    Returns:
        dict keyed by stock code; each value holds ``first_price``,
        ``last_price``, ``change_pct``, ``change_amount``, ``open_price``,
        ``high_price``, ``low_price``, ``volume`` and ``amount`` (``None``
        for NULL columns). Returns ``{}`` for an empty code list or when
        anything in the query/conversion raises.
    """
    if not stock_codes:
        return {}

    # Output key and converter for result columns 1..9 (column 0 is the code).
    columns = (
        ('first_price', float),
        ('last_price', float),
        ('change_pct', float),
        ('change_amount', float),
        ('open_price', float),
        ('high_price', float),
        ('low_price', float),
        ('volume', int),
        ('amount', float),
    )

    try:
        # Window functions number the rows per code in both directions; the
        # outer levels pick the first/last row and derive the change figures.
        sql = """
            SELECT
                code,
                first_price,
                last_price,
                (last_price - first_price) / nullIf(first_price, 0) * 100 as change_pct,
                last_price - first_price as change_amount,
                open_price,
                high_price,
                low_price,
                volume,
                amount
            FROM (
                SELECT
                    code,
                    -- 使用 anyIf 获取第一个和最后一个价格
                    anyIf(close, rn_asc = 1) as first_price,
                    anyIf(close, rn_desc = 1) as last_price,
                    anyIf(open, rn_desc = 1) as open_price,
                    -- 使用 max 获取最高价
                    max(high) as high_price,
                    -- 使用 min 获取最低价
                    min(low) as low_price,
                    anyIf(volume, rn_desc = 1) as volume,
                    anyIf(amt, rn_desc = 1) as amount
                FROM (
                    SELECT
                        code,
                        timestamp,
                        close,
                        open,
                        high,
                        low,
                        volume,
                        amt,
                        -- 正序行号(用于获取第一个价格)
                        row_number() OVER (PARTITION BY code ORDER BY timestamp ASC) as rn_asc,
                        -- 倒序行号(用于获取最后一个价格)
                        row_number() OVER (PARTITION BY code ORDER BY timestamp DESC) as rn_desc
                    FROM stock_minute
                    WHERE code IN %(codes)s
                        AND timestamp >= %(start)s
                        AND timestamp <= %(end)s
                )
                GROUP BY code
            )
        """

        params = {
            'codes': tuple(stock_codes),  # passed as a tuple for the IN clause
            'start': start_datetime,
            'end': end_datetime,
        }
        rows = client.execute(sql, params)

        prices = {
            record[0]: {
                key: (convert(record[pos]) if record[pos] is not None else None)
                for pos, (key, convert) in enumerate(columns, start=1)
            }
            for record in rows
        }

        print(f"✅ 批量查询完成,获取了 {len(prices)}/{len(stock_codes)} 只股票的数据")
        return prices

    except Exception as e:
        print(f"❌ 批量查询失败: {e}")
        import traceback
        traceback.print_exc()
        return {}
def get_batch_minute_chart_data(client, stock_codes, start_datetime, end_datetime):
    """Fetch minute-bar chart data for several stocks with a single query.

    Args:
        client: Object exposing ``execute(query, params)`` (a ClickHouse client).
        stock_codes: Stock codes to include in the ``IN`` clause.
        start_datetime: Inclusive lower bound for ``timestamp``.
        end_datetime: Inclusive upper bound for ``timestamp``.

    Returns:
        dict mapping each code to its bars in query order; every bar is a
        dict with ``time`` (``'HH:MM'``), ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``amount`` (``None`` for NULL columns).
        Returns ``{}`` for an empty code list or when anything raises.
    """
    if not stock_codes:
        return {}

    # Output keys for result columns 2..7 (0 is the code, 1 the timestamp).
    numeric_keys = ('open', 'high', 'low', 'close', 'volume', 'amount')

    try:
        sql = """
            SELECT
                code,
                timestamp,
                open,
                high,
                low,
                close,
                volume,
                amt
            FROM stock_minute
            WHERE code IN %(codes)s
                AND timestamp >= %(start)s
                AND timestamp <= %(end)s
            ORDER BY code, timestamp
        """

        rows = client.execute(sql, {
            'codes': tuple(stock_codes),
            'start': start_datetime,
            'end': end_datetime,
        })

        # Bucket the flat row list by stock code.
        charts = {}
        for record in rows:
            bar = {'time': record[1].strftime('%H:%M')}
            for pos, key in enumerate(numeric_keys, start=2):
                cell = record[pos]
                bar[key] = None if cell is None else float(cell)
            charts.setdefault(record[0], []).append(bar)

        print(f"✅ 批量获取分时数据完成,获取了 {len(charts)}/{len(stock_codes)} 只股票的数据")
        return charts

    except Exception as e:
        print(f"❌ 批量获取分时数据失败: {e}")
        import traceback
        traceback.print_exc()
        return {}
# ============================================================================
# 优化后的端点函数(替换原来的 api_event_related_stocks)
# ============================================================================
@app.route('/api/event/<int:event_id>/related-stocks-detail', methods=['GET'])
def api_event_related_stocks(event_id):
    """Return an event's related stocks with price and minute-chart data.

    The query window is derived from the event time (``event.start_time``,
    falling back to ``event.created_at``):

    * trading day, before 09:30      -> same day, 09:30 - 15:00
    * trading day, 09:30 - 15:00     -> same day, event time - 15:00
    * trading day after 15:00, or a
      non-trading day                -> next trading day, 09:30 - 15:00

    Prices and minute bars for all related stocks are fetched with one batch
    call each (``get_batch_stock_prices`` / ``get_batch_minute_chart_data``).
    A stock with no batch price falls back to its latest ``TradeData`` row.

    NOTE(review): the original docstring stated this endpoint is limited to
    Pro/Max members, but no membership check is visible in this function -
    confirm it is enforced elsewhere.

    Args:
        event_id: Primary key of the event, taken from the URL.

    Returns:
        A JSON response ``{'code': 200, 'message': 'success', 'data': {...}}``
        whose ``data`` holds the event fields, ``related_stocks`` (ordered by
        correlation, descending), ``total_count`` and, on the full path, a
        ``performance`` timing section. Unexpected errors yield
        ``{'code': 500, 'message': <error text>}`` with HTTP status 500.

    Raises:
        werkzeug.exceptions.HTTPException: HTTP errors raised while handling
            the request (e.g. the 404 from ``get_or_404``) are propagated to
            Flask instead of being reported as a 500.
    """
    from datetime import datetime, time as dt_time
    from sqlalchemy import text
    from werkzeug.exceptions import HTTPException
    import time as time_module

    def to_float(value):
        # Compare against None rather than truthiness so that a legitimate 0
        # (e.g. a 0% change) is not reported as missing.
        return float(value) if value is not None else None

    def get_trading_day_and_times(event_datetime):
        """Map an event datetime to (trading_day, window_start, window_end).

        ``trading_day`` is None when the event falls outside trading hours
        and no later row exists in ``trading_days``.
        """
        event_date = event_datetime.date()
        event_time_only = event_datetime.time()
        market_open = dt_time(9, 30)
        market_close = dt_time(15, 0)

        with engine.connect() as conn:
            is_trading_day = conn.execute(text("""
                SELECT 1
                FROM trading_days
                WHERE EXCHANGE_DATE = :date
            """), {"date": event_date}).fetchone() is not None

            if is_trading_day:
                if event_time_only < market_open:
                    return event_date, market_open, market_close
                if event_time_only <= market_close:
                    # Event happened during the session: start from the event.
                    return event_date, event_time_only, market_close

            # After the close, or not a trading day: use the next trading day.
            next_row = conn.execute(text("""
                SELECT EXCHANGE_DATE
                FROM trading_days
                WHERE EXCHANGE_DATE > :date
                ORDER BY EXCHANGE_DATE LIMIT 1
            """), {"date": event_date}).fetchone()

        if not next_row:
            return None, market_open, market_close

        next_day = next_row[0]
        # The column may come back as a datetime or a plain date depending on
        # its type/driver (not visible here); accept both.
        if isinstance(next_day, datetime):
            next_day = next_day.date()
        return next_day, market_open, market_close

    try:
        # Wall-clock start, used for the timing log and the performance section.
        start_time = time_module.time()

        event = Event.query.get_or_404(event_id)
        related_stocks = event.related_stocks.order_by(RelatedStock.correlation.desc()).all()

        if not related_stocks:
            return jsonify({
                'code': 200,
                'message': 'success',
                'data': {
                    'event_id': event_id,
                    'event_title': event.title,
                    'related_stocks': [],
                    'total_count': 0
                }
            })

        def build_event_payload(stocks):
            # Event fields shared by the "no trading day" and full responses.
            return {
                'event_id': event_id,
                'event_title': event.title,
                'event_desc': event.description,
                'event_type': event.event_type,
                'event_importance': event.importance,
                'event_status': event.status,
                'event_created_at': event.created_at.strftime("%Y-%m-%d %H:%M:%S"),
                'event_start_time': event.start_time.isoformat() if event.start_time else None,
                'event_end_time': event.end_time.isoformat() if event.end_time else None,
                'keywords': event.keywords,
                'view_count': event.view_count,
                'post_count': event.post_count,
                'follower_count': event.follower_count,
                'related_stocks': stocks,
                'total_count': len(stocks),
            }

        # ClickHouse client used by both batch queries below.
        client = get_clickhouse_client()

        # Resolve the trading day and intraday window for the event.
        event_time = event.start_time if event.start_time else event.created_at
        trading_day, start_time_val, end_time_val = get_trading_day_and_times(event_time)

        if not trading_day:
            return jsonify({
                'code': 200,
                'message': 'success',
                'data': build_event_payload([])
            })

        start_datetime = datetime.combine(trading_day, start_time_val)
        end_datetime = datetime.combine(trading_day, end_time_val)
        print(f"📊 事件时间: {event_time}, 交易日: {trading_day}, 时间范围: {start_datetime} - {end_datetime}")

        # Batch query 1: prices for every related stock in one call.
        stock_codes = [stock.stock_code for stock in related_stocks]
        print(f"📈 开始批量查询 {len(stock_codes)} 只股票的价格数据...")
        query_start = time_module.time()
        prices_data = get_batch_stock_prices(client, stock_codes, start_datetime, end_datetime)
        query_time = (time_module.time() - query_start) * 1000
        print(f"⏱️ 价格查询耗时: {query_time:.2f}ms")

        # Batch query 2: minute bars for every related stock in one call.
        print(f"📈 开始批量查询 {len(stock_codes)} 只股票的分时数据...")
        chart_start = time_module.time()
        minute_data = get_batch_minute_chart_data(client, stock_codes, start_datetime, end_datetime)
        chart_time = (time_module.time() - chart_start) * 1000
        print(f"⏱️ 分时数据查询耗时: {chart_time:.2f}ms")

        # Assemble the per-stock payload from the batch results (dict lookups).
        stocks_data = []
        for stock in related_stocks:
            price_info = prices_data.get(stock.stock_code, {})
            chart_data = minute_data.get(stock.stock_code, [])

            # Try the full code first, then the code without its exchange
            # suffix; dict.fromkeys de-duplicates when there is no suffix.
            search_codes = list(dict.fromkeys(
                (stock.stock_code, stock.stock_code.split('.')[0])
            ))

            # NOTE(review): this is still one or two queries per stock; it
            # could be batched if it shows up in the timings.
            stock_info = None
            for code in search_codes:
                stock_info = StockBasicInfo.query.filter_by(SECCODE=code).first()
                if stock_info:
                    break

            # No batch price for this stock: fall back to the latest TradeData row.
            if not price_info or price_info.get('last_price') is None:
                try:
                    latest_trade = None
                    for code in search_codes:
                        latest_trade = TradeData.query.filter_by(SECCODE=code) \
                            .order_by(TradeData.TRADEDATE.desc()).first()
                        if latest_trade:
                            break

                    if latest_trade:
                        # NOTE(review): 'trade_date' below still reports the
                        # computed trading day, not this row's TRADEDATE.
                        price_info = {
                            'last_price': to_float(latest_trade.F007N),
                            'first_price': to_float(latest_trade.F002N),
                            'open_price': to_float(latest_trade.F003N),
                            'high_price': to_float(latest_trade.F005N),
                            'low_price': to_float(latest_trade.F006N),
                            'volume': to_float(latest_trade.F004N),
                            'amount': to_float(latest_trade.F011N),
                            'change_pct': to_float(latest_trade.F010N),
                            'change_amount': to_float(latest_trade.F009N),
                        }
                except Exception as fallback_error:
                    # Best effort: a failed fallback leaves the price fields empty.
                    print(f"⚠️ 降级查询失败 {stock.stock_code}: {fallback_error}")

            change_amount = price_info.get('change_amount')
            change_pct = price_info.get('change_pct')

            stocks_data.append({
                'id': stock.id,
                'stock_code': stock.stock_code,
                'stock_name': stock.stock_name,
                'sector': stock.sector,
                'relation_desc': stock.relation_desc,
                'correlation': stock.correlation,
                'momentum': stock.momentum,
                'listing_date': stock_info.F006D.isoformat() if stock_info and stock_info.F006D else None,
                'market': stock_info.F005V if stock_info else None,
                # Trade data taken from the batch result (or the fallback row).
                'trade_data': {
                    'latest_price': price_info.get('last_price'),
                    'first_price': price_info.get('first_price'),
                    'open_price': price_info.get('open_price'),
                    'high_price': price_info.get('high_price'),
                    'low_price': price_info.get('low_price'),
                    'change_amount': round(change_amount, 2) if change_amount is not None else None,
                    'change_pct': round(change_pct, 2) if change_pct is not None else None,
                    'volume': price_info.get('volume'),
                    'amount': price_info.get('amount'),
                    'trade_date': trading_day.isoformat(),
                },
                # Minute-chart bars for the same window.
                'minute_chart': chart_data
            })

        # Total request time in milliseconds.
        total_time = (time_module.time() - start_time) * 1000
        print(f"✅ 请求完成,总耗时: {total_time:.2f}ms (价格: {query_time:.2f}ms, 分时: {chart_time:.2f}ms)")

        response_data = build_event_payload(stocks_data)
        # Timing metrics (optional, for debugging).
        response_data['performance'] = {
            'total_time_ms': round(total_time, 2),
            'price_query_ms': round(query_time, 2),
            'chart_query_ms': round(chart_time, 2)
        }
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': response_data
        })

    except HTTPException:
        # get_or_404()/abort() signal real HTTP statuses; do not turn them into 500s.
        raise
    except Exception as e:
        print(f"❌ Error in api_event_related_stocks: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'code': 500, 'message': str(e)}), 500
# ============================================================================
# 使用说明
# ============================================================================
"""
1. 将上面的 3 个函数复制到 app_vx.py 中:
- get_batch_stock_prices()
- get_batch_minute_chart_data()
- api_event_related_stocks()(替换原函数)
2. 重启 Flask 应用:
python app_vx.py
3. 测试端点:
curl http://localhost:5001/api/event/18058/related-stocks-detail
4. 观察日志输出:
✅ 批量查询完成,获取了 10/10 只股票的数据
⏱️ 价格查询耗时: 45.23ms
⏱️ 分时数据查询耗时: 78.56ms
✅ 请求完成,总耗时: 234.67ms
5. 性能对比(10 只股票):
- 优化前:1000-3000ms(30+ 次查询)
- 优化后:100-300ms(2 次查询)
- 提升:80-90%
6. 如果还是慢,检查:
- ClickHouse 表是否有索引:SHOW CREATE TABLE stock_minute;
- 数据量是否过大:SELECT count() FROM stock_minute WHERE code = '600519.SH';
- 网络延迟:ping ClickHouse 服务器
"""