fix: 微信H5登录Session改用Redis存储,解决多进程不共享问题

问题:Gunicorn 4 workers 多进程部署时,wechat_qr_sessions 内存字典
不共享,导致 H5 微信授权回调被不同 worker 处理时找不到 session,
用户点击允许后登录状态无法更新。

解决方案:
- 新增 Redis 客户端配置和 session 操作函数(set/get/update/delete/exists)
- 将 wechat_qr_sessions 内存字典改为 Redis 存储
- Session 自动过期(TTL 5分钟),无需手动清理
- 添加 Redis 不可用时的错误处理

修改的接口:
- /api/auth/wechat/qrcode - PC 扫码登录
- /api/auth/wechat/h5-auth - H5 授权登录
- /api/account/wechat/qrcode - 账号绑定
- /api/auth/wechat/check - 登录状态检查
- /api/account/wechat/check - 绑定状态检查
- /api/auth/wechat/callback - 微信回调
- /api/auth/login/wechat - 微信登录确认

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
zdl
2025-12-11 11:12:05 +08:00
parent 29cf0d7013
commit 6c26f6dabc

245
app.py
View File

@@ -28,6 +28,7 @@ from datetime import datetime, timedelta, time as dt_time, date
from clickhouse_driver import Client as Cclient
from elasticsearch import Elasticsearch
from flask_cors import CORS
import redis
from collections import defaultdict
from functools import lru_cache
@@ -151,7 +152,82 @@ es_client = Elasticsearch(
app = Flask(__name__)
# 存储验证码的临时字典生产环境应使用Redis
verification_codes = {}
wechat_qr_sessions = {}
# ============ 微信登录 Session 管理Redis 存储,支持多进程) ============
# Redis 客户端配置
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
WECHAT_SESSION_EXPIRE = 300 # Session 过期时间5分钟
WECHAT_SESSION_PREFIX = "wechat_session:"
def set_wechat_session(state, data):
"""存储微信登录 session 到 Redis"""
try:
redis_client.setex(
f"{WECHAT_SESSION_PREFIX}{state}",
WECHAT_SESSION_EXPIRE,
json.dumps(data)
)
return True
except Exception as e:
print(f"❌ Redis 存储 wechat session 失败: {e}")
return False
def get_wechat_session(state):
"""从 Redis 获取微信登录 session"""
try:
data = redis_client.get(f"{WECHAT_SESSION_PREFIX}{state}")
if data:
return json.loads(data)
return None
except Exception as e:
print(f"❌ Redis 获取 wechat session 失败: {e}")
return None
def update_wechat_session(state, updates):
"""更新微信登录 session合并更新"""
try:
data = get_wechat_session(state)
if data:
data.update(updates)
# 获取剩余 TTL保持原有过期时间
ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{state}")
if ttl > 0:
redis_client.setex(
f"{WECHAT_SESSION_PREFIX}{state}",
ttl,
json.dumps(data)
)
else:
# 如果 TTL 无效,使用默认过期时间
set_wechat_session(state, data)
return True
return False
except Exception as e:
print(f"❌ Redis 更新 wechat session 失败: {e}")
return False
def delete_wechat_session(state):
"""删除微信登录 session"""
try:
redis_client.delete(f"{WECHAT_SESSION_PREFIX}{state}")
return True
except Exception as e:
print(f"❌ Redis 删除 wechat session 失败: {e}")
return False
def wechat_session_exists(state):
"""检查微信登录 session 是否存在"""
try:
return redis_client.exists(f"{WECHAT_SESSION_PREFIX}{state}") > 0
except Exception as e:
print(f"❌ Redis 检查 wechat session 失败: {e}")
return False
# ============ 微信登录 Session 管理结束 ============
# 腾讯云短信配置
SMS_SECRET_ID = 'AKID2we9TacdTAhCjCSYTErHVimeJo9Yr00s'
SMS_SECRET_KEY = 'pMlBWijlkgT9fz5ziEXdWEnAPTJzRfkf'
@@ -3469,14 +3545,14 @@ def get_wechat_qrcode():
"#wechat_redirect"
)
# 存储session信息
wechat_qr_sessions[state] = {
# 存储session信息到 Redis
if not set_wechat_session(state, {
'status': 'waiting',
'expires': time.time() + 300, # 5分钟过期
'user_info': None,
'wechat_openid': None,
'wechat_unionid': None
}
}):
return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500
return jsonify({"code":0,
"data":
@@ -3510,16 +3586,16 @@ def get_wechat_h5_auth_url():
"#wechat_redirect"
)
# 存储 session 信息
wechat_qr_sessions[state] = {
# 存储 session 信息到 Redis
if not set_wechat_session(state, {
'status': 'waiting',
'expires': time.time() + 300,
'mode': 'h5', # 标记为 H5 模式
'frontend_redirect': frontend_redirect,
'user_info': None,
'wechat_openid': None,
'wechat_unionid': None
}
}):
return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500
return jsonify({
'auth_url': auth_url,
@@ -3547,16 +3623,16 @@ def get_wechat_bind_qrcode():
"#wechat_redirect"
)
# 存储session信息标记为绑定模式并记录目标用户
wechat_qr_sessions[state] = {
# 存储session信息到 Redis,标记为绑定模式并记录目标用户
if not set_wechat_session(state, {
'status': 'waiting',
'expires': time.time() + 300,
'mode': 'bind',
'bind_user_id': session.get('user_id'),
'user_info': None,
'wechat_openid': None,
'wechat_unionid': None
}
}):
return jsonify({'error': '服务暂时不可用,请稍后重试'}), 500
return jsonify({
'auth_url': wechat_auth_url,
@@ -3571,20 +3647,22 @@ def check_wechat_scan():
data = request.get_json()
session_id = data.get('session_id')
if not session_id or session_id not in wechat_qr_sessions:
if not session_id:
return jsonify({'status': 'invalid', 'error': '无效的session'}), 400
session = wechat_qr_sessions[session_id]
# 从 Redis 获取 session
sess = get_wechat_session(session_id)
if not sess:
return jsonify({'status': 'expired'}), 200 # Redis 自动过期,返回 expired
# 检查是否过期
if time.time() > session['expires']:
del wechat_qr_sessions[session_id]
return jsonify({'status': 'expired'}), 200
# 获取剩余 TTL
ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{session_id}")
expires_in = max(0, ttl) if ttl > 0 else 0
return jsonify({
'status': session['status'],
'user_info': session.get('user_info'),
'expires_in': int(session['expires'] - time.time())
'status': sess['status'],
'user_info': sess.get('user_info'),
'expires_in': expires_in
}), 200
@@ -3594,24 +3672,26 @@ def check_wechat_bind_scan():
data = request.get_json()
session_id = data.get('session_id')
if not session_id or session_id not in wechat_qr_sessions:
if not session_id:
return jsonify({'status': 'invalid', 'error': '无效的session'}), 400
sess = wechat_qr_sessions[session_id]
# 从 Redis 获取 session
sess = get_wechat_session(session_id)
if not sess:
return jsonify({'status': 'expired'}), 200 # Redis 自动过期,返回 expired
# 绑定模式限制
if sess.get('mode') != 'bind':
return jsonify({'status': 'invalid', 'error': '会话模式错误'}), 400
# 过期处理
if time.time() > sess['expires']:
del wechat_qr_sessions[session_id]
return jsonify({'status': 'expired'}), 200
# 获取剩余 TTL
ttl = redis_client.ttl(f"{WECHAT_SESSION_PREFIX}{session_id}")
expires_in = max(0, ttl) if ttl > 0 else 0
return jsonify({
'status': sess['status'],
'user_info': sess.get('user_info'),
'expires_in': int(sess['expires'] - time.time())
'expires_in': expires_in
}), 200
@@ -3624,33 +3704,25 @@ def wechat_callback():
# 错误处理:用户拒绝授权
if error:
if state in wechat_qr_sessions:
wechat_qr_sessions[state]['status'] = 'auth_denied'
wechat_qr_sessions[state]['error'] = '用户拒绝授权'
if state and wechat_session_exists(state):
update_wechat_session(state, {'status': 'auth_denied', 'error': '用户拒绝授权'})
print(f"❌ 用户拒绝授权: state={state}")
return redirect('/auth/signin?error=wechat_auth_denied')
# 参数验证
if not code or not state:
if state in wechat_qr_sessions:
wechat_qr_sessions[state]['status'] = 'auth_failed'
wechat_qr_sessions[state]['error'] = '授权参数缺失'
if state and wechat_session_exists(state):
update_wechat_session(state, {'status': 'auth_failed', 'error': '授权参数缺失'})
return redirect('/auth/signin?error=wechat_auth_failed')
# 验证state
if state not in wechat_qr_sessions:
return redirect('/auth/signin?error=session_expired')
session_data = wechat_qr_sessions[state]
# 检查过期
if time.time() > session_data['expires']:
del wechat_qr_sessions[state]
# 从 Redis 获取 session自动处理过期
session_data = get_wechat_session(state)
if not session_data:
return redirect('/auth/signin?error=session_expired')
try:
# 步骤1: 用户已扫码并授权(微信回调过来说明用户已完成扫码+授权)
session_data['status'] = 'scanned'
update_wechat_session(state, {'status': 'scanned'})
print(f"✅ 微信扫码回调: state={state}, code={code[:10]}...")
# 步骤2: 根据授权模式选择对应的 AppID/AppSecret
@@ -3667,20 +3739,18 @@ def wechat_callback():
# 步骤3: 获取access_token
token_data = get_wechat_access_token(code, appid, appsecret)
if not token_data:
session_data['status'] = 'auth_failed'
session_data['error'] = '获取访问令牌失败'
update_wechat_session(state, {'status': 'auth_failed', 'error': '获取访问令牌失败'})
print(f"❌ 获取微信access_token失败: state={state}")
return redirect('/auth/signin?error=token_failed')
# 步骤3: Token获取成功标记为已授权
session_data['status'] = 'authorized'
update_wechat_session(state, {'status': 'authorized'})
print(f"✅ 微信授权成功: openid={token_data['openid']}")
# 步骤4: 获取用户信息
user_info = get_wechat_userinfo(token_data['access_token'], token_data['openid'])
if not user_info:
session_data['status'] = 'auth_failed'
session_data['error'] = '获取用户信息失败'
update_wechat_session(state, {'status': 'auth_failed', 'error': '获取用户信息失败'})
print(f"❌ 获取微信用户信息失败: openid={token_data['openid']}")
return redirect('/auth/signin?error=userinfo_failed')
@@ -3689,10 +3759,9 @@ def wechat_callback():
unionid = user_info.get('unionid') or token_data.get('unionid')
# 如果是绑定流程
session_item = wechat_qr_sessions.get(state)
if session_item and session_item.get('mode') == 'bind':
if session_data.get('mode') == 'bind':
try:
target_user_id = session.get('user_id') or session_item.get('bind_user_id')
target_user_id = session.get('user_id') or session_data.get('bind_user_id')
if not target_user_id:
return redirect('/auth/signin?error=bind_no_user')
@@ -3708,21 +3777,20 @@ def wechat_callback():
existing = User.query.filter_by(wechat_open_id=openid).first()
if existing and existing.id != target_user.id:
session_item['status'] = 'bind_conflict'
update_wechat_session(state, {'status': 'bind_conflict'})
return redirect('/home?bind=conflict')
# 执行绑定
target_user.bind_wechat(openid, unionid, wechat_info=user_info)
# 标记绑定完成,供前端轮询
session_item['status'] = 'bind_ready'
session_item['user_info'] = {'user_id': target_user.id}
update_wechat_session(state, {'status': 'bind_ready', 'user_info': {'user_id': target_user.id}})
return redirect('/home?bind=success')
except Exception as e:
print(f"❌ 微信绑定失败: {e}")
db.session.rollback()
session_item['status'] = 'bind_failed'
update_wechat_session(state, {'status': 'bind_failed'})
return redirect('/home?bind=failed')
user = None
@@ -3775,23 +3843,21 @@ def wechat_callback():
login_user(user, remember=True)
# 更新微信session状态供前端轮询检测
if state in wechat_qr_sessions:
session_item = wechat_qr_sessions[state]
mode = session_item.get('mode')
mode = session_data.get('mode')
# H5 模式:重定向到前端回调页面
if mode == 'h5':
frontend_redirect = session_item.get('frontend_redirect', '/home/wechat-callback')
# 清理 session
del wechat_qr_sessions[state]
print(f"✅ H5 微信登录成功,重定向到: {frontend_redirect}")
return redirect(f"{frontend_redirect}?wechat_login=success")
# H5 模式:重定向到前端回调页面
if mode == 'h5':
frontend_redirect = session_data.get('frontend_redirect', '/home/wechat-callback')
# 清理 session
delete_wechat_session(state)
print(f"✅ H5 微信登录成功,重定向到: {frontend_redirect}")
return redirect(f"{frontend_redirect}?wechat_login=success")
# PC 扫码模式:更新状态供前端轮询
if not mode:
session_item['status'] = 'register_ready' if is_new_user else 'login_ready'
session_item['user_info'] = {'user_id': user.id}
print(f"✅ 微信扫码状态已更新: {session_item['status']}, user_id: {user.id}")
# PC 扫码模式:更新状态供前端轮询
if not mode:
new_status = 'register_ready' if is_new_user else 'login_ready'
update_wechat_session(state, {'status': new_status, 'user_info': {'user_id': user.id}})
print(f"✅ 微信扫码状态已更新: {new_status}, user_id: {user.id}")
# PC 模式直接跳转到首页
return redirect('/home')
@@ -3803,9 +3869,8 @@ def wechat_callback():
db.session.rollback()
# 更新session状态为失败
if state in wechat_qr_sessions:
wechat_qr_sessions[state]['status'] = 'auth_failed'
wechat_qr_sessions[state]['error'] = str(e)
if wechat_session_exists(state):
update_wechat_session(state, {'status': 'auth_failed', 'error': str(e)})
return redirect('/auth/signin?error=login_failed')
@@ -3819,17 +3884,17 @@ def login_with_wechat():
if not session_id:
return jsonify({'success': False, 'error': 'session_id不能为空'}), 400
# 验证session
session = wechat_qr_sessions.get(session_id)
if not session:
# 从 Redis 获取 session
wechat_sess = get_wechat_session(session_id)
if not wechat_sess:
return jsonify({'success': False, 'error': '会话不存在或已过期'}), 400
# 检查session状态
if session['status'] not in ['login_ready', 'register_ready']:
if wechat_sess['status'] not in ['login_ready', 'register_ready']:
return jsonify({'success': False, 'error': '会话状态无效'}), 400
# 检查是否有用户信息
user_info = session.get('user_info')
user_info = wechat_sess.get('user_info')
if not user_info or not user_info.get('user_id'):
return jsonify({'success': False, 'error': '用户信息不完整'}), 400
@@ -3841,25 +3906,13 @@ def login_with_wechat():
# 更新最后登录时间
user.update_last_seen()
# ✅ 修复不立即删除session而是标记为已完成避免轮询报错
# 原因:前端可能还在轮询检查状态,立即删除会导致 "无效的session" 错误
# 保留原状态login_ready/register_ready前端会正确处理
# wechat_qr_sessions[session_id]['status'] 保持不变
# 设置延迟删除10秒后自动清理给前端足够时间完成轮询
import threading
def delayed_cleanup():
import time
time.sleep(10)
if session_id in wechat_qr_sessions:
del wechat_qr_sessions[session_id]
print(f"✅ 延迟清理微信登录session: {session_id[:8]}...")
threading.Thread(target=delayed_cleanup, daemon=True).start()
# Redis 会自动过期,无需手动延迟删除
# 保留 session 状态供前端轮询Redis TTL 会自动清理
# 生成登录响应
response_data = {
'success': True,
'message': '登录成功' if session['status'] == 'login_ready' else '注册并登录成功',
'message': '登录成功' if wechat_sess['status'] == 'login_ready' else '注册并登录成功',
'user': {
'id': user.id,
'username': user.username,
@@ -3872,7 +3925,7 @@ def login_with_wechat():
'created_at': user.created_at.isoformat() if user.created_at else None,
'last_seen': user.last_seen.isoformat() if user.last_seen else None
},
'isNewUser': session['status'] == 'register_ready' # 标记是否为新用户
'isNewUser': wechat_sess['status'] == 'register_ready' # 标记是否为新用户
}
# 如果需要token认证可以在这里生成