#!/usr/bin/env python # -*- coding: utf-8 -*- """ 微信支付Native支付集成模块 使用您提供的微信支付证书文件进行支付处理 """ import time import json import hashlib import uuid import requests from datetime import datetime, timedelta from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding import base64 import os class WechatPay: """微信支付Native支付类""" def __init__(self, app_id, mch_id, cert_path, key_path, notify_url, api_key=None): """ 初始化微信支付配置 Args: app_id: 微信公众账号或开放平台APP的唯一标识(可选,Native支付可不需要) mch_id: 微信支付分配的商户号 cert_path: apiclient_cert.pem 文件路径 key_path: apiclient_key.pem 文件路径 notify_url: 支付结果通知回调地址 api_key: API密钥(可选,默认使用商户号) """ self.app_id = app_id # Native支付可以为None self.mch_id = mch_id self.cert_path = cert_path self.key_path = key_path self.notify_url = notify_url # 处理API密钥 if api_key: # 清理API密钥(去除可能的空格、换行等) self.api_key = str(api_key).strip() # 验证API密钥长度 if len(self.api_key) == 32: print(f"✅ API密钥长度正确: 32位") elif len(self.api_key) > 32: print(f"⚠️ API密钥长度异常: {len(self.api_key)} 位,尝试截取前32位") self.api_key = self.api_key[:32] print(f"🔧 截取后的API密钥长度: {len(self.api_key)} 位") else: print(f"❌ API密钥长度不足: {len(self.api_key)} 位(需要32位)") else: self.api_key = mch_id # 如果没有提供API密钥,使用商户号 print(f"⚠️ 使用商户号作为API密钥: {self.api_key}") # 微信支付API地址 self.api_url = "https://api.mch.weixin.qq.com" self.unifiedorder_url = f"{self.api_url}/pay/unifiedorder" self.orderquery_url = f"{self.api_url}/pay/orderquery" # 加载证书 self._load_certificates() def _load_certificates(self): """加载微信支付证书文件""" try: # 检查证书文件是否存在 if not os.path.exists(self.cert_path): raise FileNotFoundError(f"证书文件不存在: {self.cert_path}") if not os.path.exists(self.key_path): raise FileNotFoundError(f"密钥文件不存在: {self.key_path}") print(f"微信支付证书加载成功") print(f"证书文件: {self.cert_path}") print(f"密钥文件: {self.key_path}") except Exception as e: print(f"加载微信支付证书失败: {e}") raise def _generate_nonce_str(self, length=32): """生成随机字符串""" return uuid.uuid4().hex[:length] def _generate_sign(self, params): """ 生成微信支付签名 Args: params: 参数字典 Returns: 签名字符串 """ # 过滤空值并排序 filtered_params = {k: v for k, v in params.items() if v != '' and k != 'sign'} sorted_params = sorted(filtered_params.items()) # 拼接字符串 string_a = "&".join([f"{k}={v}" for k, v in sorted_params]) # 使用配置的API密钥 string_sign_temp = f"{string_a}&key={self.api_key}" # 调试输出 print(f"🔐 签名参数: {sorted_params}") print(f"🔐 签名字符串: {string_sign_temp}") # MD5签名 sign = hashlib.md5(string_sign_temp.encode('utf-8')).hexdigest().upper() print(f"🔐 生成签名: {sign}") return sign def _dict_to_xml(self, data): """将字典转换为XML格式""" xml = [""] for key, value in data.items(): xml.append(f"<{key}>") xml.append("") return "".join(xml) def _xml_to_dict(self, xml_data): """将XML转换为字典格式""" import xml.etree.ElementTree as ET try: root = ET.fromstring(xml_data) result = {} for child in root: result[child.tag] = child.text return result except ET.ParseError as e: print(f"XML解析错误: {e}") return {} def create_native_order(self, order_no, total_fee, body, product_id=None): """ 创建Native支付订单 Args: order_no: 商户订单号 total_fee: 总金额(分) body: 商品描述 product_id: 商品ID(可选) Returns: dict: 包含code_url的响应数据 """ try: # 构建请求参数 total_fee_fen = int(total_fee * 100) # 转换为分 params = { 'mch_id': str(self.mch_id), 'nonce_str': self._generate_nonce_str(), 'body': str(body), 'out_trade_no': str(order_no), 'total_fee': str(total_fee_fen), 'spbill_create_ip': '127.0.0.1', 'notify_url': str(self.notify_url), 'trade_type': 'NATIVE' } print(f"💰 订单金额: {total_fee}元 = {total_fee_fen}分") # AppID是必填字段,即使是Native支付 if self.app_id and not self.app_id.startswith('wx你的') and not self.app_id.startswith('your_'): params['appid'] = str(self.app_id) print(f"🔧 使用AppID: {self.app_id}") else: print("❌ 错误:微信支付要求AppID必填,即使是Native支付") return { 'success': False, 'error': 'AppID是必填字段,请在配置中提供真实的微信AppID' } if product_id: params['product_id'] = product_id # 生成签名 params['sign'] = self._generate_sign(params) # 转换为XML xml_data = self._dict_to_xml(params) print(f"创建微信支付订单请求: {order_no}") # 发送请求 response = requests.post( self.unifiedorder_url, data=xml_data.encode('utf-8'), headers={'Content-Type': 'text/xml'}, cert=(self.cert_path, self.key_path), verify=True, timeout=30 ) # 解析响应 print(f"📡 微信API原始响应: {response.text}") result = self._xml_to_dict(response.text) print(f"📦 解析后的响应: {result}") if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS': print(f"微信支付订单创建成功: {result.get('code_url')}") return { 'success': True, 'code_url': result.get('code_url'), 'prepay_id': result.get('prepay_id'), 'order_no': order_no } else: error_msg = result.get('err_code_des') or result.get('return_msg', '未知错误') error_code = result.get('err_code', '') # 处理编码问题 try: if isinstance(error_msg, str): # 尝试修复编码 error_msg = error_msg.encode('latin1').decode('utf-8') except: pass print(f"微信支付订单创建失败:") print(f" 返回码: {result.get('return_code')}") print(f" 结果码: {result.get('result_code')}") print(f" 错误码: {error_code}") print(f" 错误信息: {error_msg}") return { 'success': False, 'error': f"{error_code}: {error_msg}" if error_code else error_msg } except requests.RequestException as e: print(f"微信支付API请求失败: {e}") return { 'success': False, 'error': f'网络请求失败: {e}' } except Exception as e: print(f"创建微信支付订单异常: {e}") return { 'success': False, 'error': f'系统错误: {e}' } def query_order(self, order_no=None, transaction_id=None): """ 查询订单状态 Args: order_no: 商户订单号 transaction_id: 微信支付交易号 Returns: dict: 订单状态信息 """ try: if not order_no and not transaction_id: return {'success': False, 'error': '订单号和交易号不能同时为空'} params = { 'mch_id': self.mch_id, 'nonce_str': self._generate_nonce_str(), } # AppID是必填字段 if self.app_id and not self.app_id.startswith('wx你的') and not self.app_id.startswith('your_'): params['appid'] = str(self.app_id) if order_no: params['out_trade_no'] = order_no if transaction_id: params['transaction_id'] = transaction_id # 生成签名 params['sign'] = self._generate_sign(params) # 转换为XML xml_data = self._dict_to_xml(params) # 发送请求 response = requests.post( self.orderquery_url, data=xml_data.encode('utf-8'), headers={'Content-Type': 'text/xml'}, cert=(self.cert_path, self.key_path), verify=True, timeout=30 ) # 解析响应 result = self._xml_to_dict(response.text) if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS': return { 'success': True, 'trade_state': result.get('trade_state'), 'transaction_id': result.get('transaction_id'), 'out_trade_no': result.get('out_trade_no'), 'total_fee': result.get('total_fee'), 'time_end': result.get('time_end') } else: error_msg = result.get('err_code_des') or result.get('return_msg', '未知错误') return { 'success': False, 'error': error_msg } except Exception as e: print(f"查询订单状态异常: {e}") return { 'success': False, 'error': f'查询失败: {e}' } def verify_callback(self, callback_data): """ 验证微信支付回调数据 Args: callback_data: 回调XML数据 Returns: dict: 验证结果和解析后的数据 """ try: # 解析XML数据 data = self._xml_to_dict(callback_data) if not data: return {'success': False, 'error': '数据解析失败'} # 验证签名 received_sign = data.pop('sign', '') calculated_sign = self._generate_sign(data) if received_sign != calculated_sign: return {'success': False, 'error': '签名验证失败'} # 验证返回码 if data.get('return_code') != 'SUCCESS': return {'success': False, 'error': data.get('return_msg', '支付失败')} return { 'success': True, 'data': data } except Exception as e: print(f"验证回调数据异常: {e}") return {'success': False, 'error': f'验证失败: {e}'} # 微信支付实例配置 # 注意:这些配置需要根据您的实际微信支付商户信息进行修改 try: # 尝试导入真实配置 from wechat_pay_config import WECHAT_PAY_CONFIG print("✅ 使用真实微信支付配置") except ImportError: # 如果没有真实配置,使用默认配置 print("⚠️ 使用默认微信支付配置(演示模式)") WECHAT_PAY_CONFIG = { 'app_id': 'your_app_id', # 替换为您的微信AppID 'mch_id': 'your_mch_id', # 替换为您的商户号 'api_key': 'your_api_key', # 替换为您的API密钥 'cert_path': './certs/apiclient_cert.pem', # 证书文件路径 'key_path': './certs/apiclient_key.pem', # 密钥文件路径 'notify_url': 'https://valuefrontier.cn/api/payment/wechat/callback' # 支付结果通知回调 } def create_wechat_pay_instance(): """创建微信支付实例""" # 检查配置完整性(AppID可选) required_keys = ['mch_id', 'cert_path', 'key_path', 'notify_url'] optional_keys = ['app_id'] # AppID对于Native支付是可选的 for key in required_keys: if not WECHAT_PAY_CONFIG.get(key): raise ValueError(f"微信支付配置缺少必需项: {key}") if WECHAT_PAY_CONFIG[key].startswith('your_') or WECHAT_PAY_CONFIG[key].startswith('你的'): raise ValueError(f"微信支付配置 {key} 还是默认值,请配置真实信息") # 检查可选配置 for key in optional_keys: if WECHAT_PAY_CONFIG.get(key): if WECHAT_PAY_CONFIG[key].startswith('your_') or WECHAT_PAY_CONFIG[key].startswith('你的'): print(f"⚠️ {key} 是示例值,将使用纯商户支付模式") # 检查证书文件 for path_key in ['cert_path', 'key_path']: if not os.path.exists(WECHAT_PAY_CONFIG[path_key]): raise FileNotFoundError(f"证书文件不存在: {WECHAT_PAY_CONFIG[path_key]}") app_id = WECHAT_PAY_CONFIG.get('app_id') if app_id and not app_id.startswith('wx你的') and not app_id.startswith('your_'): print(f"🔧 创建微信支付实例: AppID={app_id}, 商户号={WECHAT_PAY_CONFIG['mch_id']}") else: app_id = None print(f"🔧 创建微信支付实例: 纯商户模式, 商户号={WECHAT_PAY_CONFIG['mch_id']}") return WechatPay( app_id=app_id, mch_id=WECHAT_PAY_CONFIG['mch_id'], cert_path=WECHAT_PAY_CONFIG['cert_path'], key_path=WECHAT_PAY_CONFIG['key_path'], notify_url=WECHAT_PAY_CONFIG['notify_url'], api_key=WECHAT_PAY_CONFIG.get('api_key') ) def check_wechat_pay_ready(): """检查微信支付是否就绪""" try: instance = create_wechat_pay_instance() return True, "微信支付配置正确" except Exception as e: return False, str(e) if __name__ == '__main__': """测试代码""" try: # 创建微信支付实例 wechat_pay = create_wechat_pay_instance() # 测试创建订单 test_order_no = f"TEST{int(time.time())}" result = wechat_pay.create_native_order( order_no=test_order_no, total_fee=0.01, # 0.01元 body="测试商品" ) print("测试结果:", json.dumps(result, indent=2, ensure_ascii=False)) except Exception as e: print(f"测试失败: {e}")