diff --git a/app_vx.py b/app_vx.py index 598f4626..832ab6fd 100644 --- a/app_vx.py +++ b/app_vx.py @@ -2732,6 +2732,7 @@ def api_get_events(): end_date = request.args.get('end_date') date_range = request.args.get('date_range') recent_days = request.args.get('recent_days', type=int) + time_filter = request.args.get('time_filter') # 时间快速筛选参数 # 行业筛选参数(重新设计) ind_type = request.args.get('ind_type', 'all') @@ -2792,9 +2793,13 @@ def api_get_events(): if event_type != 'all': query = query.filter_by(event_type=event_type) - # 重要性筛选 + # 重要性筛选(支持多选,逗号分隔,如 importance=S,A,B) if importance != 'all': - query = query.filter_by(importance=importance) + importance_list = [i.strip().upper() for i in importance.split(',') if i.strip()] + if len(importance_list) == 1: + query = query.filter_by(importance=importance_list[0]) + elif len(importance_list) > 1: + query = query.filter(Event.importance.in_(importance_list)) # 行业类型筛选(使用ind_type字段) if ind_type != 'all': @@ -2806,41 +2811,117 @@ def api_get_events(): # ==================== 日期筛选 ==================== - if recent_days: - cutoff_date = datetime.now() - timedelta(days=recent_days) - query = query.filter(Event.created_at >= cutoff_date) - else: - # 处理日期范围字符串 - if date_range and ' 至 ' in date_range: - try: - start_date_str, end_date_str = date_range.split(' 至 ') - start_date = start_date_str.strip() - end_date = end_date_str.strip() - except ValueError: - pass + # 时间快速筛选(优先级最高) + time_filter_applied = False + if time_filter: + now = datetime.now() + today = now.date() - # 开始日期 - if start_date: - try: - if len(start_date) == 10: - start_datetime = datetime.strptime(start_date, '%Y-%m-%d') - else: - start_datetime = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S') - query = query.filter(Event.created_at >= start_datetime) - except ValueError: - pass + if time_filter == 'latest': + # 最新:最近100条,不设时间筛选,但限制数量(在后面排序后处理) + time_filter_applied = True + # 特殊处理:latest模式下per_page强制为100,忽略分页 + per_page = 100 + page = 1 - # 结束日期 - if end_date: - try: - if len(end_date) == 10: - end_datetime = datetime.strptime(end_date, '%Y-%m-%d') - end_datetime = end_datetime.replace(hour=23, minute=59, second=59) - else: - end_datetime = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S') - query = query.filter(Event.created_at <= end_datetime) - except ValueError: - pass + elif time_filter == 'intraday': + # 盘中:从今天早上9:30到当前时间 + start_time = datetime.combine(today, datetime.strptime('09:30', '%H:%M').time()) + query = query.filter(Event.created_at >= start_time) + query = query.filter(Event.created_at <= now) + time_filter_applied = True + + elif time_filter == 'morning': + # 早盘:从今天早上9:30到11:30(上午盘交易时段) + start_time = datetime.combine(today, datetime.strptime('09:30', '%H:%M').time()) + end_time = datetime.combine(today, datetime.strptime('11:30', '%H:%M').time()) + query = query.filter(Event.created_at >= start_time) + query = query.filter(Event.created_at <= end_time) + time_filter_applied = True + + elif time_filter == 'afternoon': + # 午盘:从今天上午11:30至今(下午盘交易时段开始后) + start_time = datetime.combine(today, datetime.strptime('11:30', '%H:%M').time()) + query = query.filter(Event.created_at >= start_time) + query = query.filter(Event.created_at <= now) + time_filter_applied = True + + elif time_filter == 'today': + # 今日全天:从昨天15:00到现在 + yesterday = today - timedelta(days=1) + start_time = datetime.combine(yesterday, datetime.strptime('15:00', '%H:%M').time()) + query = query.filter(Event.created_at >= start_time) + query = query.filter(Event.created_at <= now) + time_filter_applied = True + + elif time_filter == 'yesterday': + # 昨日:上一个交易日的完整数据 + # 计算上一个交易日(跳过周末) + def get_previous_trading_day(d): + d = d - timedelta(days=1) + while d.weekday() >= 5: # 5=周六, 6=周日 + d = d - timedelta(days=1) + return d + + last_trading_day = get_previous_trading_day(today) + day_before_last = get_previous_trading_day(last_trading_day) + + # 从上上个交易日15:00到上个交易日15:00 + start_time = datetime.combine(day_before_last, datetime.strptime('15:00', '%H:%M').time()) + end_time = datetime.combine(last_trading_day, datetime.strptime('15:00', '%H:%M').time()) + query = query.filter(Event.created_at >= start_time) + query = query.filter(Event.created_at <= end_time) + time_filter_applied = True + + elif time_filter == 'week': + # 近一周:自然日7天内 + start_time = datetime.combine(today - timedelta(days=7), datetime.min.time()) + query = query.filter(Event.created_at >= start_time) + time_filter_applied = True + + elif time_filter == 'month': + # 近一月:自然日30天内 + start_time = datetime.combine(today - timedelta(days=30), datetime.min.time()) + query = query.filter(Event.created_at >= start_time) + time_filter_applied = True + + # 如果没有使用time_filter,则使用其他日期筛选方式 + if not time_filter_applied: + if recent_days: + cutoff_date = datetime.now() - timedelta(days=recent_days) + query = query.filter(Event.created_at >= cutoff_date) + else: + # 处理日期范围字符串 + if date_range and ' 至 ' in date_range: + try: + start_date_str, end_date_str = date_range.split(' 至 ') + start_date = start_date_str.strip() + end_date = end_date_str.strip() + except ValueError: + pass + + # 开始日期 + if start_date: + try: + if len(start_date) == 10: + start_datetime = datetime.strptime(start_date, '%Y-%m-%d') + else: + start_datetime = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S') + query = query.filter(Event.created_at >= start_datetime) + except ValueError: + pass + + # 结束日期 + if end_date: + try: + if len(end_date) == 10: + end_datetime = datetime.strptime(end_date, '%Y-%m-%d') + end_datetime = end_datetime.replace(hour=23, minute=59, second=59) + else: + end_datetime = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S') + query = query.filter(Event.created_at <= end_datetime) + except ValueError: + pass # ==================== 行业层级筛选(申银万国行业分类) ==================== @@ -3189,6 +3270,8 @@ def api_get_events(): applied_filters['type'] = event_type if importance != 'all': applied_filters['importance'] = importance + if time_filter: + applied_filters['time_filter'] = time_filter if start_date: applied_filters['start_date'] = start_date if end_date: @@ -4057,25 +4140,34 @@ def api_login_wechat(): _WECHAT_ACCESS_TOKEN_KEY = 'vf_wechat_access_token' -def get_wechat_access_token(): +def get_wechat_access_token(force_refresh=False): """ 获取微信小程序 access_token(使用 Redis 缓存,支持多 worker 共享) access_token 有效期为 7200 秒,提前 5 分钟刷新 + + Args: + force_refresh: 是否强制刷新(当 token 失效时使用) """ import time + redis_client = None try: import redis redis_client = redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0')) - # 尝试从 Redis 获取缓存的 token - cached = redis_client.get(_WECHAT_ACCESS_TOKEN_KEY) - if cached: - logger.debug("从 Redis 获取微信 access_token") - return cached.decode('utf-8') + # 如果不是强制刷新,尝试从 Redis 获取缓存的 token + if not force_refresh: + cached = redis_client.get(_WECHAT_ACCESS_TOKEN_KEY) + if cached: + logger.debug("从 Redis 获取微信 access_token") + return cached.decode('utf-8') + else: + # 强制刷新时,先删除旧的缓存 + redis_client.delete(_WECHAT_ACCESS_TOKEN_KEY) + logger.info("强制刷新:已删除旧的 access_token 缓存") except Exception as e: - logger.warning(f"Redis 获取 access_token 失败: {e},将直接请求微信接口") + logger.warning(f"Redis 操作失败: {e},将直接请求微信接口") redis_client = None # 请求新的 access_token @@ -4153,44 +4245,48 @@ def api_bindphone_wechat(): 'data': None }), 400 - # 1. 获取 access_token - access_token = get_wechat_access_token() - if not access_token: - return jsonify({ - 'code': 500, - 'message': '获取微信凭证失败,请稍后重试', - 'data': None - }), 500 + # 调用微信接口获取手机号(支持 token 失效自动重试) + def call_wechat_phone_api(force_refresh=False): + access_token = get_wechat_access_token(force_refresh=force_refresh) + if not access_token: + return None, {'errcode': -1, 'errmsg': '获取 access_token 失败'} - # 2. 调用微信接口获取手机号 - wx_phone_url = f'https://api.weixin.qq.com/wxa/business/getuserphonenumber?access_token={access_token}' - payload = {'code': code} + wx_phone_url = f'https://api.weixin.qq.com/wxa/business/getuserphonenumber?access_token={access_token}' + payload = {'code': code} - try: - response = requests.post(wx_phone_url, json=payload, timeout=10) - result = response.json() - logger.info(f"微信获取手机号响应: {result}") - except Exception as e: - logger.error(f"调用微信获取手机号接口异常: {e}") - return jsonify({ - 'code': 500, - 'message': '调用微信接口失败,请稍后重试', - 'data': None - }), 500 + try: + response = requests.post(wx_phone_url, json=payload, timeout=10) + return access_token, response.json() + except Exception as e: + logger.error(f"调用微信获取手机号接口异常: {e}") + return access_token, {'errcode': -1, 'errmsg': str(e)} - # 3. 解析微信返回结果 - if result.get('errcode') != 0: + # 第一次尝试 + access_token, result = call_wechat_phone_api(force_refresh=False) + logger.info(f"微信获取手机号响应: {result}") + + # 如果 token 失效(40001, 42001),强制刷新后重试一次 + errcode = result.get('errcode') + if errcode in (40001, 42001, 40014): + logger.warning(f"access_token 失效 (errcode={errcode}),强制刷新后重试") + access_token, result = call_wechat_phone_api(force_refresh=True) + logger.info(f"重试后微信获取手机号响应: {result}") + errcode = result.get('errcode') + + # 解析微信返回结果 + if errcode != 0: error_msg = result.get('errmsg', '未知错误') - logger.error(f"微信获取手机号失败: errcode={result.get('errcode')}, errmsg={error_msg}") + logger.error(f"微信获取手机号失败: errcode={errcode}, errmsg={error_msg}") # 常见错误码处理 - errcode = result.get('errcode') if errcode == 40029: return jsonify({'code': 400, 'message': 'code无效或已过期,请重新获取', 'data': None}), 400 elif errcode == 40013: return jsonify({'code': 400, 'message': 'AppID无效', 'data': None}), 400 elif errcode == -1: return jsonify({'code': 500, 'message': '微信服务繁忙,请稍后重试', 'data': None}), 500 + elif errcode in (40001, 42001, 40014): + return jsonify({'code': 500, 'message': '微信凭证失效,请稍后重试', 'data': None}), 500 else: return jsonify({'code': 400, 'message': f'获取手机号失败: {error_msg}', 'data': None}), 400 @@ -5807,9 +5903,17 @@ def api_calendar_events(): if end_date: query += " AND calendar_time <= :end_date" params['end_date'] = datetime.fromisoformat(end_date) + # 重要性筛选(支持多选,逗号分隔,如 importance=S,A,B) if importance != 'all': - query += " AND star = :importance" - params['importance'] = importance + importance_list = [i.strip().upper() for i in importance.split(',') if i.strip()] + if len(importance_list) == 1: + query += " AND star = :importance" + params['importance'] = importance_list[0] + elif len(importance_list) > 1: + placeholders = ', '.join([f':imp_{i}' for i in range(len(importance_list))]) + query += f" AND star IN ({placeholders})" + for i, imp in enumerate(importance_list): + params[f'imp_{i}'] = imp if category != 'all': # category参数用于筛选inferred_tag字段(如"大周期"、"大消费"等) query += " AND inferred_tag = :category" @@ -5847,8 +5951,14 @@ def api_calendar_events(): count_query += " AND calendar_time >= :start_date" if end_date: count_query += " AND calendar_time <= :end_date" + # 重要性筛选(支持多选,逗号分隔) if importance != 'all': - count_query += " AND star = :importance" + importance_list = [i.strip().upper() for i in importance.split(',') if i.strip()] + if len(importance_list) == 1: + count_query += " AND star = :importance" + elif len(importance_list) > 1: + placeholders = ', '.join([f':imp_{i}' for i in range(len(importance_list))]) + count_query += f" AND star IN ({placeholders})" if category != 'all': count_query += " AND inferred_tag = :category"