441 lines
14 KiB
Python
441 lines
14 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
支付宝支付集成模块
|
||
电脑网站支付 (alipay.trade.page.pay)
|
||
"""
|
||
|
||
import json
|
||
import time
|
||
import base64
|
||
import hashlib
|
||
from datetime import datetime
|
||
from urllib.parse import quote_plus, urlencode
|
||
from cryptography.hazmat.primitives import hashes, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import padding
|
||
from cryptography.hazmat.backends import default_backend
|
||
|
||
|
||
class AlipayPay:
|
||
"""支付宝电脑网站支付类"""
|
||
|
||
def __init__(self, app_id, app_private_key, alipay_public_key, notify_url, return_url,
|
||
gateway_url='https://openapi.alipay.com/gateway.do',
|
||
sign_type='RSA2', charset='utf-8'):
|
||
"""
|
||
初始化支付宝支付配置
|
||
|
||
Args:
|
||
app_id: 支付宝应用ID
|
||
app_private_key: 应用私钥(Base64编码的字符串)
|
||
alipay_public_key: 支付宝公钥(Base64编码的字符串)
|
||
notify_url: 异步通知地址
|
||
return_url: 同步跳转地址
|
||
gateway_url: 支付宝网关URL
|
||
sign_type: 签名类型,默认RSA2
|
||
charset: 编码,默认utf-8
|
||
"""
|
||
self.app_id = app_id
|
||
self.notify_url = notify_url
|
||
self.return_url = return_url
|
||
self.gateway_url = gateway_url
|
||
self.sign_type = sign_type
|
||
self.charset = charset
|
||
|
||
# 加载密钥
|
||
self.app_private_key = self._load_private_key(app_private_key)
|
||
self.alipay_public_key = self._load_public_key(alipay_public_key)
|
||
|
||
print(f"✅ 支付宝支付初始化成功")
|
||
print(f" App ID: {app_id}")
|
||
print(f" 网关: {gateway_url}")
|
||
|
||
def _load_private_key(self, key_str):
|
||
"""加载RSA私钥(支持 PKCS#1 和 PKCS#8 格式)"""
|
||
try:
|
||
# 如果密钥不包含头尾,尝试添加PEM格式头尾
|
||
if '-----BEGIN' not in key_str:
|
||
# 支付宝通常使用 PKCS#8 格式(MIIEv 开头)
|
||
# 先尝试 PKCS#8 格式
|
||
key_str_pkcs8 = f"-----BEGIN PRIVATE KEY-----\n{key_str}\n-----END PRIVATE KEY-----"
|
||
try:
|
||
private_key = serialization.load_pem_private_key(
|
||
key_str_pkcs8.encode('utf-8'),
|
||
password=None,
|
||
backend=default_backend()
|
||
)
|
||
return private_key
|
||
except Exception:
|
||
# 如果 PKCS#8 失败,尝试 PKCS#1 格式
|
||
key_str = f"-----BEGIN RSA PRIVATE KEY-----\n{key_str}\n-----END RSA PRIVATE KEY-----"
|
||
|
||
private_key = serialization.load_pem_private_key(
|
||
key_str.encode('utf-8'),
|
||
password=None,
|
||
backend=default_backend()
|
||
)
|
||
return private_key
|
||
except Exception as e:
|
||
print(f"[AlipayPay] Load private key failed: {e}")
|
||
raise
|
||
|
||
def _load_public_key(self, key_str):
|
||
"""加载RSA公钥"""
|
||
try:
|
||
# 如果密钥不包含头尾,添加PEM格式头尾
|
||
if '-----BEGIN' not in key_str:
|
||
key_str = f"-----BEGIN PUBLIC KEY-----\n{key_str}\n-----END PUBLIC KEY-----"
|
||
|
||
public_key = serialization.load_pem_public_key(
|
||
key_str.encode('utf-8'),
|
||
backend=default_backend()
|
||
)
|
||
return public_key
|
||
except Exception as e:
|
||
print(f"❌ 加载公钥失败: {e}")
|
||
raise
|
||
|
||
def _sign(self, unsigned_string):
|
||
"""
|
||
RSA2签名
|
||
|
||
Args:
|
||
unsigned_string: 待签名字符串
|
||
|
||
Returns:
|
||
Base64编码的签名
|
||
"""
|
||
try:
|
||
signature = self.app_private_key.sign(
|
||
unsigned_string.encode('utf-8'),
|
||
padding.PKCS1v15(),
|
||
hashes.SHA256()
|
||
)
|
||
return base64.b64encode(signature).decode('utf-8')
|
||
except Exception as e:
|
||
print(f"❌ 签名失败: {e}")
|
||
raise
|
||
|
||
def _verify(self, message, signature):
|
||
"""
|
||
验证支付宝签名
|
||
|
||
Args:
|
||
message: 原始消息
|
||
signature: Base64编码的签名
|
||
|
||
Returns:
|
||
bool: 验证是否通过
|
||
"""
|
||
try:
|
||
self.alipay_public_key.verify(
|
||
base64.b64decode(signature),
|
||
message.encode('utf-8'),
|
||
padding.PKCS1v15(),
|
||
hashes.SHA256()
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
print(f"❌ 签名验证失败: {e}")
|
||
return False
|
||
|
||
def _get_sign_content(self, params):
|
||
"""
|
||
获取待签名字符串
|
||
|
||
Args:
|
||
params: 参数字典
|
||
|
||
Returns:
|
||
排序后的参数字符串
|
||
"""
|
||
# 过滤空值和sign参数,按key排序
|
||
filtered_params = {k: v for k, v in params.items()
|
||
if v is not None and v != '' and k != 'sign'}
|
||
sorted_params = sorted(filtered_params.items())
|
||
|
||
# 拼接字符串
|
||
sign_content = '&'.join([f'{k}={v}' for k, v in sorted_params])
|
||
return sign_content
|
||
|
||
def create_page_pay_url(self, out_trade_no, total_amount, subject, body=None, timeout_express='30m'):
|
||
"""
|
||
创建电脑网站支付URL
|
||
|
||
Args:
|
||
out_trade_no: 商户订单号
|
||
total_amount: 订单总金额(元,精确到小数点后两位)
|
||
subject: 订单标题
|
||
body: 订单描述(可选)
|
||
timeout_express: 订单超时时间,默认30分钟
|
||
|
||
Returns:
|
||
dict: 包含支付URL的响应
|
||
"""
|
||
try:
|
||
# 业务参数
|
||
biz_content = {
|
||
'out_trade_no': out_trade_no,
|
||
'total_amount': str(total_amount),
|
||
'subject': subject,
|
||
'product_code': 'FAST_INSTANT_TRADE_PAY', # 电脑网站支付固定值
|
||
'timeout_express': timeout_express,
|
||
}
|
||
if body:
|
||
biz_content['body'] = body
|
||
|
||
# 公共请求参数
|
||
params = {
|
||
'app_id': self.app_id,
|
||
'method': 'alipay.trade.page.pay',
|
||
'format': 'json',
|
||
'charset': self.charset,
|
||
'sign_type': self.sign_type,
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'version': '1.0',
|
||
'notify_url': self.notify_url,
|
||
'return_url': self.return_url,
|
||
'biz_content': json.dumps(biz_content, ensure_ascii=False),
|
||
}
|
||
|
||
# 生成签名
|
||
sign_content = self._get_sign_content(params)
|
||
params['sign'] = self._sign(sign_content)
|
||
|
||
# 构建完整的支付URL
|
||
pay_url = f"{self.gateway_url}?{urlencode(params)}"
|
||
|
||
print(f"✅ 支付宝订单创建成功: {out_trade_no}")
|
||
print(f" 金额: {total_amount}元")
|
||
print(f" 标题: {subject}")
|
||
|
||
return {
|
||
'success': True,
|
||
'pay_url': pay_url,
|
||
'order_no': out_trade_no
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"❌ 创建支付宝订单失败: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': str(e)
|
||
}
|
||
|
||
def query_order(self, out_trade_no=None, trade_no=None):
|
||
"""
|
||
查询订单状态
|
||
|
||
Args:
|
||
out_trade_no: 商户订单号
|
||
trade_no: 支付宝交易号
|
||
|
||
Returns:
|
||
dict: 订单状态信息
|
||
"""
|
||
import requests
|
||
|
||
try:
|
||
if not out_trade_no and not trade_no:
|
||
return {'success': False, 'error': '订单号和交易号不能同时为空'}
|
||
|
||
# 业务参数
|
||
biz_content = {}
|
||
if out_trade_no:
|
||
biz_content['out_trade_no'] = out_trade_no
|
||
if trade_no:
|
||
biz_content['trade_no'] = trade_no
|
||
|
||
# 公共请求参数
|
||
params = {
|
||
'app_id': self.app_id,
|
||
'method': 'alipay.trade.query',
|
||
'format': 'json',
|
||
'charset': self.charset,
|
||
'sign_type': self.sign_type,
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'version': '1.0',
|
||
'biz_content': json.dumps(biz_content, ensure_ascii=False),
|
||
}
|
||
|
||
# 生成签名
|
||
sign_content = self._get_sign_content(params)
|
||
params['sign'] = self._sign(sign_content)
|
||
|
||
# 发送请求
|
||
response = requests.get(self.gateway_url, params=params, timeout=30)
|
||
result = response.json()
|
||
|
||
print(f"📡 支付宝查询响应: {result}")
|
||
|
||
# 解析响应
|
||
query_response = result.get('alipay_trade_query_response', {})
|
||
if query_response.get('code') == '10000':
|
||
trade_status = query_response.get('trade_status')
|
||
return {
|
||
'success': True,
|
||
'trade_status': trade_status,
|
||
'trade_no': query_response.get('trade_no'),
|
||
'out_trade_no': query_response.get('out_trade_no'),
|
||
'total_amount': query_response.get('total_amount'),
|
||
'buyer_logon_id': query_response.get('buyer_logon_id'),
|
||
'send_pay_date': query_response.get('send_pay_date'),
|
||
# 状态映射
|
||
'is_paid': trade_status == 'TRADE_SUCCESS' or trade_status == 'TRADE_FINISHED',
|
||
'is_closed': trade_status == 'TRADE_CLOSED',
|
||
'is_waiting': trade_status == 'WAIT_BUYER_PAY',
|
||
}
|
||
else:
|
||
error_msg = query_response.get('sub_msg') or query_response.get('msg', '查询失败')
|
||
return {
|
||
'success': False,
|
||
'error': error_msg,
|
||
'code': query_response.get('code'),
|
||
'sub_code': query_response.get('sub_code')
|
||
}
|
||
|
||
except requests.RequestException as e:
|
||
print(f"❌ 支付宝API请求失败: {e}")
|
||
return {'success': False, 'error': f'网络请求失败: {e}'}
|
||
except Exception as e:
|
||
print(f"❌ 查询订单异常: {e}")
|
||
return {'success': False, 'error': str(e)}
|
||
|
||
def verify_callback(self, params):
|
||
"""
|
||
验证支付宝异步回调
|
||
|
||
Args:
|
||
params: 回调参数字典
|
||
|
||
Returns:
|
||
dict: 验证结果
|
||
"""
|
||
try:
|
||
# 获取签名
|
||
sign = params.pop('sign', None)
|
||
sign_type = params.pop('sign_type', 'RSA2')
|
||
|
||
if not sign:
|
||
return {'success': False, 'error': '缺少签名参数'}
|
||
|
||
# 构建待验签字符串
|
||
sign_content = self._get_sign_content(params)
|
||
|
||
# 验证签名
|
||
if self._verify(sign_content, sign):
|
||
print(f"✅ 支付宝回调签名验证通过")
|
||
return {
|
||
'success': True,
|
||
'data': params
|
||
}
|
||
else:
|
||
print(f"❌ 支付宝回调签名验证失败")
|
||
return {
|
||
'success': False,
|
||
'error': '签名验证失败'
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"❌ 验证回调异常: {e}")
|
||
return {'success': False, 'error': str(e)}
|
||
|
||
def verify_return(self, params):
|
||
"""
|
||
验证支付宝同步返回(用户支付后跳转回来)
|
||
|
||
Args:
|
||
params: 同步返回参数字典
|
||
|
||
Returns:
|
||
dict: 验证结果
|
||
"""
|
||
# 同步返回的验签逻辑与异步回调相同
|
||
return self.verify_callback(params)
|
||
|
||
|
||
# 工厂函数
|
||
def create_alipay_instance():
|
||
"""创建支付宝支付实例"""
|
||
try:
|
||
from alipay_config import ALIPAY_CONFIG, get_app_private_key, get_alipay_public_key, validate_config
|
||
|
||
# 验证配置
|
||
is_valid, issues = validate_config()
|
||
if not is_valid:
|
||
raise ValueError(f"支付宝配置错误: {'; '.join(issues)}")
|
||
|
||
# 获取密钥
|
||
app_private_key = get_app_private_key()
|
||
alipay_public_key = get_alipay_public_key()
|
||
|
||
if not app_private_key or not alipay_public_key:
|
||
raise ValueError("密钥加载失败")
|
||
|
||
return AlipayPay(
|
||
app_id=ALIPAY_CONFIG['app_id'],
|
||
app_private_key=app_private_key,
|
||
alipay_public_key=alipay_public_key,
|
||
notify_url=ALIPAY_CONFIG['notify_url'],
|
||
return_url=ALIPAY_CONFIG['return_url'],
|
||
gateway_url=ALIPAY_CONFIG['gateway_url'],
|
||
sign_type=ALIPAY_CONFIG['sign_type'],
|
||
charset=ALIPAY_CONFIG['charset']
|
||
)
|
||
|
||
except ImportError as e:
|
||
raise ValueError(f"无法导入支付宝配置: {e}")
|
||
except Exception as e:
|
||
raise ValueError(f"创建支付宝实例失败: {e}")
|
||
|
||
|
||
def check_alipay_ready():
|
||
"""检查支付宝支付是否就绪"""
|
||
try:
|
||
instance = create_alipay_instance()
|
||
return True, "支付宝支付配置正确"
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
"""测试代码"""
|
||
import sys
|
||
|
||
print("=" * 60)
|
||
print("支付宝支付测试")
|
||
print("=" * 60)
|
||
|
||
try:
|
||
# 检查配置
|
||
is_ready, message = check_alipay_ready()
|
||
print(f"\n配置状态: {'✅ 就绪' if is_ready else '❌ 未就绪'}")
|
||
print(f"详情: {message}")
|
||
|
||
if is_ready:
|
||
# 创建实例
|
||
alipay = create_alipay_instance()
|
||
|
||
# 测试创建订单
|
||
test_order_no = f"TEST{int(time.time())}"
|
||
result = alipay.create_page_pay_url(
|
||
out_trade_no=test_order_no,
|
||
total_amount='0.01',
|
||
subject='测试商品',
|
||
body='这是一个测试订单'
|
||
)
|
||
|
||
print(f"\n创建订单结果:")
|
||
print(json.dumps(result, indent=2, ensure_ascii=False))
|
||
|
||
if result['success']:
|
||
print(f"\n🔗 支付链接(复制到浏览器打开):")
|
||
print(result['pay_url'][:200] + '...')
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 测试失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
print("\n" + "=" * 60)
|