update pay ui

This commit is contained in:
2025-12-12 08:44:45 +08:00
parent 969b7d3b82
commit 34bc635072
5 changed files with 3742 additions and 826 deletions

View File

@@ -309,7 +309,11 @@ def generate_embedding(text: str) -> List[float]:
def calculate_semantic_weight(query: str) -> float:
"""根据查询长度动态计算语义权重"""
query_length = len(query)
# 空查询不需要语义搜索
if not query or not query.strip():
return 0.0
query_length = len(query.strip())
if query_length < 10:
return 0.3
@@ -321,6 +325,90 @@ def calculate_semantic_weight(query: str) -> float:
return 0.7
async def get_top_concepts_by_change(
trade_date: Optional[date],
limit: int = 100,
offset: int = 0,
filter_lv1: Optional[str] = None,
filter_lv2: Optional[str] = None
) -> tuple[List[Dict], int, date]:
"""
直接从 MySQL 获取涨跌幅排序的概念列表(用于空查询优化)
返回: (概念列表, 总数, 实际查询日期)
"""
if not mysql_pool:
return [], 0, None
try:
async with mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
# 获取交易日期
query_date = trade_date
if query_date is None:
await cursor.execute("SELECT MAX(trade_date) as max_date FROM concept_daily_stats WHERE concept_type = 'leaf'")
result = await cursor.fetchone()
if not result or not result['max_date']:
return [], 0, None
query_date = result['max_date']
# 构建基础条件
where_conditions = ["trade_date = %s", "concept_type = 'leaf'"]
params = [query_date]
# 添加层级过滤(通过 concept_name 关联)
# 注意:这里需要根据 concept_to_hierarchy 过滤
if filter_lv1 or filter_lv2:
# 获取符合层级条件的概念名称
filtered_concepts = []
for concept_name, hierarchy in concept_to_hierarchy.items():
if filter_lv1 and hierarchy.get('lv1') != filter_lv1:
continue
if filter_lv2 and hierarchy.get('lv2') != filter_lv2:
continue
filtered_concepts.append(concept_name)
if not filtered_concepts:
return [], 0, query_date
placeholders = ','.join(['%s'] * len(filtered_concepts))
where_conditions.append(f"concept_name IN ({placeholders})")
params.extend(filtered_concepts)
where_clause = " AND ".join(where_conditions)
# 查询总数
count_query = f"SELECT COUNT(*) as cnt FROM concept_daily_stats WHERE {where_clause}"
await cursor.execute(count_query, params)
total = (await cursor.fetchone())['cnt']
# 查询数据(按涨跌幅降序)
data_query = f"""
SELECT concept_id, concept_name, avg_change_pct, stock_count
FROM concept_daily_stats
WHERE {where_clause}
ORDER BY avg_change_pct DESC
LIMIT %s OFFSET %s
"""
await cursor.execute(data_query, params + [limit, offset])
rows = await cursor.fetchall()
concepts = []
for row in rows:
concepts.append({
'concept_id': row['concept_id'],
'concept_name': row['concept_name'],
'avg_change_pct': float(row['avg_change_pct']) if row['avg_change_pct'] else None,
'stock_count': row['stock_count'],
'trade_date': query_date
})
return concepts, total, query_date
except Exception as e:
logger.error(f"获取涨跌幅排序概念失败: {e}")
return [], 0, None
async def get_concept_price_data(concept_ids: List[str], trade_date: Optional[date] = None) -> Dict[str, ConceptPriceInfo]:
"""获取概念的涨跌幅数据"""
if not mysql_pool or not concept_ids:
@@ -451,10 +539,117 @@ async def root():
async def search_concepts(request: SearchRequest):
"""
搜索概念 - 支持语义搜索和层级过滤
优化:空查询 + 涨跌幅排序时,直接从 MySQL 查询,避免 ES 大量数据获取
"""
start_time = datetime.now()
try:
# ========== 优化路径:空查询 + 涨跌幅排序 ==========
# 这是概念中心首页的默认场景,直接从 MySQL 获取排序结果
is_empty_query = not request.query or not request.query.strip()
is_change_sort = request.sort_by == "change_pct"
no_stock_filter = not request.filter_stocks
if is_empty_query and is_change_sort and no_stock_filter:
logger.info(f"[Search优化] 使用 MySQL 快速路径: page={request.page}, size={request.size}")
# 计算分页
offset = (request.page - 1) * request.size
# 从 MySQL 获取涨跌幅排序的概念
top_concepts, total, actual_date = await get_top_concepts_by_change(
trade_date=request.trade_date,
limit=request.size,
offset=offset,
filter_lv1=request.filter_lv1,
filter_lv2=request.filter_lv2
)
if not top_concepts:
took_ms = int((datetime.now() - start_time).total_seconds() * 1000)
return SearchResponse(
total=0,
took_ms=took_ms,
results=[],
search_info={"query": "", "semantic_weight": 0, "match_type": "mysql_optimized"},
price_date=actual_date,
page=request.page,
total_pages=0
)
# 用 concept_id 从 ES 批量获取详细信息
concept_ids = [c['concept_id'] for c in top_concepts]
es_body = {
"query": {"terms": {"concept_id": concept_ids}},
"size": len(concept_ids),
"_source": {"excludes": ["description_embedding", "insight"]}
}
es_response = es_client.search(index=INDEX_NAME, body=es_body, timeout="10s")
# 构建 concept_id -> ES 详情的映射
es_details = {}
for hit in es_response['hits']['hits']:
source = hit['_source']
es_details[source.get('concept_id', '')] = source
# 组装结果(保持 MySQL 的排序顺序)
results = []
for mysql_concept in top_concepts:
cid = mysql_concept['concept_id']
es_data = es_details.get(cid, {})
concept_name = mysql_concept['concept_name']
# 解析股票
stocks_list, _ = parse_stocks_from_es(es_data)
# 获取层级信息
hierarchy_info = get_concept_hierarchy(concept_name)
hierarchy = HierarchyInfo(**hierarchy_info) if hierarchy_info else None
result = ConceptResult(
concept_id=cid,
concept=concept_name,
description=es_data.get('description'),
tags=es_data.get('tags', []),
outbreak_dates=es_data.get('outbreak_dates', []),
stocks=stocks_list,
stock_count=mysql_concept['stock_count'] or len(es_data.get('stocks', [])),
hierarchy=hierarchy,
score=0.0,
match_type="mysql_optimized",
highlights=None,
price_info=ConceptPriceInfo(
trade_date=actual_date,
avg_change_pct=mysql_concept['avg_change_pct']
)
)
results.append(result)
took_ms = int((datetime.now() - start_time).total_seconds() * 1000)
total_pages = max(1, (total + request.size - 1) // request.size)
logger.info(f"[Search优化] MySQL 快速路径完成: took={took_ms}ms, total={total}, returned={len(results)}")
return SearchResponse(
total=total,
took_ms=took_ms,
results=results,
search_info={
"query": "",
"semantic_weight": 0,
"match_type": "mysql_optimized",
"filter_lv1": request.filter_lv1,
"filter_lv2": request.filter_lv2,
"sort_by": request.sort_by
},
price_date=actual_date,
page=request.page,
total_pages=total_pages
)
# ========== 常规路径:有搜索词或其他排序方式 ==========
# 计算语义权重
if request.semantic_weight is not None:
semantic_weight = request.semantic_weight