#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Alipay payment worker script.

Intended to be executed in a subprocess, which sidesteps the eventlet DNS
problem. Each invocation prints its result to stdout as a single JSON
document.

Usage:
    python alipay_pay_worker.py check
        Check the Alipay configuration.
    python alipay_pay_worker.py create <order_no> <amount> <subject> [body] [pay_type]
        Create an order.
        pay_type: page = desktop website payment (default),
                  wap  = mobile website payment.
    python alipay_pay_worker.py query <order_no>
        Query an order.
"""

import sys
import json

# Page the user is returned to when they cancel a mobile (WAP) payment.
_WAP_QUIT_URL = 'https://valuefrontier.cn/pricing'

# Alipay ``trade_status`` -> unified ``trade_state``. Statuses that are not
# listed here are passed through unchanged by query_order().
_TRADE_STATE_BY_STATUS = {
    'TRADE_SUCCESS': 'SUCCESS',
    'TRADE_FINISHED': 'SUCCESS',
    'WAIT_BUYER_PAY': 'NOTPAY',
    'TRADE_CLOSED': 'CLOSED',
}


def _failure(error):
    """Build the failure payload shared by every command."""
    return {
        'success': False,
        'error': str(error),
    }


def _emit(payload):
    """Print *payload* as JSON, keeping non-ASCII text unescaped."""
    print(json.dumps(payload, ensure_ascii=False))


def check_config():
    """Check whether the Alipay configuration is ready.

    Returns:
        A dict with ``success``, ``message`` and ``error`` keys. The text
        reported by ``check_alipay_ready`` is placed under ``message`` when
        the configuration is ready and under ``error`` otherwise. If the
        check itself raises (including a failed import of ``alipay_pay``),
        a ``{'success': False, 'error': ...}`` dict is returned instead.
    """
    try:
        # Imported lazily so that an import failure is reported as JSON
        # instead of crashing the worker at start-up.
        from alipay_pay import check_alipay_ready

        is_ready, message = check_alipay_ready()
        return {
            'success': is_ready,
            'message': message if is_ready else None,
            'error': None if is_ready else message,
        }
    except Exception as e:
        return _failure(e)


def create_order(order_no, amount, subject, body=None, pay_type='page'):
    """Create an Alipay order and return the payment-URL result.

    Args:
        order_no: Merchant order number (sent as ``out_trade_no``).
        amount: Order amount; converted with ``str()`` before being sent.
        subject: Order title.
        body: Optional order description.
        pay_type: ``'wap'`` selects mobile website payment; any other
            value falls back to desktop website payment (``'page'``).

    Returns:
        Whatever the Alipay client returns for the chosen payment type, or
        a ``{'success': False, 'error': ...}`` dict if anything raises.
    """
    try:
        from alipay_pay import create_alipay_instance

        alipay = create_alipay_instance()

        if pay_type == 'wap':
            # Mobile website payment.
            return alipay.create_wap_pay_url(
                out_trade_no=order_no,
                total_amount=str(amount),
                subject=subject,
                body=body,
                timeout_express='30m',
                quit_url=_WAP_QUIT_URL,
            )

        # Desktop website payment (default).
        return alipay.create_page_pay_url(
            out_trade_no=order_no,
            total_amount=str(amount),
            subject=subject,
            body=body,
            timeout_express='30m',
        )
    except Exception as e:
        return _failure(e)


def query_order(order_no):
    """Query an Alipay order and add a unified ``trade_state`` field.

    When the query succeeds, the Alipay ``trade_status`` is mapped to
    ``trade_state``: ``TRADE_SUCCESS``/``TRADE_FINISHED`` -> ``SUCCESS``,
    ``WAIT_BUYER_PAY`` -> ``NOTPAY``, ``TRADE_CLOSED`` -> ``CLOSED``; any
    other status is copied through unchanged.

    Args:
        order_no: Merchant order number (sent as ``out_trade_no``).

    Returns:
        The query result from the Alipay client (with ``trade_state`` added
        on success), or a ``{'success': False, 'error': ...}`` dict if
        anything raises.
    """
    try:
        from alipay_pay import create_alipay_instance

        alipay = create_alipay_instance()
        result = alipay.query_order(out_trade_no=order_no)

        if result.get('success'):
            trade_status = result.get('trade_status')
            result['trade_state'] = _TRADE_STATE_BY_STATUS.get(
                trade_status, trade_status
            )

        return result
    except Exception as e:
        return _failure(e)


def _dispatch(args):
    """Run the command named by ``args[0]`` and return its result payload."""
    command = args[0].lower()

    if command == 'check':
        return check_config()

    if command == 'create':
        if len(args) < 4:
            return _failure('创建订单需要参数: order_no, amount, subject')
        order_no, amount, subject = args[1:4]
        body = args[4] if len(args) > 4 else None
        # pay_type: page = desktop website payment, wap = mobile website payment.
        pay_type = args[5] if len(args) > 5 else 'page'
        return create_order(order_no, amount, subject, body, pay_type)

    if command == 'query':
        if len(args) < 2:
            return _failure('查询订单需要参数: order_no')
        return query_order(args[1])

    return _failure(f'未知命令: {command}')


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Command arguments *without* the program name. Defaults to
            ``sys.argv[1:]``, so calling ``main()`` behaves like running
            the script.

    The result of the command is printed as one JSON document. The process
    exits with status 1 only when no command is given or an unexpected
    exception escapes; a command that merely reports ``success: False``
    still returns normally (exit status 0).
    """
    args = sys.argv[1:] if argv is None else list(argv)

    if not args:
        # Emitted through _emit so this message is raw UTF-8 like every
        # other output of the worker, not \uXXXX-escaped.
        _emit(_failure('缺少命令参数。用法: check | create | query '))
        sys.exit(1)

    try:
        _emit(_dispatch(args))
    except Exception as e:
        _emit(_failure(e))
        sys.exit(1)


if __name__ == '__main__':
    main()