Files
vf_react/wechat_pay_worker.py
2025-12-12 01:22:31 +08:00

185 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
独立的微信支付脚本(绕过 eventlet DNS 问题)
使用方式:
# 创建订单
python wechat_pay_worker.py create <order_no> <total_fee> <body> [product_id]
# 查询订单
python wechat_pay_worker.py query <order_no>
# 检查配置
python wechat_pay_worker.py check
返回值:
成功返回 0失败返回 1
输出 JSON 格式的结果(到 stdout
调试信息输出到 stderr
"""
import sys
import json
import os
import io
from contextlib import redirect_stdout
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def create_order(order_no, total_fee, body, product_id=None):
"""创建微信支付订单"""
# 捕获所有 stdout 输出wechat_pay.py 的调试信息)
debug_output = io.StringIO()
try:
with redirect_stdout(debug_output):
from wechat_pay import create_wechat_pay_instance
wechat_pay = create_wechat_pay_instance()
result = wechat_pay.create_native_order(
order_no=order_no,
total_fee=float(total_fee),
body=body,
product_id=product_id
)
# 调试信息输出到 stderr
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
# JSON 结果输出到 stdout
print(json.dumps(result, ensure_ascii=False))
return result.get('success', False)
except Exception as e:
# 调试信息输出到 stderr
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
print(json.dumps({
'success': False,
'error': f'{type(e).__name__}: {str(e)}'
}, ensure_ascii=False))
return False
def query_order(order_no):
"""查询订单状态"""
debug_output = io.StringIO()
try:
with redirect_stdout(debug_output):
from wechat_pay import create_wechat_pay_instance
wechat_pay = create_wechat_pay_instance()
result = wechat_pay.query_order(order_no=order_no)
# 调试信息输出到 stderr
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
print(json.dumps(result, ensure_ascii=False))
return result.get('success', False)
except Exception as e:
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
print(json.dumps({
'success': False,
'error': f'{type(e).__name__}: {str(e)}'
}, ensure_ascii=False))
return False
def check_config():
"""检查微信支付配置"""
debug_output = io.StringIO()
try:
with redirect_stdout(debug_output):
from wechat_pay import check_wechat_pay_ready
is_ready, msg = check_wechat_pay_ready()
# 调试信息输出到 stderr
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
result = {
'success': is_ready,
'message': msg
}
print(json.dumps(result, ensure_ascii=False))
return is_ready
except Exception as e:
debug_info = debug_output.getvalue()
if debug_info:
print(debug_info, file=sys.stderr)
print(json.dumps({
'success': False,
'error': f'{type(e).__name__}: {str(e)}'
}, ensure_ascii=False))
return False
if __name__ == "__main__":
if len(sys.argv) < 2:
print(json.dumps({
'success': False,
'error': 'Usage: python wechat_pay_worker.py <command> [args...]'
}))
sys.exit(1)
command = sys.argv[1]
if command == 'create':
if len(sys.argv) < 5:
print(json.dumps({
'success': False,
'error': 'Usage: python wechat_pay_worker.py create <order_no> <total_fee> <body> [product_id]'
}))
sys.exit(1)
order_no = sys.argv[2]
total_fee = sys.argv[3]
body = sys.argv[4]
product_id = sys.argv[5] if len(sys.argv) > 5 else None
success = create_order(order_no, total_fee, body, product_id)
sys.exit(0 if success else 1)
elif command == 'query':
if len(sys.argv) < 3:
print(json.dumps({
'success': False,
'error': 'Usage: python wechat_pay_worker.py query <order_no>'
}))
sys.exit(1)
order_no = sys.argv[2]
success = query_order(order_no)
sys.exit(0 if success else 1)
elif command == 'check':
success = check_config()
sys.exit(0 if success else 1)
else:
print(json.dumps({
'success': False,
'error': f'Unknown command: {command}'
}))
sys.exit(1)