diff --git a/__pycache__/app.cpython-310.pyc b/__pycache__/app.cpython-310.pyc index 9733ad91..a8142f82 100644 Binary files a/__pycache__/app.cpython-310.pyc and b/__pycache__/app.cpython-310.pyc differ diff --git a/app.py b/app.py index e17739d5..f2d3cd78 100755 --- a/app.py +++ b/app.py @@ -2156,35 +2156,47 @@ def create_payment_order(): db.session.rollback() return jsonify({'success': False, 'error': f'订单创建失败: {str(e)}'}), 500 - # 尝试调用真实的微信支付API + # 尝试调用真实的微信支付API(使用 subprocess 绕过 eventlet DNS 问题) try: - from wechat_pay import create_wechat_pay_instance, check_wechat_pay_ready + import subprocess + import urllib.parse - # 检查微信支付是否就绪 - is_ready, ready_msg = check_wechat_pay_ready() - if not is_ready: - # 使用模拟二维码 + # 使用独立脚本检查配置 + script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'wechat_pay_worker.py') + + # 先检查配置 + check_result = subprocess.run( + [sys.executable, script_path, 'check'], + capture_output=True, text=True, timeout=10 + ) + + if check_result.returncode != 0: + check_data = json.loads(check_result.stdout) if check_result.stdout else {} + ready_msg = check_data.get('error', check_data.get('message', '未知错误')) order.qr_code_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data=wxpay://order/{order.order_no}" order.remark = f"演示模式 - {ready_msg}" else: - wechat_pay = create_wechat_pay_instance() - # 创建微信支付订单 plan_display_name = f"{plan_name.upper()}版本-{billing_cycle}" - wechat_result = wechat_pay.create_native_order( - order_no=order.order_no, - total_fee=float(amount), - body=f"VFr-{plan_display_name}", - product_id=f"{plan_name}_{billing_cycle}" + body = f"VFr-{plan_display_name}" + product_id = f"{plan_name}_{billing_cycle}" + + create_result = subprocess.run( + [sys.executable, script_path, 'create', order.order_no, str(float(amount)), body, product_id], + capture_output=True, text=True, timeout=60 ) - if wechat_result['success']: + print(f"[微信支付] 创建订单返回: {create_result.stdout}") + if create_result.stderr: + print(f"[微信支付] 错误输出: {create_result.stderr}") + wechat_result = json.loads(create_result.stdout) if create_result.stdout else {'success': False, 'error': '无返回'} + + if wechat_result.get('success'): # 获取微信返回的原始code_url wechat_code_url = wechat_result['code_url'] # 将微信协议URL转换为二维码图片URL - import urllib.parse encoded_url = urllib.parse.quote(wechat_code_url, safe='') qr_image_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data={encoded_url}" @@ -2196,10 +2208,16 @@ def create_payment_order(): order.qr_code_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data=wxpay://order/{order.order_no}" order.remark = f"微信支付失败: {wechat_result.get('error')}" - except ImportError as e: + except subprocess.TimeoutExpired: order.qr_code_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data=wxpay://order/{order.order_no}" - order.remark = "微信支付模块未配置" + order.remark = "微信支付超时" + except json.JSONDecodeError as e: + order.qr_code_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data=wxpay://order/{order.order_no}" + order.remark = f"微信支付返回解析失败: {str(e)}" except Exception as e: + import traceback + print(f"[微信支付] Exception: {e}") + traceback.print_exc() order.qr_code_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data=wxpay://order/{order.order_no}" order.remark = f"支付异常: {str(e)}" @@ -10455,12 +10473,39 @@ def broadcast_new_event(event): # Redis Key 用于多 Worker 协调 REDIS_KEY_LAST_MAX_EVENT_ID = 'vf:event_polling:last_max_id' REDIS_KEY_POLLING_LOCK = 'vf:event_polling:lock' +REDIS_KEY_PENDING_EVENTS = 'vf:event_polling:pending_events' # 待推送事件集合(没有 related_stocks 的事件) # 本地缓存(减少 Redis 查询) _local_last_max_event_id = 0 _polling_initialized = False +def _add_pending_event(event_id): + """将事件添加到待推送列表""" + try: + redis_client.sadd(REDIS_KEY_PENDING_EVENTS, str(event_id)) + except Exception as e: + print(f'[轮询 WARN] 添加待推送事件失败: {e}') + + +def _remove_pending_event(event_id): + """从待推送列表移除事件""" + try: + redis_client.srem(REDIS_KEY_PENDING_EVENTS, str(event_id)) + except Exception as e: + print(f'[轮询 WARN] 移除待推送事件失败: {e}') + + +def _get_pending_events(): + """获取所有待推送事件ID""" + try: + pending = redis_client.smembers(REDIS_KEY_PENDING_EVENTS) + return [int(eid) for eid in pending] if pending else [] + except Exception as e: + print(f'[轮询 WARN] 获取待推送事件失败: {e}') + return [] + + def _get_last_max_event_id(): """从 Redis 获取最大事件 ID""" try: @@ -10491,6 +10536,11 @@ def poll_new_events(): 1. 使用 Redis 分布式锁,确保同一时刻只有一个 Worker 执行轮询 2. 使用 Redis 存储 last_max_event_id,所有 Worker 共享状态 3. 通过 Redis 消息队列广播到所有 Worker 的客户端 + + 待推送事件机制: + - 当事件首次被检测到但没有 related_stocks 时,加入待推送列表 + - 每次轮询时检查待推送列表中的事件是否已有 related_stocks + - 有则推送并从列表移除,超过24小时的事件自动清理 """ import os @@ -10528,6 +10578,36 @@ def poll_new_events(): print(f'[轮询] 数据库查询: 找到 {len(events_in_24h)} 个近24小时内的事件') + # 创建事件ID到事件对象的映射 + events_map = {event.id: event for event in events_in_24h} + + # === 步骤1: 检查待推送列表中的事件 === + pending_event_ids = _get_pending_events() + print(f'[轮询] 待推送列表: {len(pending_event_ids)} 个事件') + + pushed_from_pending = 0 + for pending_id in pending_event_ids: + if pending_id in events_map: + event = events_map[pending_id] + related_stocks_count = event.related_stocks.count() + + if related_stocks_count > 0: + # 事件现在有 related_stocks 了,推送它 + broadcast_new_event(event) + _remove_pending_event(pending_id) + pushed_from_pending += 1 + print(f'[轮询] ✓ 待推送事件 ID={pending_id} 现在有 {related_stocks_count} 个关联股票,已推送') + else: + print(f'[轮询] - 待推送事件 ID={pending_id} 仍无关联股票,继续等待') + else: + # 事件已超过24小时或已删除,从待推送列表移除 + _remove_pending_event(pending_id) + print(f'[轮询] × 待推送事件 ID={pending_id} 已过期或不存在,已移除') + + if pushed_from_pending > 0: + print(f'[轮询] 从待推送列表推送了 {pushed_from_pending} 个事件') + + # === 步骤2: 检查新事件 === # 找出新插入的事件(ID > last_max_event_id) new_events = [ event for event in events_in_24h @@ -10540,6 +10620,7 @@ def poll_new_events(): print(f'[轮询] 发现 {len(new_events)} 个新事件') pushed_count = 0 + pending_count = 0 for event in new_events: # 检查事件是否有关联股票(只推送有关联股票的事件) related_stocks_count = event.related_stocks.count() @@ -10552,9 +10633,12 @@ def poll_new_events(): pushed_count += 1 print(f'[轮询] ✓ 已推送事件 ID={event.id}') else: - print(f'[轮询] - 跳过(暂无关联股票)') + # 没有关联股票,加入待推送列表 + _add_pending_event(event.id) + pending_count += 1 + print(f'[轮询] → 加入待推送列表(暂无关联股票)') - print(f'[轮询] 本轮共推送 {pushed_count}/{len(new_events)} 个事件') + print(f'[轮询] 本轮: 推送 {pushed_count} 个, 加入待推送 {pending_count} 个') # 更新最大事件ID new_max_id = max(event.id for event in events_in_24h) diff --git a/wechat_pay_config.py b/wechat_pay_config.py index 8edb3eea..f298810c 100644 --- a/wechat_pay_config.py +++ b/wechat_pay_config.py @@ -4,6 +4,10 @@ 微信支付真实配置文件 请根据您的微信商户平台信息填写 """ +import os + +# 获取当前文件所在目录(确保无论从哪里启动都能找到证书) +_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 微信支付配置 - 请替换为您的真实信息 WECHAT_PAY_CONFIG = { @@ -11,10 +15,10 @@ WECHAT_PAY_CONFIG = { 'app_id': 'wx0edeaab76d4fa414', # 微信公众平台AppID 'mch_id': '1718543833', # 微信支付商户号 'api_key': '141a5753c42526bb44bc44d6c4277664', # 微信商户平台设置的API密钥 - - # 证书文件路径 - 'cert_path': './certs/apiclient_cert.pem', - 'key_path': './certs/apiclient_key.pem', + + # 证书文件路径(使用绝对路径,兼容 gunicorn 多进程启动) + 'cert_path': os.path.join(_BASE_DIR, 'certs', 'apiclient_cert.pem'), + 'key_path': os.path.join(_BASE_DIR, 'certs', 'apiclient_key.pem'), # 回调配置 'notify_url': 'https://valuefrontier.cn/api/payment/wechat/callback', @@ -38,8 +42,7 @@ def validate_config(): if WECHAT_PAY_CONFIG['api_key'].startswith('你的'): issues.append("❌ api_key 还是示例值,请替换为真实的32位API密钥") - # 检查证书文件 - import os + # 检查证书文件(路径已是绝对路径) for key in ['cert_path', 'key_path']: if not os.path.exists(WECHAT_PAY_CONFIG[key]): issues.append(f"❌ 证书文件不存在: {WECHAT_PAY_CONFIG[key]}") diff --git a/wechat_pay_worker.py b/wechat_pay_worker.py new file mode 100644 index 00000000..a16574a7 --- /dev/null +++ b/wechat_pay_worker.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +独立的微信支付脚本(绕过 eventlet DNS 问题) + +使用方式: + # 创建订单 + python wechat_pay_worker.py create [product_id] + + # 查询订单 + python wechat_pay_worker.py query + + # 检查配置 + python wechat_pay_worker.py check + +返回值: + 成功返回 0,失败返回 1 + 输出 JSON 格式的结果 +""" + +import sys +import json +import os + +# 添加当前目录到路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +def create_order(order_no, total_fee, body, product_id=None): + """创建微信支付订单""" + try: + from wechat_pay import create_wechat_pay_instance + + wechat_pay = create_wechat_pay_instance() + result = wechat_pay.create_native_order( + order_no=order_no, + total_fee=float(total_fee), + body=body, + product_id=product_id + ) + + print(json.dumps(result, ensure_ascii=False)) + return result.get('success', False) + + except Exception as e: + print(json.dumps({ + 'success': False, + 'error': f'{type(e).__name__}: {str(e)}' + }, ensure_ascii=False)) + return False + + +def query_order(order_no): + """查询订单状态""" + try: + from wechat_pay import create_wechat_pay_instance + + wechat_pay = create_wechat_pay_instance() + result = wechat_pay.query_order(order_no=order_no) + + print(json.dumps(result, ensure_ascii=False)) + return result.get('success', False) + + except Exception as e: + print(json.dumps({ + 'success': False, + 'error': f'{type(e).__name__}: {str(e)}' + }, ensure_ascii=False)) + return False + + +def check_config(): + """检查微信支付配置""" + try: + from wechat_pay import check_wechat_pay_ready + + is_ready, msg = check_wechat_pay_ready() + result = { + 'success': is_ready, + 'message': msg + } + + print(json.dumps(result, ensure_ascii=False)) + return is_ready + + except Exception as e: + print(json.dumps({ + 'success': False, + 'error': f'{type(e).__name__}: {str(e)}' + }, ensure_ascii=False)) + return False + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(json.dumps({ + 'success': False, + 'error': 'Usage: python wechat_pay_worker.py [args...]' + })) + sys.exit(1) + + command = sys.argv[1] + + if command == 'create': + if len(sys.argv) < 5: + print(json.dumps({ + 'success': False, + 'error': 'Usage: python wechat_pay_worker.py create [product_id]' + })) + sys.exit(1) + + order_no = sys.argv[2] + total_fee = sys.argv[3] + body = sys.argv[4] + product_id = sys.argv[5] if len(sys.argv) > 5 else None + + success = create_order(order_no, total_fee, body, product_id) + sys.exit(0 if success else 1) + + elif command == 'query': + if len(sys.argv) < 3: + print(json.dumps({ + 'success': False, + 'error': 'Usage: python wechat_pay_worker.py query ' + })) + sys.exit(1) + + order_no = sys.argv[2] + success = query_order(order_no) + sys.exit(0 if success else 1) + + elif command == 'check': + success = check_config() + sys.exit(0 if success else 1) + + else: + print(json.dumps({ + 'success': False, + 'error': f'Unknown command: {command}' + })) + sys.exit(1)