# -*- coding: utf-8 -*- """ 新版订阅支付系统核心逻辑 版本: v2.0.0 日期: 2025-11-19 核心改进: 1. 续费价格与新购价格完全一致 2. 不再计算剩余价值折算 3. 逻辑简化,易于维护 """ from datetime import datetime, timedelta from decimal import Decimal import json import random # ============================================ # 辅助函数 # ============================================ def beijing_now(): """获取北京时间""" from datetime import timezone, timedelta utc_now = datetime.now(timezone.utc) beijing_time = utc_now.astimezone(timezone(timedelta(hours=8))) return beijing_time.replace(tzinfo=None) def generate_order_no(user_id): """生成订单号""" timestamp = int(beijing_now().timestamp() * 1000000) random_suffix = random.randint(1000, 9999) return f"{timestamp}{user_id:04d}{random_suffix}" def generate_subscription_id(): """生成订阅ID""" timestamp = int(beijing_now().timestamp() * 1000) random_suffix = random.randint(10000, 99999) return f"SUB_{timestamp}_{random_suffix}" # ============================================ # 核心业务逻辑 # ============================================ def calculate_subscription_price(plan_code, billing_cycle, promo_code=None, user_id=None, db_session=None): """ 计算订阅价格(新购和续费价格完全一致) Args: plan_code: 套餐代码 (pro/max) billing_cycle: 计费周期 (monthly/quarterly/semiannual/yearly) promo_code: 优惠码(可选) user_id: 用户ID(可选,用于优惠码验证) db_session: 数据库会话(可选) Returns: dict: { 'success': True/False, 'plan_code': 'pro', 'plan_name': 'Pro 专业版', 'billing_cycle': 'yearly', 'original_price': 2699.00, 'discount_amount': 0, 'final_amount': 2699.00, 'promo_code': None, 'promo_error': None, 'error': None # 如果有错误 } """ from models import SubscriptionPlan, PromoCode # 需要在实际使用时导入 try: # 1. 查询套餐 plan = SubscriptionPlan.query.filter_by(plan_code=plan_code, is_active=True).first() if not plan: return { 'success': False, 'error': f'套餐 {plan_code} 不存在或已下架' } # 2. 获取对应周期的价格 price_field_map = { 'monthly': 'price_monthly', 'quarterly': 'price_quarterly', 'semiannual': 'price_semiannual', 'yearly': 'price_yearly' } price_field = price_field_map.get(billing_cycle) if not price_field: return { 'success': False, 'error': f'不支持的计费周期: {billing_cycle}' } original_price = getattr(plan, price_field, None) if original_price is None or original_price <= 0: return { 'success': False, 'error': f'{billing_cycle} 周期价格未配置' } original_price = float(original_price) # 3. 构建基础结果 result = { 'success': True, 'plan_code': plan_code, 'plan_name': plan.plan_name, 'billing_cycle': billing_cycle, 'original_price': original_price, 'discount_amount': 0.0, 'final_amount': original_price, 'promo_code': None, 'promo_error': None, 'error': None } # 4. 应用优惠码(如果有) if promo_code and promo_code.strip(): promo_code = promo_code.strip().upper() # 验证优惠码 promo, error = validate_promo_code( promo_code, plan_code, billing_cycle, original_price, user_id, db_session ) if promo: # 计算折扣 discount = calculate_discount(promo, original_price) result['discount_amount'] = float(discount) result['final_amount'] = float(original_price - discount) result['promo_code'] = promo.code elif error: result['promo_error'] = error return result except Exception as e: return { 'success': False, 'error': f'价格计算失败: {str(e)}' } def get_current_subscription(user_id, db_session=None): """ 获取用户当前生效的订阅 Args: user_id: 用户ID db_session: 数据库会话(可选) Returns: UserSubscription 对象 或 None """ from models import UserSubscription try: subscription = UserSubscription.query.filter_by( user_id=user_id, is_current=True ).first() # 检查是否过期 if subscription and subscription.end_date < beijing_now(): subscription.status = 'expired' subscription.is_current = False if db_session: db_session.commit() return None return subscription except Exception as e: print(f"获取当前订阅失败: {e}") return None def determine_subscription_type(user_id, plan_code, billing_cycle): """ 判断订阅类型(新购还是续费) Args: user_id: 用户ID plan_code: 目标套餐代码 billing_cycle: 目标计费周期 Returns: str: 'new' 或 'renew' """ current_sub = get_current_subscription(user_id) # 如果没有订阅或订阅是免费版,则为新购 if not current_sub or current_sub.plan_code == 'free': return 'new' # 如果是付费订阅,则为续费 if current_sub.plan_code in ['pro', 'max']: return 'renew' return 'new' def create_subscription_order(user_id, plan_code, billing_cycle, promo_code=None, db_session=None): """ 创建订阅支付订单 Args: user_id: 用户ID plan_code: 套餐代码 billing_cycle: 计费周期 promo_code: 优惠码(可选) db_session: 数据库会话 Returns: dict: { 'success': True/False, 'order': PaymentOrder 对象, 'error': None } """ from models import PaymentOrder try: # 1. 计算价格 price_result = calculate_subscription_price( plan_code, billing_cycle, promo_code, user_id, db_session ) if not price_result.get('success'): return { 'success': False, 'error': price_result.get('error', '价格计算失败') } # 2. 判断订阅类型 subscription_type = determine_subscription_type(user_id, plan_code, billing_cycle) # 3. 创建支付订单 order = PaymentOrder( order_no=generate_order_no(user_id), user_id=user_id, plan_code=plan_code, billing_cycle=billing_cycle, subscription_type=subscription_type, original_price=Decimal(str(price_result['original_price'])), discount_amount=Decimal(str(price_result['discount_amount'])), final_amount=Decimal(str(price_result['final_amount'])), promo_code=promo_code, status='pending', expired_at=beijing_now() + timedelta(minutes=30) ) if db_session: db_session.add(order) db_session.commit() return { 'success': True, 'order': order, 'subscription_type': subscription_type, 'error': None } except Exception as e: if db_session: db_session.rollback() return { 'success': False, 'error': f'创建订单失败: {str(e)}' } def activate_subscription_after_payment(order_id, db_session=None): """ 支付成功后激活订阅 Args: order_id: 订单ID db_session: 数据库会话 Returns: dict: { 'success': True/False, 'subscription': UserSubscription 对象, 'error': None } """ from models import PaymentOrder, UserSubscription, PromoCodeUsage try: # 1. 查询订单 order = PaymentOrder.query.get(order_id) if not order: return {'success': False, 'error': '订单不存在'} if order.status != 'paid': return {'success': False, 'error': '订单未支付'} # 2. 检查是否已经激活 existing_sub = UserSubscription.query.filter_by( payment_order_id=order.id ).first() if existing_sub: return { 'success': True, 'subscription': existing_sub, 'message': '订阅已激活' } # 3. 计算订阅周期天数 cycle_days_map = { 'monthly': 30, 'quarterly': 90, 'semiannual': 180, 'yearly': 365 } days = cycle_days_map.get(order.billing_cycle, 30) # 4. 获取当前订阅 current_sub = get_current_subscription(order.user_id, db_session) # 5. 计算开始和结束时间 now = beijing_now() if current_sub and current_sub.end_date > now: # 续费:从当前订阅结束时间开始 start_date = current_sub.end_date else: # 新购:从当前时间开始 start_date = now end_date = start_date + timedelta(days=days) # 6. 创建新订阅记录 new_subscription = UserSubscription( user_id=order.user_id, subscription_id=generate_subscription_id(), plan_code=order.plan_code, billing_cycle=order.billing_cycle, start_date=start_date, end_date=end_date, status='active', is_current=True, payment_order_id=order.id, paid_amount=order.final_amount, original_price=order.original_price, discount_amount=order.discount_amount, subscription_type=order.subscription_type, previous_subscription_id=current_sub.subscription_id if current_sub else None, auto_renew=False ) # 7. 将旧订阅标记为非当前 if current_sub: current_sub.is_current = False if db_session: db_session.add(new_subscription) # 8. 记录优惠码使用 if order.promo_code_id: usage = PromoCodeUsage( promo_code_id=order.promo_code_id, user_id=order.user_id, order_id=order.id, discount_amount=order.discount_amount ) db_session.add(usage) # 更新优惠码使用次数 from models import PromoCode promo = PromoCode.query.get(order.promo_code_id) if promo: promo.current_uses += 1 db_session.commit() return { 'success': True, 'subscription': new_subscription, 'error': None } except Exception as e: if db_session: db_session.rollback() return { 'success': False, 'error': f'激活订阅失败: {str(e)}' } def get_subscription_button_text(user_id, plan_code, billing_cycle): """ 获取订阅按钮文字 Args: user_id: 用户ID plan_code: 套餐代码 (pro/max) billing_cycle: 计费周期 Returns: str: 按钮文字 """ from models import SubscriptionPlan # 获取套餐显示名称 plan = SubscriptionPlan.query.filter_by(plan_code=plan_code).first() plan_name = plan.plan_name if plan else plan_code.upper() # 获取周期显示名称 cycle_names = { 'monthly': '月付', 'quarterly': '季付', 'semiannual': '半年付', 'yearly': '年付' } cycle_name = cycle_names.get(billing_cycle, billing_cycle) # 获取当前订阅 current_sub = get_current_subscription(user_id) # 1. 如果没有订阅或订阅已过期 if not current_sub or current_sub.plan_code == 'free' or current_sub.status != 'active': return f"选择 {plan_name}" # 2. 如果是当前套餐且周期相同 if current_sub.plan_code == plan_code and current_sub.billing_cycle == billing_cycle: return f"续费 {plan_name}" # 3. 如果是当前套餐但周期不同 if current_sub.plan_code == plan_code: return f"切换至{cycle_name}" # 4. 如果是不同套餐 return f"选择 {plan_name}" # ============================================ # 优惠码相关函数 # ============================================ def validate_promo_code(code, plan_code, billing_cycle, amount, user_id=None, db_session=None): """ 验证优惠码 Args: code: 优惠码 plan_code: 套餐代码 billing_cycle: 计费周期 amount: 订单金额 user_id: 用户ID(可选) db_session: 数据库会话(可选) Returns: tuple: (PromoCode对象 或 None, 错误信息 或 None) """ from models import PromoCode, PromoCodeUsage try: # 查询优惠码 promo = PromoCode.query.filter_by(code=code.upper(), is_active=True).first() if not promo: return None, "优惠码不存在或已失效" # 检查有效期 now = beijing_now() if promo.valid_from and now < promo.valid_from: return None, "优惠码尚未生效" if promo.valid_until and now > promo.valid_until: return None, "优惠码已过期" # 检查总使用次数 if promo.max_total_uses and promo.current_uses >= promo.max_total_uses: return None, "优惠码使用次数已达上限" # 检查每用户使用次数 if user_id and promo.max_uses_per_user: user_usage_count = PromoCodeUsage.query.filter_by( promo_code_id=promo.id, user_id=user_id ).count() if user_usage_count >= promo.max_uses_per_user: return None, f"您已使用过此优惠码(限用{promo.max_uses_per_user}次)" # 检查适用套餐 if promo.applicable_plans: try: applicable = json.loads(promo.applicable_plans) if isinstance(applicable, list) and plan_code not in applicable: return None, "该优惠码不适用于此套餐" except: pass # 检查适用周期 if promo.applicable_cycles: try: applicable = json.loads(promo.applicable_cycles) if isinstance(applicable, list) and billing_cycle not in applicable: return None, "该优惠码不适用于此计费周期" except: pass # 检查最低消费 if promo.min_amount and amount < float(promo.min_amount): return None, f"需满 ¥{float(promo.min_amount):.2f} 才可使用此优惠码" return promo, None except Exception as e: return None, f"验证优惠码时出错: {str(e)}" def calculate_discount(promo_code, amount): """ 计算优惠金额 Args: promo_code: PromoCode 对象 amount: 订单金额 Returns: Decimal: 优惠金额 """ try: if promo_code.discount_type == 'percentage': # 百分比折扣 discount = Decimal(str(amount)) * Decimal(str(promo_code.discount_value)) / Decimal('100') elif promo_code.discount_type == 'fixed_amount': # 固定金额折扣 discount = Decimal(str(promo_code.discount_value)) else: discount = Decimal('0') # 确保折扣不超过总金额 discount = min(discount, Decimal(str(amount))) return discount except Exception as e: print(f"计算折扣失败: {e}") return Decimal('0') # ============================================ # 辅助查询函数 # ============================================ def get_user_subscription_history(user_id, limit=10): """ 获取用户订阅历史 Args: user_id: 用户ID limit: 返回记录数量限制 Returns: list: UserSubscription 对象列表 """ from models import UserSubscription try: subscriptions = UserSubscription.query.filter_by( user_id=user_id ).order_by( UserSubscription.created_at.desc() ).limit(limit).all() return subscriptions except Exception as e: print(f"获取订阅历史失败: {e}") return [] def check_subscription_status(user_id): """ 检查用户订阅状态 Args: user_id: 用户ID Returns: dict: { 'has_subscription': True/False, 'plan_code': 'pro' 或 'max' 或 'free', 'status': 'active' 或 'expired', 'end_date': datetime 或 None, 'days_left': int } """ current_sub = get_current_subscription(user_id) if not current_sub or current_sub.plan_code == 'free': return { 'has_subscription': False, 'plan_code': 'free', 'status': 'active', 'end_date': None, 'days_left': 999 } now = beijing_now() days_left = (current_sub.end_date - now).days if current_sub.end_date > now else 0 return { 'has_subscription': True, 'plan_code': current_sub.plan_code, 'status': current_sub.status, 'end_date': current_sub.end_date, 'days_left': days_left }