Files
vf_react/new_subscription_logic.py
2025-11-19 19:41:26 +08:00

632 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
新版订阅支付系统核心逻辑
版本: v2.0.0
日期: 2025-11-19
核心改进:
1. 续费价格与新购价格完全一致
2. 不再计算剩余价值折算
3. 逻辑简化,易于维护
"""
from datetime import datetime, timedelta
from decimal import Decimal
import json
import random
# ============================================
# 辅助函数
# ============================================
def beijing_now():
    """Return the current Beijing time (UTC+8) as a naive ``datetime``.

    The current instant is expressed at a fixed +08:00 offset and the
    ``tzinfo`` is then dropped, so the result carries Beijing wall-clock
    fields but no timezone information.
    """
    from datetime import timezone

    beijing_tz = timezone(timedelta(hours=8))
    aware_now = datetime.now(beijing_tz)
    return aware_now.replace(tzinfo=None)
def generate_order_no(user_id):
    """Build an order number string for ``user_id``.

    The result is the concatenation of three parts:
    the ``timestamp()`` of ``beijing_now()`` scaled to microseconds,
    ``user_id`` zero-padded to at least four digits, and a random
    four-digit suffix in the range 1000-9999.

    Args:
        user_id: Integer user id (formatted with ``04d``).

    Returns:
        str: The generated order number.
    """
    micros = int(beijing_now().timestamp() * 1000000)
    suffix = random.randint(1000, 9999)
    return "{}{:04d}{}".format(micros, user_id, suffix)
def generate_subscription_id():
    """Build a subscription id of the form ``SUB_<millis>_<random>``.

    ``<millis>`` is the ``timestamp()`` of ``beijing_now()`` scaled to
    milliseconds and ``<random>`` is a random five-digit number
    (10000-99999).

    Returns:
        str: The generated subscription id.
    """
    millis = int(beijing_now().timestamp() * 1000)
    suffix = random.randint(10000, 99999)
    return "SUB_" + str(millis) + "_" + str(suffix)
# ============================================
# 核心业务逻辑
# ============================================
def calculate_subscription_price(plan_code, billing_cycle, promo_code=None, user_id=None, db_session=None):
    """Compute the price of a subscription (same price for new and renewal).

    Args:
        plan_code: Plan code to look up among active plans (e.g. pro/max).
        billing_cycle: One of ``monthly``, ``quarterly``, ``semiannual``,
            ``yearly``.
        promo_code: Optional promo code; blank/whitespace-only is ignored.
        user_id: Optional user id, forwarded to promo-code validation.
        db_session: Optional database session, forwarded to promo-code
            validation.

    Returns:
        dict: On success::

            {
                'success': True,
                'plan_code': ...,
                'plan_name': ...,
                'billing_cycle': ...,
                'original_price': float,
                'discount_amount': float,
                'final_amount': float,
                'promo_code': applied code or None,
                'promo_error': validation message or None,
                'error': None,
            }

        On failure only ``{'success': False, 'error': <message>}``.
        A rejected promo code is not a failure: the full price is returned
        and the reason is reported in ``promo_error``.
    """
    from models import SubscriptionPlan  # imported lazily, at call time

    try:
        # 1. Look up the active plan.
        plan = SubscriptionPlan.query.filter_by(plan_code=plan_code, is_active=True).first()
        if not plan:
            return {
                'success': False,
                'error': f'套餐 {plan_code} 不存在或已下架'
            }

        # 2. Resolve the price column for the requested billing cycle.
        price_field_map = {
            'monthly': 'price_monthly',
            'quarterly': 'price_quarterly',
            'semiannual': 'price_semiannual',
            'yearly': 'price_yearly'
        }
        price_field = price_field_map.get(billing_cycle)
        if not price_field:
            return {
                'success': False,
                'error': f'不支持的计费周期: {billing_cycle}'
            }

        original_price = getattr(plan, price_field, None)
        if original_price is None or original_price <= 0:
            return {
                'success': False,
                'error': f'{billing_cycle} 周期价格未配置'
            }
        original_price = float(original_price)

        # 3. Base result: no discount applied yet.
        result = {
            'success': True,
            'plan_code': plan_code,
            'plan_name': plan.plan_name,
            'billing_cycle': billing_cycle,
            'original_price': original_price,
            'discount_amount': 0.0,
            'final_amount': original_price,
            'promo_code': None,
            'promo_error': None,
            'error': None
        }

        # 4. Apply the promo code, if one was supplied.
        if promo_code and promo_code.strip():
            promo_code = promo_code.strip().upper()
            promo, error = validate_promo_code(
                promo_code,
                plan_code,
                billing_cycle,
                original_price,
                user_id,
                db_session
            )
            if promo:
                discount = calculate_discount(promo, original_price)
                # calculate_discount returns a Decimal. Subtracting it from a
                # float raises TypeError, so do the arithmetic in Decimal and
                # convert to float only for the result payload.
                final_amount = Decimal(str(original_price)) - discount
                result['discount_amount'] = float(discount)
                result['final_amount'] = float(final_amount)
                result['promo_code'] = promo.code
            elif error:
                result['promo_error'] = error

        return result
    except Exception as e:
        return {
            'success': False,
            'error': f'价格计算失败: {str(e)}'
        }
def get_current_subscription(user_id, db_session=None):
    """Return the user's current, non-expired subscription, if any.

    Looks up the ``UserSubscription`` row flagged ``is_current`` for the
    user. If that row's ``end_date`` is already in the past (Beijing time)
    it is marked ``expired`` / not current, the change is committed when a
    ``db_session`` is given, and ``None`` is returned.

    Args:
        user_id: User id to look up.
        db_session: Optional database session used to commit the expiry
            update.

    Returns:
        The ``UserSubscription`` object, or ``None`` when there is no
        current subscription, it has expired, or the lookup failed.
    """
    from models import UserSubscription

    try:
        query = UserSubscription.query.filter_by(user_id=user_id, is_current=True)
        current = query.first()

        # Nothing flagged as current: hand back the empty result as-is.
        if not current:
            return current

        # Still valid: this is the active subscription.
        if current.end_date >= beijing_now():
            return current

        # Past its end date: retire the record.
        current.status = 'expired'
        current.is_current = False
        if db_session:
            db_session.commit()
        return None
    except Exception as e:
        print(f"获取当前订阅失败: {e}")
        return None
def determine_subscription_type(user_id, plan_code, billing_cycle):
    """Classify a purchase as a new subscription or a renewal.

    Args:
        user_id: User id whose current subscription is inspected.
        plan_code: Target plan code (not used in the decision).
        billing_cycle: Target billing cycle (not used in the decision).

    Returns:
        str: ``'renew'`` when the user currently holds a ``pro`` or ``max``
        subscription, otherwise ``'new'`` (no subscription, the free plan,
        or any other plan code).
    """
    active = get_current_subscription(user_id)
    if active and active.plan_code in ('pro', 'max'):
        return 'renew'
    return 'new'
def create_subscription_order(user_id, plan_code, billing_cycle, promo_code=None, db_session=None):
    """Create a pending payment order for a subscription purchase.

    The price is computed with ``calculate_subscription_price`` and the
    order type (new/renew) with ``determine_subscription_type``. The order
    expires 30 minutes after creation (Beijing time). It is added and
    committed only when ``db_session`` is provided.

    Args:
        user_id: User id placing the order.
        plan_code: Plan code being purchased.
        billing_cycle: Billing cycle being purchased.
        promo_code: Optional promo code entered by the user.
        db_session: Optional database session used to persist the order.

    Returns:
        dict: On success::

            {
                'success': True,
                'order': PaymentOrder,
                'subscription_type': 'new' | 'renew',
                'promo_error': why the promo code was rejected, or None,
                'error': None,
            }

        On failure ``{'success': False, 'error': <message>}``; the session
        is rolled back when one was given.
    """
    from models import PaymentOrder

    try:
        # 1. Compute the price (including promo-code validation).
        price_result = calculate_subscription_price(
            plan_code,
            billing_cycle,
            promo_code,
            user_id,
            db_session
        )
        if not price_result.get('success'):
            return {
                'success': False,
                'error': price_result.get('error', '价格计算失败')
            }

        # 2. New purchase or renewal?
        subscription_type = determine_subscription_type(user_id, plan_code, billing_cycle)

        # 3. Record only the promo code that was actually validated and
        #    applied (normalised by the price calculation). Storing the raw
        #    user input would attach a rejected code to a full-price order.
        applied_promo_code = price_result.get('promo_code')

        order = PaymentOrder(
            order_no=generate_order_no(user_id),
            user_id=user_id,
            plan_code=plan_code,
            billing_cycle=billing_cycle,
            subscription_type=subscription_type,
            original_price=Decimal(str(price_result['original_price'])),
            discount_amount=Decimal(str(price_result['discount_amount'])),
            final_amount=Decimal(str(price_result['final_amount'])),
            promo_code=applied_promo_code,
            status='pending',
            expired_at=beijing_now() + timedelta(minutes=30)
        )

        if db_session:
            db_session.add(order)
            db_session.commit()

        return {
            'success': True,
            'order': order,
            'subscription_type': subscription_type,
            # Lets the caller tell the user their promo code was not applied.
            'promo_error': price_result.get('promo_error'),
            'error': None
        }
    except Exception as e:
        if db_session:
            db_session.rollback()
        return {
            'success': False,
            'error': f'创建订单失败: {str(e)}'
        }
def activate_subscription_after_payment(order_id, db_session=None):
    """Activate the subscription bought by a paid order.

    If a subscription already references the order, it is returned
    unchanged. Otherwise a new ``UserSubscription`` is built: it starts at
    the end of the user's current subscription when that one is still
    running, or now (Beijing time) otherwise, and lasts 30/90/180/365 days
    depending on the billing cycle (30 for an unrecognised cycle). The
    previous subscription is flagged as no longer current. When a
    ``db_session`` is given, the new record and any promo-code usage are
    added and committed.

    Args:
        order_id: Primary key of the ``PaymentOrder``.
        db_session: Optional database session used to persist changes.

    Returns:
        dict: ``{'success': True, 'subscription': ..., 'error': None}`` on
        activation; ``{'success': True, 'subscription': ..., 'message': ...}``
        when already activated; ``{'success': False, 'error': <message>}``
        on failure (the session is rolled back when one was given).
    """
    from models import PaymentOrder, UserSubscription, PromoCodeUsage

    try:
        # 1. Load the order and make sure it has been paid.
        order = PaymentOrder.query.get(order_id)
        if not order:
            return {'success': False, 'error': '订单不存在'}
        if order.status != 'paid':
            return {'success': False, 'error': '订单未支付'}

        # 2. Already activated for this order? Return the existing record.
        already_active = UserSubscription.query.filter_by(payment_order_id=order.id).first()
        if already_active:
            return {
                'success': True,
                'subscription': already_active,
                'message': '订阅已激活'
            }

        # 3. Length of the purchased period in days.
        days = {
            'monthly': 30,
            'quarterly': 90,
            'semiannual': 180,
            'yearly': 365
        }.get(order.billing_cycle, 30)

        # 4. The subscription the user holds right now (if any).
        current_sub = get_current_subscription(order.user_id, db_session)

        # 5. A running subscription is extended from its end date;
        #    otherwise the new period starts immediately.
        now = beijing_now()
        start_date = now
        if current_sub and current_sub.end_date > now:
            start_date = current_sub.end_date
        end_date = start_date + timedelta(days=days)

        # 6. Build the new subscription record.
        new_subscription = UserSubscription(
            user_id=order.user_id,
            subscription_id=generate_subscription_id(),
            plan_code=order.plan_code,
            billing_cycle=order.billing_cycle,
            start_date=start_date,
            end_date=end_date,
            status='active',
            is_current=True,
            payment_order_id=order.id,
            paid_amount=order.final_amount,
            original_price=order.original_price,
            discount_amount=order.discount_amount,
            subscription_type=order.subscription_type,
            previous_subscription_id=current_sub.subscription_id if current_sub else None,
            auto_renew=False
        )

        # 7. The old subscription is no longer the current one.
        if current_sub:
            current_sub.is_current = False

        if db_session:
            db_session.add(new_subscription)

            # 8. Record promo-code usage and bump its usage counter.
            if order.promo_code_id:
                db_session.add(PromoCodeUsage(
                    promo_code_id=order.promo_code_id,
                    user_id=order.user_id,
                    order_id=order.id,
                    discount_amount=order.discount_amount
                ))

                from models import PromoCode
                promo = PromoCode.query.get(order.promo_code_id)
                if promo:
                    promo.current_uses += 1

            db_session.commit()

        return {
            'success': True,
            'subscription': new_subscription,
            'error': None
        }
    except Exception as e:
        if db_session:
            db_session.rollback()
        return {
            'success': False,
            'error': f'激活订阅失败: {str(e)}'
        }
def get_subscription_button_text(user_id, plan_code, billing_cycle):
    """Return the label for a plan's subscribe button.

    Args:
        user_id: User id whose current subscription is inspected.
        plan_code: Plan code the button belongs to (pro/max).
        billing_cycle: Billing cycle the button belongs to.

    Returns:
        str: A "renew" label when the user already holds this plan with the
        same cycle, a "switch to <cycle>" label when they hold this plan
        with a different cycle, and a "choose <plan>" label in every other
        case (no subscription, free plan, inactive status, or a different
        plan).
    """
    from models import SubscriptionPlan

    # Display name of the plan; fall back to the upper-cased code.
    plan = SubscriptionPlan.query.filter_by(plan_code=plan_code).first()
    if plan:
        plan_name = plan.plan_name
    else:
        plan_name = plan_code.upper()

    # Display name of the billing cycle; fall back to the raw value.
    cycle_name = {
        'monthly': '月付',
        'quarterly': '季付',
        'semiannual': '半年付',
        'yearly': '年付'
    }.get(billing_cycle, billing_cycle)

    current_sub = get_current_subscription(user_id)
    choose_label = f"选择 {plan_name}"

    # No usable paid subscription: offer the plan.
    if not current_sub or current_sub.plan_code == 'free' or current_sub.status != 'active':
        return choose_label

    # Subscribed to a different plan: offer this one.
    if current_sub.plan_code != plan_code:
        return choose_label

    # Same plan: renew on the same cycle, otherwise switch cycle.
    if current_sub.billing_cycle == billing_cycle:
        return f"续费 {plan_name}"
    return f"切换至{cycle_name}"
# ============================================
# 优惠码相关函数
# ============================================
def validate_promo_code(code, plan_code, billing_cycle, amount, user_id=None, db_session=None):
    """Validate a promo code for a given plan, cycle and order amount.

    Checks, in order: the code exists and is active, the validity window
    (Beijing time), the total-usage limit, the per-user usage limit (only
    when ``user_id`` is given), the applicable plans, the applicable
    billing cycles and the minimum order amount.

    Args:
        code: Promo code (matched upper-cased).
        plan_code: Plan code of the order.
        billing_cycle: Billing cycle of the order.
        amount: Order amount before discount.
        user_id: Optional user id for the per-user limit check.
        db_session: Optional database session (unused here).

    Returns:
        tuple: ``(PromoCode, None)`` when the code is usable, otherwise
        ``(None, <error message>)``.
    """
    from models import PromoCode, PromoCodeUsage

    try:
        promo = PromoCode.query.filter_by(code=code.upper(), is_active=True).first()
        if not promo:
            return None, "优惠码不存在或已失效"

        # Validity window.
        now = beijing_now()
        if promo.valid_from and now < promo.valid_from:
            return None, "优惠码尚未生效"
        if promo.valid_until and now > promo.valid_until:
            return None, "优惠码已过期"

        # Global usage limit.
        if promo.max_total_uses and promo.current_uses >= promo.max_total_uses:
            return None, "优惠码使用次数已达上限"

        # Per-user usage limit.
        if user_id and promo.max_uses_per_user:
            user_usage_count = PromoCodeUsage.query.filter_by(
                promo_code_id=promo.id,
                user_id=user_id
            ).count()
            if user_usage_count >= promo.max_uses_per_user:
                return None, f"您已使用过此优惠码(限用{promo.max_uses_per_user}次)"

        # Plan restriction (ignored when the stored value is not a list).
        if promo.applicable_plans:
            allowed_plans = _load_applicable_list(promo.applicable_plans)
            if allowed_plans is not None and plan_code not in allowed_plans:
                return None, "该优惠码不适用于此套餐"

        # Billing-cycle restriction (same rules as the plan restriction).
        if promo.applicable_cycles:
            allowed_cycles = _load_applicable_list(promo.applicable_cycles)
            if allowed_cycles is not None and billing_cycle not in allowed_cycles:
                return None, "该优惠码不适用于此计费周期"

        # Minimum order amount.
        if promo.min_amount and amount < float(promo.min_amount):
            return None, f"需满 ¥{float(promo.min_amount):.2f} 才可使用此优惠码"

        return promo, None
    except Exception as e:
        return None, f"验证优惠码时出错: {str(e)}"


def _load_applicable_list(raw_value):
    """Decode a promo-code restriction into a list, or ``None`` if unusable.

    Accepts either a JSON-encoded string or a value that is already a
    list/tuple. Malformed JSON and non-list payloads yield ``None``, which
    callers treat as "no restriction" (best-effort, as before) without
    swallowing unrelated exceptions such as ``KeyboardInterrupt``.
    """
    if isinstance(raw_value, (list, tuple)):
        return list(raw_value)
    try:
        parsed = json.loads(raw_value)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-str input.
        return None
    return parsed if isinstance(parsed, list) else None
def calculate_discount(promo_code, amount):
    """Compute the discount a promo code grants on ``amount``.

    Args:
        promo_code: Object exposing ``discount_type`` (``'percentage'`` or
            ``'fixed_amount'``) and ``discount_value``.
        amount: Order amount before discount.

    Returns:
        Decimal: The discount, clamped to the range ``[0, amount]``.
        ``Decimal('0')`` is returned for an unknown discount type or when
        the values cannot be interpreted as numbers.
    """
    try:
        total = Decimal(str(amount))
        if promo_code.discount_type == 'percentage':
            # Percentage of the order amount.
            discount = total * Decimal(str(promo_code.discount_value)) / Decimal('100')
        elif promo_code.discount_type == 'fixed_amount':
            # Flat amount off.
            discount = Decimal(str(promo_code.discount_value))
        else:
            discount = Decimal('0')

        # Never discount more than the order total, and never let a negative
        # discount value turn into a surcharge.
        return max(Decimal('0'), min(discount, total))
    except (ArithmeticError, AttributeError, TypeError, ValueError) as e:
        # decimal.InvalidOperation (unparseable number) is an ArithmeticError.
        print(f"计算折扣失败: {e}")
        return Decimal('0')
# ============================================
# 辅助查询函数
# ============================================
def get_user_subscription_history(user_id, limit=10):
    """Return the user's most recent subscriptions, newest first.

    Args:
        user_id: User id to look up.
        limit: Maximum number of records to return.

    Returns:
        list: ``UserSubscription`` objects ordered by ``created_at``
        descending; an empty list when the query fails.
    """
    from models import UserSubscription

    try:
        by_user = UserSubscription.query.filter_by(user_id=user_id)
        newest_first = by_user.order_by(UserSubscription.created_at.desc())
        return newest_first.limit(limit).all()
    except Exception as e:
        print(f"获取订阅历史失败: {e}")
        return []
def check_subscription_status(user_id):
    """Summarise the user's subscription state.

    Args:
        user_id: User id to look up.

    Returns:
        dict: With keys ``has_subscription``, ``plan_code``, ``status``,
        ``end_date`` and ``days_left``. Users with no current subscription
        or on the free plan get ``plan_code='free'``, ``status='active'``,
        ``end_date=None`` and ``days_left=999``. Otherwise the values come
        from the current subscription and ``days_left`` is the whole number
        of days until ``end_date`` (0 if it is not in the future).
    """
    sub = get_current_subscription(user_id)

    # No paid subscription: report the free tier.
    if not sub or sub.plan_code == 'free':
        return dict(
            has_subscription=False,
            plan_code='free',
            status='active',
            end_date=None,
            days_left=999,
        )

    now = beijing_now()
    if sub.end_date > now:
        days_left = (sub.end_date - now).days
    else:
        days_left = 0

    return dict(
        has_subscription=True,
        plan_code=sub.plan_code,
        status=sub.status,
        end_date=sub.end_date,
        days_left=days_left,
    )