442 lines
16 KiB
Python
442 lines
16 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
微信支付Native支付集成模块
|
||
使用您提供的微信支付证书文件进行支付处理
|
||
"""
|
||
|
||
import time
|
||
import json
|
||
import hashlib
|
||
import uuid
|
||
import requests
|
||
from datetime import datetime, timedelta
|
||
from cryptography.hazmat.primitives import serialization
|
||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.primitives.asymmetric import padding
|
||
import base64
|
||
import os
|
||
|
||
class WechatPay:
|
||
"""微信支付Native支付类"""
|
||
|
||
def __init__(self, app_id, mch_id, cert_path, key_path, notify_url, api_key=None):
|
||
"""
|
||
初始化微信支付配置
|
||
|
||
Args:
|
||
app_id: 微信公众账号或开放平台APP的唯一标识(可选,Native支付可不需要)
|
||
mch_id: 微信支付分配的商户号
|
||
cert_path: apiclient_cert.pem 文件路径
|
||
key_path: apiclient_key.pem 文件路径
|
||
notify_url: 支付结果通知回调地址
|
||
api_key: API密钥(可选,默认使用商户号)
|
||
"""
|
||
self.app_id = app_id # Native支付可以为None
|
||
self.mch_id = mch_id
|
||
self.cert_path = cert_path
|
||
self.key_path = key_path
|
||
self.notify_url = notify_url
|
||
# 处理API密钥
|
||
if api_key:
|
||
# 清理API密钥(去除可能的空格、换行等)
|
||
self.api_key = str(api_key).strip()
|
||
|
||
# 验证API密钥长度
|
||
if len(self.api_key) == 32:
|
||
print(f"✅ API密钥长度正确: 32位")
|
||
elif len(self.api_key) > 32:
|
||
print(f"⚠️ API密钥长度异常: {len(self.api_key)} 位,尝试截取前32位")
|
||
self.api_key = self.api_key[:32]
|
||
print(f"🔧 截取后的API密钥长度: {len(self.api_key)} 位")
|
||
else:
|
||
print(f"❌ API密钥长度不足: {len(self.api_key)} 位(需要32位)")
|
||
else:
|
||
self.api_key = mch_id # 如果没有提供API密钥,使用商户号
|
||
print(f"⚠️ 使用商户号作为API密钥: {self.api_key}")
|
||
|
||
# 微信支付API地址
|
||
self.api_url = "https://api.mch.weixin.qq.com"
|
||
self.unifiedorder_url = f"{self.api_url}/pay/unifiedorder"
|
||
self.orderquery_url = f"{self.api_url}/pay/orderquery"
|
||
|
||
# 加载证书
|
||
self._load_certificates()
|
||
|
||
def _load_certificates(self):
|
||
"""加载微信支付证书文件"""
|
||
try:
|
||
# 检查证书文件是否存在
|
||
if not os.path.exists(self.cert_path):
|
||
raise FileNotFoundError(f"证书文件不存在: {self.cert_path}")
|
||
if not os.path.exists(self.key_path):
|
||
raise FileNotFoundError(f"密钥文件不存在: {self.key_path}")
|
||
|
||
print(f"微信支付证书加载成功")
|
||
print(f"证书文件: {self.cert_path}")
|
||
print(f"密钥文件: {self.key_path}")
|
||
|
||
except Exception as e:
|
||
print(f"加载微信支付证书失败: {e}")
|
||
raise
|
||
|
||
def _generate_nonce_str(self, length=32):
|
||
"""生成随机字符串"""
|
||
return uuid.uuid4().hex[:length]
|
||
|
||
def _generate_sign(self, params):
|
||
"""
|
||
生成微信支付签名
|
||
|
||
Args:
|
||
params: 参数字典
|
||
|
||
Returns:
|
||
签名字符串
|
||
"""
|
||
# 过滤空值并排序
|
||
filtered_params = {k: v for k, v in params.items() if v != '' and k != 'sign'}
|
||
sorted_params = sorted(filtered_params.items())
|
||
|
||
# 拼接字符串
|
||
string_a = "&".join([f"{k}={v}" for k, v in sorted_params])
|
||
|
||
# 使用配置的API密钥
|
||
string_sign_temp = f"{string_a}&key={self.api_key}"
|
||
|
||
# 调试输出
|
||
print(f"🔐 签名参数: {sorted_params}")
|
||
print(f"🔐 签名字符串: {string_sign_temp}")
|
||
|
||
# MD5签名
|
||
sign = hashlib.md5(string_sign_temp.encode('utf-8')).hexdigest().upper()
|
||
print(f"🔐 生成签名: {sign}")
|
||
return sign
|
||
|
||
def _dict_to_xml(self, data):
|
||
"""将字典转换为XML格式"""
|
||
xml = ["<xml>"]
|
||
for key, value in data.items():
|
||
xml.append(f"<{key}><![CDATA[{value}]]></{key}>")
|
||
xml.append("</xml>")
|
||
return "".join(xml)
|
||
|
||
def _xml_to_dict(self, xml_data):
|
||
"""将XML转换为字典格式"""
|
||
import xml.etree.ElementTree as ET
|
||
try:
|
||
root = ET.fromstring(xml_data)
|
||
result = {}
|
||
for child in root:
|
||
result[child.tag] = child.text
|
||
return result
|
||
except ET.ParseError as e:
|
||
print(f"XML解析错误: {e}")
|
||
return {}
|
||
|
||
def create_native_order(self, order_no, total_fee, body, product_id=None):
|
||
"""
|
||
创建Native支付订单
|
||
|
||
Args:
|
||
order_no: 商户订单号
|
||
total_fee: 总金额(分)
|
||
body: 商品描述
|
||
product_id: 商品ID(可选)
|
||
|
||
Returns:
|
||
dict: 包含code_url的响应数据
|
||
"""
|
||
try:
|
||
# 构建请求参数
|
||
total_fee_fen = int(total_fee * 100) # 转换为分
|
||
params = {
|
||
'mch_id': str(self.mch_id),
|
||
'nonce_str': self._generate_nonce_str(),
|
||
'body': str(body),
|
||
'out_trade_no': str(order_no),
|
||
'total_fee': str(total_fee_fen),
|
||
'spbill_create_ip': '127.0.0.1',
|
||
'notify_url': str(self.notify_url),
|
||
'trade_type': 'NATIVE'
|
||
}
|
||
|
||
print(f"💰 订单金额: {total_fee}元 = {total_fee_fen}分")
|
||
|
||
# AppID是必填字段,即使是Native支付
|
||
if self.app_id and not self.app_id.startswith('wx你的') and not self.app_id.startswith('your_'):
|
||
params['appid'] = str(self.app_id)
|
||
print(f"🔧 使用AppID: {self.app_id}")
|
||
else:
|
||
print("❌ 错误:微信支付要求AppID必填,即使是Native支付")
|
||
return {
|
||
'success': False,
|
||
'error': 'AppID是必填字段,请在配置中提供真实的微信AppID'
|
||
}
|
||
|
||
if product_id:
|
||
params['product_id'] = product_id
|
||
|
||
# 生成签名
|
||
params['sign'] = self._generate_sign(params)
|
||
|
||
# 转换为XML
|
||
xml_data = self._dict_to_xml(params)
|
||
|
||
print(f"创建微信支付订单请求: {order_no}")
|
||
|
||
# 发送请求
|
||
response = requests.post(
|
||
self.unifiedorder_url,
|
||
data=xml_data.encode('utf-8'),
|
||
headers={'Content-Type': 'text/xml'},
|
||
cert=(self.cert_path, self.key_path),
|
||
verify=True,
|
||
timeout=30
|
||
)
|
||
|
||
# 解析响应
|
||
print(f"📡 微信API原始响应: {response.text}")
|
||
result = self._xml_to_dict(response.text)
|
||
print(f"📦 解析后的响应: {result}")
|
||
|
||
if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS':
|
||
print(f"微信支付订单创建成功: {result.get('code_url')}")
|
||
return {
|
||
'success': True,
|
||
'code_url': result.get('code_url'),
|
||
'prepay_id': result.get('prepay_id'),
|
||
'order_no': order_no
|
||
}
|
||
else:
|
||
error_msg = result.get('err_code_des') or result.get('return_msg', '未知错误')
|
||
error_code = result.get('err_code', '')
|
||
|
||
# 处理编码问题
|
||
try:
|
||
if isinstance(error_msg, str):
|
||
# 尝试修复编码
|
||
error_msg = error_msg.encode('latin1').decode('utf-8')
|
||
except:
|
||
pass
|
||
|
||
print(f"微信支付订单创建失败:")
|
||
print(f" 返回码: {result.get('return_code')}")
|
||
print(f" 结果码: {result.get('result_code')}")
|
||
print(f" 错误码: {error_code}")
|
||
print(f" 错误信息: {error_msg}")
|
||
|
||
return {
|
||
'success': False,
|
||
'error': f"{error_code}: {error_msg}" if error_code else error_msg
|
||
}
|
||
|
||
except requests.RequestException as e:
|
||
print(f"微信支付API请求失败: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': f'网络请求失败: {e}'
|
||
}
|
||
except Exception as e:
|
||
print(f"创建微信支付订单异常: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': f'系统错误: {e}'
|
||
}
|
||
|
||
def query_order(self, order_no=None, transaction_id=None):
|
||
"""
|
||
查询订单状态
|
||
|
||
Args:
|
||
order_no: 商户订单号
|
||
transaction_id: 微信支付交易号
|
||
|
||
Returns:
|
||
dict: 订单状态信息
|
||
"""
|
||
try:
|
||
if not order_no and not transaction_id:
|
||
return {'success': False, 'error': '订单号和交易号不能同时为空'}
|
||
|
||
params = {
|
||
'mch_id': self.mch_id,
|
||
'nonce_str': self._generate_nonce_str(),
|
||
}
|
||
|
||
# AppID是必填字段
|
||
if self.app_id and not self.app_id.startswith('wx你的') and not self.app_id.startswith('your_'):
|
||
params['appid'] = str(self.app_id)
|
||
|
||
if order_no:
|
||
params['out_trade_no'] = order_no
|
||
if transaction_id:
|
||
params['transaction_id'] = transaction_id
|
||
|
||
# 生成签名
|
||
params['sign'] = self._generate_sign(params)
|
||
|
||
# 转换为XML
|
||
xml_data = self._dict_to_xml(params)
|
||
|
||
# 发送请求
|
||
response = requests.post(
|
||
self.orderquery_url,
|
||
data=xml_data.encode('utf-8'),
|
||
headers={'Content-Type': 'text/xml'},
|
||
cert=(self.cert_path, self.key_path),
|
||
verify=True,
|
||
timeout=30
|
||
)
|
||
|
||
# 解析响应
|
||
result = self._xml_to_dict(response.text)
|
||
|
||
if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS':
|
||
return {
|
||
'success': True,
|
||
'trade_state': result.get('trade_state'),
|
||
'transaction_id': result.get('transaction_id'),
|
||
'out_trade_no': result.get('out_trade_no'),
|
||
'total_fee': result.get('total_fee'),
|
||
'time_end': result.get('time_end')
|
||
}
|
||
else:
|
||
error_msg = result.get('err_code_des') or result.get('return_msg', '未知错误')
|
||
return {
|
||
'success': False,
|
||
'error': error_msg
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"查询订单状态异常: {e}")
|
||
return {
|
||
'success': False,
|
||
'error': f'查询失败: {e}'
|
||
}
|
||
|
||
def verify_callback(self, callback_data):
|
||
"""
|
||
验证微信支付回调数据
|
||
|
||
Args:
|
||
callback_data: 回调XML数据
|
||
|
||
Returns:
|
||
dict: 验证结果和解析后的数据
|
||
"""
|
||
try:
|
||
# 解析XML数据
|
||
data = self._xml_to_dict(callback_data)
|
||
|
||
if not data:
|
||
return {'success': False, 'error': '数据解析失败'}
|
||
|
||
# 验证签名
|
||
received_sign = data.pop('sign', '')
|
||
calculated_sign = self._generate_sign(data)
|
||
|
||
if received_sign != calculated_sign:
|
||
return {'success': False, 'error': '签名验证失败'}
|
||
|
||
# 验证返回码
|
||
if data.get('return_code') != 'SUCCESS':
|
||
return {'success': False, 'error': data.get('return_msg', '支付失败')}
|
||
|
||
return {
|
||
'success': True,
|
||
'data': data
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"验证回调数据异常: {e}")
|
||
return {'success': False, 'error': f'验证失败: {e}'}
|
||
|
||
|
||
# 微信支付实例配置
|
||
# 注意:这些配置需要根据您的实际微信支付商户信息进行修改
|
||
try:
|
||
# 尝试导入真实配置
|
||
from wechat_pay_config import WECHAT_PAY_CONFIG
|
||
print("✅ 使用真实微信支付配置")
|
||
except ImportError:
|
||
# 如果没有真实配置,使用默认配置
|
||
print("⚠️ 使用默认微信支付配置(演示模式)")
|
||
WECHAT_PAY_CONFIG = {
|
||
'app_id': 'your_app_id', # 替换为您的微信AppID
|
||
'mch_id': 'your_mch_id', # 替换为您的商户号
|
||
'api_key': 'your_api_key', # 替换为您的API密钥
|
||
'cert_path': './certs/apiclient_cert.pem', # 证书文件路径
|
||
'key_path': './certs/apiclient_key.pem', # 密钥文件路径
|
||
'notify_url': 'https://api.valuefrontier.cn/api/payment/wechat/callback' # 支付结果通知回调
|
||
}
|
||
|
||
|
||
def create_wechat_pay_instance():
|
||
"""创建微信支付实例"""
|
||
# 检查配置完整性(AppID可选)
|
||
required_keys = ['mch_id', 'cert_path', 'key_path', 'notify_url']
|
||
optional_keys = ['app_id'] # AppID对于Native支付是可选的
|
||
|
||
for key in required_keys:
|
||
if not WECHAT_PAY_CONFIG.get(key):
|
||
raise ValueError(f"微信支付配置缺少必需项: {key}")
|
||
if WECHAT_PAY_CONFIG[key].startswith('your_') or WECHAT_PAY_CONFIG[key].startswith('你的'):
|
||
raise ValueError(f"微信支付配置 {key} 还是默认值,请配置真实信息")
|
||
|
||
# 检查可选配置
|
||
for key in optional_keys:
|
||
if WECHAT_PAY_CONFIG.get(key):
|
||
if WECHAT_PAY_CONFIG[key].startswith('your_') or WECHAT_PAY_CONFIG[key].startswith('你的'):
|
||
print(f"⚠️ {key} 是示例值,将使用纯商户支付模式")
|
||
|
||
# 检查证书文件
|
||
for path_key in ['cert_path', 'key_path']:
|
||
if not os.path.exists(WECHAT_PAY_CONFIG[path_key]):
|
||
raise FileNotFoundError(f"证书文件不存在: {WECHAT_PAY_CONFIG[path_key]}")
|
||
|
||
app_id = WECHAT_PAY_CONFIG.get('app_id')
|
||
if app_id and not app_id.startswith('wx你的') and not app_id.startswith('your_'):
|
||
print(f"🔧 创建微信支付实例: AppID={app_id}, 商户号={WECHAT_PAY_CONFIG['mch_id']}")
|
||
else:
|
||
app_id = None
|
||
print(f"🔧 创建微信支付实例: 纯商户模式, 商户号={WECHAT_PAY_CONFIG['mch_id']}")
|
||
|
||
return WechatPay(
|
||
app_id=app_id,
|
||
mch_id=WECHAT_PAY_CONFIG['mch_id'],
|
||
cert_path=WECHAT_PAY_CONFIG['cert_path'],
|
||
key_path=WECHAT_PAY_CONFIG['key_path'],
|
||
notify_url=WECHAT_PAY_CONFIG['notify_url'],
|
||
api_key=WECHAT_PAY_CONFIG.get('api_key')
|
||
)
|
||
|
||
def check_wechat_pay_ready():
|
||
"""检查微信支付是否就绪"""
|
||
try:
|
||
instance = create_wechat_pay_instance()
|
||
return True, "微信支付配置正确"
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
"""测试代码"""
|
||
try:
|
||
# 创建微信支付实例
|
||
wechat_pay = create_wechat_pay_instance()
|
||
|
||
# 测试创建订单
|
||
test_order_no = f"TEST{int(time.time())}"
|
||
result = wechat_pay.create_native_order(
|
||
order_no=test_order_no,
|
||
total_fee=0.01, # 0.01元
|
||
body="测试商品"
|
||
)
|
||
|
||
print("测试结果:", json.dumps(result, indent=2, ensure_ascii=False))
|
||
|
||
except Exception as e:
|
||
print(f"测试失败: {e}")
|