diff --git a/app.py b/app.py index a83d489f..ffb75909 100755 --- a/app.py +++ b/app.py @@ -28,6 +28,7 @@ from datetime import datetime, timedelta, time as dt_time, date from clickhouse_driver import Client as Cclient from elasticsearch import Elasticsearch from flask_cors import CORS +import redis from collections import defaultdict from functools import lru_cache @@ -151,7 +152,82 @@ es_client = Elasticsearch( app = Flask(__name__) # 存储验证码的临时字典(生产环境应使用Redis) verification_codes = {} -wechat_qr_sessions = {} + +# ============ 微信登录 Session 管理(Redis 存储,支持多进程) ============ +# Redis 客户端配置 +redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) +WECHAT_SESSION_EXPIRE = 300 # Session 过期时间(5分钟) +WECHAT_SESSION_PREFIX = "wechat_session:" + + +def set_wechat_session(state, data): + """存储微信登录 session 到 Redis""" + try: + redis_client.setex( + f"{WECHAT_SESSION_PREFIX}{state}", + WECHAT_SESSION_EXPIRE, + json.dumps(data) + ) + return True + except Exception as e: + print(f"❌ Redis 存储 wechat session 失败: {e}") + return False + + +def get_wechat_session(state): + """从 Redis 获取微信登录 session""" + try: + data = redis_client.get(f"{WECHAT_SESSION_PREFIX}{state}") + if data: + return json.loads(data) + return None + except Exception as e: + print(f"❌ Redis 获取 wechat session 失败: {e}") + return None + + +def update_wechat_session(state, updates): + """更新微信登录 session(合并更新)""" + try: + data = get_wechat_session(state) + if data: + data.update(updates) + # 获取剩余 TTL,保持原有过期时间 + ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{state}") + if ttl > 0: + redis_client.setex( + f"{WECHAT_SESSION_PREFIX}{state}", + ttl, + json.dumps(data) + ) + else: + # 如果 TTL 无效,使用默认过期时间 + set_wechat_session(state, data) + return True + return False + except Exception as e: + print(f"❌ Redis 更新 wechat session 失败: {e}") + return False + + +def delete_wechat_session(state): + """删除微信登录 session""" + try: + redis_client.delete(f"{WECHAT_SESSION_PREFIX}{state}") + return True + except Exception as e: + print(f"❌ Redis 删除 wechat session 失败: {e}") + return False + + +def wechat_session_exists(state): + """检查微信登录 session 是否存在""" + try: + return redis_client.exists(f"{WECHAT_SESSION_PREFIX}{state}") > 0 + except Exception as e: + print(f"❌ Redis 检查 wechat session 失败: {e}") + return False +# ============ 微信登录 Session 管理结束 ============ # 腾讯云短信配置 SMS_SECRET_ID = 'AKID2we9TacdTAhCjCSYTErHVimeJo9Yr00s' SMS_SECRET_KEY = 'pMlBWijlkgT9fz5ziEXdWEnAPTJzRfkf' @@ -3469,14 +3545,14 @@ def get_wechat_qrcode(): "#wechat_redirect" ) - # 存储session信息 - wechat_qr_sessions[state] = { + # 存储session信息到 Redis + if not set_wechat_session(state, { 'status': 'waiting', - 'expires': time.time() + 300, # 5分钟过期 'user_info': None, 'wechat_openid': None, 'wechat_unionid': None - } + }): + return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500 return jsonify({"code":0, "data": @@ -3510,16 +3586,16 @@ def get_wechat_h5_auth_url(): "#wechat_redirect" ) - # 存储 session 信息 - wechat_qr_sessions[state] = { + # 存储 session 信息到 Redis + if not set_wechat_session(state, { 'status': 'waiting', - 'expires': time.time() + 300, 'mode': 'h5', # 标记为 H5 模式 'frontend_redirect': frontend_redirect, 'user_info': None, 'wechat_openid': None, 'wechat_unionid': None - } + }): + return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500 return jsonify({ 'auth_url': auth_url, @@ -3547,16 +3623,16 @@ def get_wechat_bind_qrcode(): "#wechat_redirect" ) - # 存储session信息,标记为绑定模式并记录目标用户 - wechat_qr_sessions[state] = { + # 存储session信息到 Redis,标记为绑定模式并记录目标用户 + if not set_wechat_session(state, { 'status': 'waiting', - 'expires': time.time() + 300, 'mode': 'bind', 'bind_user_id': session.get('user_id'), 'user_info': None, 'wechat_openid': None, 'wechat_unionid': None - } + }): + return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500 return jsonify({ 'auth_url': wechat_auth_url, @@ -3571,20 +3647,22 @@ def check_wechat_scan(): data = request.get_json() session_id = data.get('session_id') - if not session_id or session_id not in wechat_qr_sessions: + if not session_id: return jsonify({'status': 'invalid', 'error': '无效的session'}), 400 - session = wechat_qr_sessions[session_id] + # 从 Redis 获取 session + sess = get_wechat_session(session_id) + if not sess: + return jsonify({'status': 'expired'}), 200 # Redis 自动过期,返回 expired - # 检查是否过期 - if time.time() > session['expires']: - del wechat_qr_sessions[session_id] - return jsonify({'status': 'expired'}), 200 + # 获取剩余 TTL + ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{session_id}") + expires_in = max(0, ttl) if ttl > 0 else 0 return jsonify({ - 'status': session['status'], - 'user_info': session.get('user_info'), - 'expires_in': int(session['expires'] - time.time()) + 'status': sess['status'], + 'user_info': sess.get('user_info'), + 'expires_in': expires_in }), 200 @@ -3594,24 +3672,26 @@ def check_wechat_bind_scan(): data = request.get_json() session_id = data.get('session_id') - if not session_id or session_id not in wechat_qr_sessions: + if not session_id: return jsonify({'status': 'invalid', 'error': '无效的session'}), 400 - sess = wechat_qr_sessions[session_id] + # 从 Redis 获取 session + sess = get_wechat_session(session_id) + if not sess: + return jsonify({'status': 'expired'}), 200 # Redis 自动过期,返回 expired # 绑定模式限制 if sess.get('mode') != 'bind': return jsonify({'status': 'invalid', 'error': '会话模式错误'}), 400 - # 过期处理 - if time.time() > sess['expires']: - del wechat_qr_sessions[session_id] - return jsonify({'status': 'expired'}), 200 + # 获取剩余 TTL + ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{session_id}") + expires_in = max(0, ttl) if ttl > 0 else 0 return jsonify({ 'status': sess['status'], 'user_info': sess.get('user_info'), - 'expires_in': int(sess['expires'] - time.time()) + 'expires_in': expires_in }), 200 @@ -3624,33 +3704,25 @@ def wechat_callback(): # 错误处理:用户拒绝授权 if error: - if state in wechat_qr_sessions: - wechat_qr_sessions[state]['status'] = 'auth_denied' - wechat_qr_sessions[state]['error'] = '用户拒绝授权' + if state and wechat_session_exists(state): + update_wechat_session(state, {'status': 'auth_denied', 'error': '用户拒绝授权'}) print(f"❌ 用户拒绝授权: state={state}") return redirect('/auth/signin?error=wechat_auth_denied') # 参数验证 if not code or not state: - if state in wechat_qr_sessions: - wechat_qr_sessions[state]['status'] = 'auth_failed' - wechat_qr_sessions[state]['error'] = '授权参数缺失' + if state and wechat_session_exists(state): + update_wechat_session(state, {'status': 'auth_failed', 'error': '授权参数缺失'}) return redirect('/auth/signin?error=wechat_auth_failed') - # 验证state - if state not in wechat_qr_sessions: - return redirect('/auth/signin?error=session_expired') - - session_data = wechat_qr_sessions[state] - - # 检查过期 - if time.time() > session_data['expires']: - del wechat_qr_sessions[state] + # 从 Redis 获取 session(自动处理过期) + session_data = get_wechat_session(state) + if not session_data: return redirect('/auth/signin?error=session_expired') try: # 步骤1: 用户已扫码并授权(微信回调过来说明用户已完成扫码+授权) - session_data['status'] = 'scanned' + update_wechat_session(state, {'status': 'scanned'}) print(f"✅ 微信扫码回调: state={state}, code={code[:10]}...") # 步骤2: 根据授权模式选择对应的 AppID/AppSecret @@ -3667,20 +3739,18 @@ def wechat_callback(): # 步骤3: 获取access_token token_data = get_wechat_access_token(code, appid, appsecret) if not token_data: - session_data['status'] = 'auth_failed' - session_data['error'] = '获取访问令牌失败' + update_wechat_session(state, {'status': 'auth_failed', 'error': '获取访问令牌失败'}) print(f"❌ 获取微信access_token失败: state={state}") return redirect('/auth/signin?error=token_failed') # 步骤3: Token获取成功,标记为已授权 - session_data['status'] = 'authorized' + update_wechat_session(state, {'status': 'authorized'}) print(f"✅ 微信授权成功: openid={token_data['openid']}") # 步骤4: 获取用户信息 user_info = get_wechat_userinfo(token_data['access_token'], token_data['openid']) if not user_info: - session_data['status'] = 'auth_failed' - session_data['error'] = '获取用户信息失败' + update_wechat_session(state, {'status': 'auth_failed', 'error': '获取用户信息失败'}) print(f"❌ 获取微信用户信息失败: openid={token_data['openid']}") return redirect('/auth/signin?error=userinfo_failed') @@ -3689,10 +3759,9 @@ def wechat_callback(): unionid = user_info.get('unionid') or token_data.get('unionid') # 如果是绑定流程 - session_item = wechat_qr_sessions.get(state) - if session_item and session_item.get('mode') == 'bind': + if session_data.get('mode') == 'bind': try: - target_user_id = session.get('user_id') or session_item.get('bind_user_id') + target_user_id = session.get('user_id') or session_data.get('bind_user_id') if not target_user_id: return redirect('/auth/signin?error=bind_no_user') @@ -3708,21 +3777,20 @@ def wechat_callback(): existing = User.query.filter_by(wechat_open_id=openid).first() if existing and existing.id != target_user.id: - session_item['status'] = 'bind_conflict' + update_wechat_session(state, {'status': 'bind_conflict'}) return redirect('/home?bind=conflict') # 执行绑定 target_user.bind_wechat(openid, unionid, wechat_info=user_info) # 标记绑定完成,供前端轮询 - session_item['status'] = 'bind_ready' - session_item['user_info'] = {'user_id': target_user.id} + update_wechat_session(state, {'status': 'bind_ready', 'user_info': {'user_id': target_user.id}}) return redirect('/home?bind=success') except Exception as e: print(f"❌ 微信绑定失败: {e}") db.session.rollback() - session_item['status'] = 'bind_failed' + update_wechat_session(state, {'status': 'bind_failed'}) return redirect('/home?bind=failed') user = None @@ -3775,23 +3843,21 @@ def wechat_callback(): login_user(user, remember=True) # 更新微信session状态,供前端轮询检测 - if state in wechat_qr_sessions: - session_item = wechat_qr_sessions[state] - mode = session_item.get('mode') + mode = session_data.get('mode') - # H5 模式:重定向到前端回调页面 - if mode == 'h5': - frontend_redirect = session_item.get('frontend_redirect', '/home/wechat-callback') - # 清理 session - del wechat_qr_sessions[state] - print(f"✅ H5 微信登录成功,重定向到: {frontend_redirect}") - return redirect(f"{frontend_redirect}?wechat_login=success") + # H5 模式:重定向到前端回调页面 + if mode == 'h5': + frontend_redirect = session_data.get('frontend_redirect', '/home/wechat-callback') + # 清理 session + delete_wechat_session(state) + print(f"✅ H5 微信登录成功,重定向到: {frontend_redirect}") + return redirect(f"{frontend_redirect}?wechat_login=success") - # PC 扫码模式:更新状态供前端轮询 - if not mode: - session_item['status'] = 'register_ready' if is_new_user else 'login_ready' - session_item['user_info'] = {'user_id': user.id} - print(f"✅ 微信扫码状态已更新: {session_item['status']}, user_id: {user.id}") + # PC 扫码模式:更新状态供前端轮询 + if not mode: + new_status = 'register_ready' if is_new_user else 'login_ready' + update_wechat_session(state, {'status': new_status, 'user_info': {'user_id': user.id}}) + print(f"✅ 微信扫码状态已更新: {new_status}, user_id: {user.id}") # PC 模式直接跳转到首页 return redirect('/home') @@ -3803,9 +3869,8 @@ def wechat_callback(): db.session.rollback() # 更新session状态为失败 - if state in wechat_qr_sessions: - wechat_qr_sessions[state]['status'] = 'auth_failed' - wechat_qr_sessions[state]['error'] = str(e) + if wechat_session_exists(state): + update_wechat_session(state, {'status': 'auth_failed', 'error': str(e)}) return redirect('/auth/signin?error=login_failed') @@ -3819,17 +3884,17 @@ def login_with_wechat(): if not session_id: return jsonify({'success': False, 'error': 'session_id不能为空'}), 400 - # 验证session - session = wechat_qr_sessions.get(session_id) - if not session: + # 从 Redis 获取 session + wechat_sess = get_wechat_session(session_id) + if not wechat_sess: return jsonify({'success': False, 'error': '会话不存在或已过期'}), 400 # 检查session状态 - if session['status'] not in ['login_ready', 'register_ready']: + if wechat_sess['status'] not in ['login_ready', 'register_ready']: return jsonify({'success': False, 'error': '会话状态无效'}), 400 # 检查是否有用户信息 - user_info = session.get('user_info') + user_info = wechat_sess.get('user_info') if not user_info or not user_info.get('user_id'): return jsonify({'success': False, 'error': '用户信息不完整'}), 400 @@ -3841,25 +3906,13 @@ def login_with_wechat(): # 更新最后登录时间 user.update_last_seen() - # ✅ 修复:不立即删除session,而是标记为已完成,避免轮询报错 - # 原因:前端可能还在轮询检查状态,立即删除会导致 "无效的session" 错误 - # 保留原状态(login_ready/register_ready),前端会正确处理 - # wechat_qr_sessions[session_id]['status'] 保持不变 - - # 设置延迟删除(10秒后自动清理,给前端足够时间完成轮询) - import threading - def delayed_cleanup(): - import time - time.sleep(10) - if session_id in wechat_qr_sessions: - del wechat_qr_sessions[session_id] - print(f"✅ 延迟清理微信登录session: {session_id[:8]}...") - threading.Thread(target=delayed_cleanup, daemon=True).start() + # Redis 会自动过期,无需手动延迟删除 + # 保留 session 状态供前端轮询,Redis TTL 会自动清理 # 生成登录响应 response_data = { 'success': True, - 'message': '登录成功' if session['status'] == 'login_ready' else '注册并登录成功', + 'message': '登录成功' if wechat_sess['status'] == 'login_ready' else '注册并登录成功', 'user': { 'id': user.id, 'username': user.username, @@ -3872,7 +3925,7 @@ def login_with_wechat(): 'created_at': user.created_at.isoformat() if user.created_at else None, 'last_seen': user.last_seen.isoformat() if user.last_seen else None }, - 'isNewUser': session['status'] == 'register_ready' # 标记是否为新用户 + 'isNewUser': wechat_sess['status'] == 'register_ready' # 标记是否为新用户 } # 如果需要token认证,可以在这里生成