#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Alipay payment integration module.

Covers desktop website payment (alipay.trade.page.pay), mobile website
payment (alipay.trade.wap.pay), order query (alipay.trade.query) and
verification of signed parameters sent back by Alipay.
"""

import json
import sys
import time
import base64
import hashlib
from datetime import datetime, timedelta, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from urllib.parse import quote_plus, urlencode

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend


# Request timestamps are generated in UTC+8 (the timezone the official Alipay
# SDKs use) instead of whatever local timezone the server happens to run in.
_ALIPAY_TZ = timezone(timedelta(hours=8))


def _log(message):
    """Write a diagnostic line to stderr.

    stdout is deliberately never used for diagnostics: this module may run
    inside a subprocess whose stdout carries JSON for the parent process.
    """
    print(f"[AlipayPay] {message}", file=sys.stderr)


class AlipayPay:
    """Client for Alipay website payments (desktop and mobile)."""

    def __init__(self, app_id, app_private_key, alipay_public_key,
                 notify_url, return_url,
                 gateway_url='https://openapi.alipay.com/gateway.do',
                 sign_type='RSA2', charset='utf-8'):
        """
        Store the configuration and load both RSA keys.

        Args:
            app_id: Alipay application ID.
            app_private_key: Application private key, either a full PEM
                document or the bare Base64 body without header/footer.
            alipay_public_key: Alipay public key, PEM or bare Base64 body.
            notify_url: Asynchronous notification URL.
            return_url: Synchronous redirect URL.
            gateway_url: Alipay gateway URL.
            sign_type: Signature type, 'RSA2' (default) or 'RSA'.
            charset: Request charset, 'utf-8' by default.

        Raises:
            Exception: Whatever the key loader raises for an unusable key.
        """
        self.app_id = app_id
        self.notify_url = notify_url
        self.return_url = return_url
        self.gateway_url = gateway_url
        self.sign_type = sign_type
        self.charset = charset

        # Load the keys eagerly so a bad key fails at construction time.
        self.app_private_key = self._load_private_key(app_private_key)
        self.alipay_public_key = self._load_public_key(alipay_public_key)
        # NOTE: do not print to stdout here; it would corrupt the JSON that
        # a parent process reads from this process's stdout.

    # ------------------------------------------------------------------
    # Key loading
    # ------------------------------------------------------------------

    def _load_private_key(self, key_str):
        """
        Load the RSA private key (PKCS#8 or PKCS#1).

        A key without a PEM header is first wrapped as PKCS#8; if that does
        not parse, it is wrapped as PKCS#1 and parsed again. A key that
        already carries a PEM header is parsed as-is.

        Args:
            key_str: PEM text or bare Base64 key body.

        Returns:
            The loaded private key object.

        Raises:
            Exception: Re-raised from the loader after logging to stderr.
        """
        try:
            if '-----BEGIN' not in key_str:
                pkcs8_pem = (
                    f"-----BEGIN PRIVATE KEY-----\n{key_str}\n"
                    f"-----END PRIVATE KEY-----"
                )
                try:
                    return serialization.load_pem_private_key(
                        pkcs8_pem.encode('utf-8'),
                        password=None,
                        backend=default_backend()
                    )
                except Exception:
                    # PKCS#8 wrapping failed: fall back to PKCS#1 wrapping.
                    key_str = (
                        f"-----BEGIN RSA PRIVATE KEY-----\n{key_str}\n"
                        f"-----END RSA PRIVATE KEY-----"
                    )

            return serialization.load_pem_private_key(
                key_str.encode('utf-8'),
                password=None,
                backend=default_backend()
            )
        except Exception as e:
            _log(f"Load private key failed: {e}")
            raise

    def _load_public_key(self, key_str):
        """
        Load the RSA public key.

        Args:
            key_str: PEM text or bare Base64 key body; a missing PEM
                header/footer is added before parsing.

        Returns:
            The loaded public key object.

        Raises:
            Exception: Re-raised from the loader after logging to stderr.
        """
        try:
            if '-----BEGIN' not in key_str:
                key_str = (
                    f"-----BEGIN PUBLIC KEY-----\n{key_str}\n"
                    f"-----END PUBLIC KEY-----"
                )

            return serialization.load_pem_public_key(
                key_str.encode('utf-8'),
                backend=default_backend()
            )
        except Exception as e:
            _log(f"Load public key failed: {e}")
            raise

    # ------------------------------------------------------------------
    # Signing primitives
    # ------------------------------------------------------------------

    def _hash_algorithm(self):
        """
        Return the hash matching the configured ``sign_type``.

        'RSA' means SHA1-with-RSA; every other value (including the default
        'RSA2') uses SHA256-with-RSA. Without this, a request declaring
        ``sign_type=RSA`` would carry a SHA-256 signature and be rejected.
        """
        if str(self.sign_type).upper() == 'RSA':
            return hashes.SHA1()
        return hashes.SHA256()

    def _sign(self, unsigned_string):
        """
        Sign a string with the application private key.

        Args:
            unsigned_string: The string to sign.

        Returns:
            The Base64-encoded signature.

        Raises:
            Exception: Re-raised from the signer after logging to stderr.
        """
        try:
            signature = self.app_private_key.sign(
                unsigned_string.encode('utf-8'),
                padding.PKCS1v15(),
                self._hash_algorithm()
            )
            return base64.b64encode(signature).decode('utf-8')
        except Exception as e:
            _log(f"Sign failed: {e}")
            raise

    def _verify(self, message, signature):
        """
        Verify a signature with the Alipay public key.

        Args:
            message: The original message.
            signature: The Base64-encoded signature.

        Returns:
            bool: True when the signature is valid; False on any failure
            (invalid Base64, signature mismatch, ...), which is logged.
        """
        try:
            self.alipay_public_key.verify(
                base64.b64decode(signature),
                message.encode('utf-8'),
                padding.PKCS1v15(),
                self._hash_algorithm()
            )
            return True
        except Exception as e:
            _log(f"Verify signature failed: {e}")
            return False

    def _get_sign_content(self, params):
        """
        Build the canonical string that gets signed or verified.

        Args:
            params: Parameter dictionary.

        Returns:
            ``key=value`` pairs joined with '&', sorted by key, skipping the
            ``sign`` entry and every entry whose value is None or ''.
        """
        pairs = sorted(
            (key, value)
            for key, value in params.items()
            if key != 'sign' and value is not None and value != ''
        )
        return '&'.join(f'{key}={value}' for key, value in pairs)

    # ------------------------------------------------------------------
    # Request construction helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _format_amount(total_amount):
        """
        Format an amount as a string with exactly two decimals.

        Decimal is used instead of float so that money is not subject to
        binary floating-point error and so that 'nan'/'inf' cannot leak into
        a request. Rounding is half-up.

        Args:
            total_amount: Amount in yuan (str, int, float or Decimal).

        Returns:
            str: The amount, e.g. '0.01'.

        Raises:
            ValueError: If the amount is not a finite number or is not
                positive after rounding to two decimals.
        """
        try:
            amount = Decimal(str(total_amount))
        except InvalidOperation:
            raise ValueError(f"invalid total_amount: {total_amount!r}") from None

        if not amount.is_finite():
            raise ValueError(f"invalid total_amount: {total_amount!r}")

        amount = amount.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
        if amount <= 0:
            raise ValueError(f"total_amount must be positive: {total_amount!r}")
        return str(amount)

    def _build_signed_params(self, method, biz_content, with_urls=True):
        """
        Assemble the common request parameters and append the signature.

        Args:
            method: Alipay API method name.
            biz_content: Business parameters; serialized to JSON.
            with_urls: Whether to include notify_url and return_url.

        Returns:
            dict: Request parameters including ``sign``.
        """
        params = {
            'app_id': self.app_id,
            'method': method,
            'format': 'json',
            'charset': self.charset,
            'sign_type': self.sign_type,
            'timestamp': datetime.now(_ALIPAY_TZ).strftime('%Y-%m-%d %H:%M:%S'),
            'version': '1.0',
        }
        if with_urls:
            params['notify_url'] = self.notify_url
            params['return_url'] = self.return_url
        params['biz_content'] = json.dumps(biz_content, ensure_ascii=False)

        params['sign'] = self._sign(self._get_sign_content(params))
        return params

    def _create_pay_url(self, method, product_code, out_trade_no,
                        total_amount, subject, body, timeout_express,
                        extra_biz=None):
        """
        Build a signed payment URL shared by the page and wap flows.

        Returns:
            tuple: ``(pay_url, formatted_amount)``.
        """
        formatted_amount = self._format_amount(total_amount)

        biz_content = {
            'out_trade_no': out_trade_no,
            'total_amount': formatted_amount,
            'subject': subject,
            'product_code': product_code,
            'timeout_express': timeout_express,
        }
        if body:
            biz_content['body'] = body
        if extra_biz:
            biz_content.update(extra_biz)

        params = self._build_signed_params(method, biz_content)
        return f"{self.gateway_url}?{urlencode(params)}", formatted_amount

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def create_page_pay_url(self, out_trade_no, total_amount, subject,
                            body=None, timeout_express='30m'):
        """
        Create a desktop website payment URL.

        Args:
            out_trade_no: Merchant order number.
            total_amount: Order amount in yuan (two decimals).
            subject: Order title.
            body: Order description (optional).
            timeout_express: Order timeout, 30 minutes by default.

        Returns:
            dict: ``{'success': True, 'pay_url', 'order_no'}`` on success,
            otherwise ``{'success': False, 'error'}``. Never raises.
        """
        try:
            pay_url, formatted_amount = self._create_pay_url(
                'alipay.trade.page.pay',
                'FAST_INSTANT_TRADE_PAY',  # fixed value for desktop payment
                out_trade_no, total_amount, subject, body, timeout_express
            )
            _log(f"Order created: {out_trade_no}, amount: {formatted_amount}")
            return {
                'success': True,
                'pay_url': pay_url,
                'order_no': out_trade_no
            }
        except Exception as e:
            _log(f"Create order failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_wap_pay_url(self, out_trade_no, total_amount, subject,
                           body=None, timeout_express='30m', quit_url=None):
        """
        Create a mobile website (H5) payment URL.

        Args:
            out_trade_no: Merchant order number.
            total_amount: Order amount in yuan (two decimals).
            subject: Order title.
            body: Order description (optional).
            timeout_express: Order timeout, 30 minutes by default.
            quit_url: URL the user returns to when leaving the payment page
                midway (optional).

        Returns:
            dict: ``{'success': True, 'pay_url', 'order_no', 'pay_type'}``
            on success, otherwise ``{'success': False, 'error'}``.
            Never raises.
        """
        try:
            pay_url, formatted_amount = self._create_pay_url(
                'alipay.trade.wap.pay',
                'QUICK_WAP_WAY',  # fixed value for mobile website payment
                out_trade_no, total_amount, subject, body, timeout_express,
                extra_biz={'quit_url': quit_url} if quit_url else None
            )
            _log(f"WAP Order created: {out_trade_no}, amount: {formatted_amount}")
            return {
                'success': True,
                'pay_url': pay_url,
                'order_no': out_trade_no,
                'pay_type': 'wap'  # marks the mobile website flow
            }
        except Exception as e:
            _log(f"Create WAP order failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def query_order(self, out_trade_no=None, trade_no=None):
        """
        Query the status of an order.

        Args:
            out_trade_no: Merchant order number.
            trade_no: Alipay trade number.

        Returns:
            dict: On success, the trade fields plus the derived booleans
            ``is_paid``, ``is_closed`` and ``is_waiting``; otherwise
            ``{'success': False, 'error', ...}``.
        """
        # Imported lazily so that building payment URLs does not require
        # the requests package.
        import requests

        try:
            if not out_trade_no and not trade_no:
                return {'success': False, 'error': '订单号和交易号不能同时为空'}

            biz_content = {}
            if out_trade_no:
                biz_content['out_trade_no'] = out_trade_no
            if trade_no:
                biz_content['trade_no'] = trade_no

            params = self._build_signed_params(
                'alipay.trade.query', biz_content, with_urls=False
            )

            response = requests.get(self.gateway_url, params=params, timeout=30)
            result = response.json()
            _log(f"Query response: {result}")

            # NOTE(review): the signature of this response is not verified
            # here; consider verifying it before trusting the status.
            query_response = result.get('alipay_trade_query_response', {})

            if query_response.get('code') != '10000':
                error_msg = (query_response.get('sub_msg')
                             or query_response.get('msg', '查询失败'))
                return {
                    'success': False,
                    'error': error_msg,
                    'code': query_response.get('code'),
                    'sub_code': query_response.get('sub_code')
                }

            trade_status = query_response.get('trade_status')
            return {
                'success': True,
                'trade_status': trade_status,
                'trade_no': query_response.get('trade_no'),
                'out_trade_no': query_response.get('out_trade_no'),
                'total_amount': query_response.get('total_amount'),
                'buyer_logon_id': query_response.get('buyer_logon_id'),
                'send_pay_date': query_response.get('send_pay_date'),
                # Status mapping
                'is_paid': trade_status in ('TRADE_SUCCESS', 'TRADE_FINISHED'),
                'is_closed': trade_status == 'TRADE_CLOSED',
                'is_waiting': trade_status == 'WAIT_BUYER_PAY',
            }
        except requests.RequestException as e:
            _log(f"API request failed: {e}")
            return {'success': False, 'error': f'网络请求失败: {e}'}
        except Exception as e:
            _log(f"Query order error: {e}")
            return {'success': False, 'error': str(e)}

    def verify_callback(self, params):
        """
        Verify an asynchronous notification from Alipay.

        The caller's mapping is left untouched; verification works on a
        copy from which ``sign`` and ``sign_type`` are removed.

        Args:
            params: Mapping of callback parameters.

        Returns:
            dict: ``{'success': True, 'data': <params without sign and
            sign_type>}`` when the signature is valid, otherwise
            ``{'success': False, 'error'}``. Never raises.
        """
        try:
            data = dict(params)
            sign = data.pop('sign', None)
            # sign_type is not part of the string that gets verified.
            data.pop('sign_type', None)

            if not sign:
                return {'success': False, 'error': '缺少签名参数'}

            sign_content = self._get_sign_content(data)

            if self._verify(sign_content, sign):
                _log("Callback signature verified")
                return {
                    'success': True,
                    'data': data
                }

            _log("Callback signature verification failed")
            return {
                'success': False,
                'error': '签名验证失败'
            }
        except Exception as e:
            _log(f"Verify callback error: {e}")
            return {'success': False, 'error': str(e)}

    def verify_return(self, params):
        """
        Verify the synchronous return (user redirected back after paying).

        Args:
            params: Mapping of return parameters.

        Returns:
            dict: Same shape as :meth:`verify_callback`, which performs the
            actual verification.
        """
        return self.verify_callback(params)


# Factory functions
def create_alipay_instance():
    """
    Create an AlipayPay instance from the ``alipay_config`` module.

    Returns:
        AlipayPay: The configured client.

    Raises:
        ValueError: If the configuration module cannot be imported, the
            configuration is reported invalid, or a key cannot be loaded.
    """
    try:
        from alipay_config import (
            ALIPAY_CONFIG,
            get_app_private_key,
            get_alipay_public_key,
            validate_config,
        )

        is_valid, issues = validate_config()
        if not is_valid:
            raise ValueError(f"支付宝配置错误: {'; '.join(issues)}")

        app_private_key = get_app_private_key()
        alipay_public_key = get_alipay_public_key()

        if not app_private_key or not alipay_public_key:
            raise ValueError("密钥加载失败")

        return AlipayPay(
            app_id=ALIPAY_CONFIG['app_id'],
            app_private_key=app_private_key,
            alipay_public_key=alipay_public_key,
            notify_url=ALIPAY_CONFIG['notify_url'],
            return_url=ALIPAY_CONFIG['return_url'],
            gateway_url=ALIPAY_CONFIG['gateway_url'],
            sign_type=ALIPAY_CONFIG['sign_type'],
            charset=ALIPAY_CONFIG['charset']
        )
    except ImportError as e:
        raise ValueError(f"无法导入支付宝配置: {e}") from e
    except Exception as e:
        raise ValueError(f"创建支付宝实例失败: {e}") from e


def check_alipay_ready():
    """
    Check whether an Alipay client can be created.

    Returns:
        tuple: ``(True, message)`` when creation succeeds, otherwise
        ``(False, error_text)``.
    """
    try:
        create_alipay_instance()
        return True, "支付宝支付配置正确"
    except Exception as e:
        return False, str(e)


def _main():
    """Manual smoke test: check the configuration and build a test order URL."""
    print("=" * 60)
    print("Alipay Payment Test")
    print("=" * 60)

    try:
        is_ready, message = check_alipay_ready()
        print(f"\nConfig status: {'READY' if is_ready else 'NOT READY'}")
        print(f"Details: {message}")

        if is_ready:
            alipay = create_alipay_instance()

            test_order_no = f"TEST{int(time.time())}"
            result = alipay.create_page_pay_url(
                out_trade_no=test_order_no,
                total_amount='0.01',
                subject='Test Product',
                body='This is a test order'
            )

            print("\nCreate order result:")
            print(json.dumps(result, indent=2, ensure_ascii=False))

            if result['success']:
                print("\nPayment URL (open in browser):")
                print(result['pay_url'][:200] + '...')
    except Exception as e:
        print(f"\nTest failed: {e}")
        import traceback
        traceback.print_exc()

    print("\n" + "=" * 60)


if __name__ == '__main__':
    _main()