update pay promo

This commit is contained in:
2026-02-03 12:13:07 +08:00
parent 459f405314
commit 57e5672dc1

535
app.py
View File

@@ -1702,6 +1702,130 @@ class SubscriptionUpgrade(db.Model):
order = db.relationship('PaymentOrder', backref='upgrade_record')
# ============================================
# 发票相关模型
# ============================================
class Invoice(db.Model):
"""发票申请表"""
__tablename__ = 'invoices'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
invoice_no = db.Column(db.String(32), unique=True, nullable=True) # 发票号码(开具后填入)
invoice_code = db.Column(db.String(20), nullable=True) # 发票代码
order_id = db.Column(db.Integer, db.ForeignKey('payment_orders.id'), nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
# 发票基本信息
invoice_type = db.Column(db.String(20), nullable=False, default='electronic') # electronic/paper
title_type = db.Column(db.String(20), nullable=False, default='personal') # personal/company
title = db.Column(db.String(200), nullable=False) # 发票抬头
# 企业开票信息
tax_number = db.Column(db.String(50), nullable=True) # 税号(企业必填)
company_address = db.Column(db.String(200), nullable=True)
company_phone = db.Column(db.String(50), nullable=True)
bank_name = db.Column(db.String(100), nullable=True)
bank_account = db.Column(db.String(50), nullable=True)
# 发票金额
amount = db.Column(db.Numeric(10, 2), nullable=False)
# 接收信息
email = db.Column(db.String(100), nullable=False) # 接收邮箱
phone = db.Column(db.String(20), nullable=True)
# 纸质发票邮寄信息
mailing_address = db.Column(db.String(500), nullable=True)
recipient_name = db.Column(db.String(50), nullable=True)
recipient_phone = db.Column(db.String(20), nullable=True)
# 状态信息
status = db.Column(db.String(20), default='pending') # pending/processing/completed/rejected/cancelled
invoice_url = db.Column(db.String(500), nullable=True) # 电子发票下载链接
# 时间戳
created_at = db.Column(db.DateTime, default=beijing_now)
updated_at = db.Column(db.DateTime, default=beijing_now, onupdate=beijing_now)
completed_at = db.Column(db.DateTime, nullable=True)
# 备注
remark = db.Column(db.String(500), nullable=True)
reject_reason = db.Column(db.String(200), nullable=True)
# 关系
order = db.relationship('PaymentOrder', backref='invoices')
user = db.relationship('User', backref='invoices')
def to_dict(self):
return {
'id': str(self.id),
'invoiceNo': self.invoice_no,
'invoiceCode': self.invoice_code,
'orderId': str(self.order_id),
'orderNo': self.order.order_no if self.order else None,
'userId': self.user_id,
'invoiceType': self.invoice_type,
'titleType': self.title_type,
'title': self.title,
'taxNumber': self.tax_number,
'companyAddress': self.company_address,
'companyPhone': self.company_phone,
'bankName': self.bank_name,
'bankAccount': self.bank_account,
'amount': float(self.amount) if self.amount else 0,
'email': self.email,
'phone': self.phone,
'mailingAddress': self.mailing_address,
'recipientName': self.recipient_name,
'recipientPhone': self.recipient_phone,
'status': self.status,
'invoiceUrl': self.invoice_url,
'createdAt': self.created_at.isoformat() if self.created_at else None,
'updatedAt': self.updated_at.isoformat() if self.updated_at else None,
'completedAt': self.completed_at.isoformat() if self.completed_at else None,
'remark': self.remark,
'rejectReason': self.reject_reason
}
class InvoiceTitleTemplate(db.Model):
"""发票抬头模板表"""
__tablename__ = 'invoice_title_templates'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
is_default = db.Column(db.Boolean, default=False)
title_type = db.Column(db.String(20), nullable=False, default='personal') # personal/company
title = db.Column(db.String(200), nullable=False)
tax_number = db.Column(db.String(50), nullable=True)
company_address = db.Column(db.String(200), nullable=True)
company_phone = db.Column(db.String(50), nullable=True)
bank_name = db.Column(db.String(100), nullable=True)
bank_account = db.Column(db.String(50), nullable=True)
created_at = db.Column(db.DateTime, default=beijing_now)
updated_at = db.Column(db.DateTime, default=beijing_now, onupdate=beijing_now)
# 关系
user = db.relationship('User', backref='invoice_title_templates')
def to_dict(self):
return {
'id': str(self.id),
'userId': self.user_id,
'isDefault': self.is_default,
'titleType': self.title_type,
'title': self.title,
'taxNumber': self.tax_number,
'companyAddress': self.company_address,
'companyPhone': self.company_phone,
'bankName': self.bank_name,
'bankAccount': self.bank_account,
'createdAt': self.created_at.isoformat() if self.created_at else None
}
# ============================================
# 模拟盘相关模型
# ============================================
@@ -3599,6 +3723,417 @@ def check_alipay_order_status_by_no(order_no):
return jsonify({'success': False, 'error': '查询失败'}), 500
# ============================================
# 发票管理 API
# ============================================
@app.route('/api/invoice/available-orders', methods=['GET'])
@login_required
def get_invoice_available_orders():
"""获取可开票订单列表(已支付且未申请开票的订单)"""
try:
user_id = session['user_id']
# 查询已支付的订单
paid_orders = PaymentOrder.query.filter_by(
user_id=user_id,
status='paid'
).order_by(PaymentOrder.paid_at.desc()).all()
available_orders = []
for order in paid_orders:
# 检查该订单是否已申请过发票
existing_invoice = Invoice.query.filter_by(
order_id=order.id
).filter(Invoice.status.notin_(['cancelled'])).first()
if not existing_invoice:
# 套餐名称映射
plan_names = {'pro': 'Pro会员', 'max': 'Max会员'}
cycle_names = {'monthly': '月付', 'quarterly': '季付', 'yearly': '年付'}
available_orders.append({
'id': str(order.id),
'orderNo': order.order_no,
'planName': plan_names.get(order.plan_name, order.plan_name),
'billingCycle': cycle_names.get(order.billing_cycle, order.billing_cycle),
'amount': float(order.amount) if order.amount else 0,
'paidAt': order.paid_at.isoformat() if order.paid_at else None,
'invoiceApplied': False
})
return jsonify({
'code': 200,
'message': 'success',
'data': available_orders
})
except Exception as e:
print(f"获取可开票订单失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/apply', methods=['POST'])
@login_required
def apply_invoice():
"""申请开票"""
try:
user_id = session['user_id']
data = request.get_json()
order_id = data.get('orderId')
invoice_type = data.get('invoiceType', 'electronic')
title_type = data.get('titleType', 'personal')
title = data.get('title')
tax_number = data.get('taxNumber')
email = data.get('email')
# 验证必填字段
if not order_id:
return jsonify({'code': 400, 'message': '订单ID不能为空', 'data': None}), 400
if not title:
return jsonify({'code': 400, 'message': '发票抬头不能为空', 'data': None}), 400
if not email:
return jsonify({'code': 400, 'message': '邮箱不能为空', 'data': None}), 400
if title_type == 'company' and not tax_number:
return jsonify({'code': 400, 'message': '企业开票必须填写税号', 'data': None}), 400
# 验证订单
order = PaymentOrder.query.filter_by(id=order_id, user_id=user_id, status='paid').first()
if not order:
return jsonify({'code': 404, 'message': '订单不存在或未支付', 'data': None}), 404
# 检查是否已申请过发票
existing_invoice = Invoice.query.filter_by(
order_id=order_id
).filter(Invoice.status.notin_(['cancelled'])).first()
if existing_invoice:
return jsonify({'code': 400, 'message': '该订单已申请开票', 'data': None}), 400
# 创建发票申请
invoice = Invoice(
order_id=order_id,
user_id=user_id,
invoice_type=invoice_type,
title_type=title_type,
title=title,
tax_number=tax_number,
company_address=data.get('companyAddress'),
company_phone=data.get('companyPhone'),
bank_name=data.get('bankName'),
bank_account=data.get('bankAccount'),
amount=order.amount,
email=email,
phone=data.get('phone'),
mailing_address=data.get('mailingAddress'),
recipient_name=data.get('recipientName'),
recipient_phone=data.get('recipientPhone'),
remark=data.get('remark'),
status='pending'
)
db.session.add(invoice)
db.session.commit()
print(f"发票申请创建成功: 用户{user_id}, 订单{order_id}, 发票ID{invoice.id}")
return jsonify({
'code': 200,
'message': '开票申请已提交预计1-3个工作日内处理',
'data': invoice.to_dict()
})
except Exception as e:
db.session.rollback()
print(f"申请开票失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/list', methods=['GET'])
@login_required
def get_invoice_list():
"""获取发票列表"""
try:
user_id = session['user_id']
page = request.args.get('page', 1, type=int)
page_size = request.args.get('pageSize', 10, type=int)
status_filter = request.args.get('status')
query = Invoice.query.filter_by(user_id=user_id)
if status_filter:
query = query.filter_by(status=status_filter)
query = query.order_by(Invoice.created_at.desc())
# 分页
total = query.count()
invoices = query.offset((page - 1) * page_size).limit(page_size).all()
return jsonify({
'code': 200,
'message': 'success',
'data': {
'list': [inv.to_dict() for inv in invoices],
'total': total,
'page': page,
'pageSize': page_size,
'totalPages': (total + page_size - 1) // page_size
}
})
except Exception as e:
print(f"获取发票列表失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/stats', methods=['GET'])
@login_required
def get_invoice_stats():
"""获取发票统计信息"""
try:
user_id = session['user_id']
# 统计各状态的发票数量
total = Invoice.query.filter_by(user_id=user_id).count()
pending = Invoice.query.filter_by(user_id=user_id, status='pending').count()
processing = Invoice.query.filter_by(user_id=user_id, status='processing').count()
completed = Invoice.query.filter_by(user_id=user_id, status='completed').count()
cancelled = Invoice.query.filter_by(user_id=user_id, status='cancelled').count()
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'pending': pending,
'processing': processing,
'completed': completed,
'cancelled': cancelled
}
})
except Exception as e:
print(f"获取发票统计失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/<invoice_id>', methods=['GET'])
@login_required
def get_invoice_detail(invoice_id):
"""获取发票详情"""
try:
user_id = session['user_id']
invoice = Invoice.query.filter_by(id=invoice_id, user_id=user_id).first()
if not invoice:
return jsonify({'code': 404, 'message': '发票不存在', 'data': None}), 404
return jsonify({
'code': 200,
'message': 'success',
'data': invoice.to_dict()
})
except Exception as e:
print(f"获取发票详情失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/<invoice_id>/cancel', methods=['POST'])
@login_required
def cancel_invoice(invoice_id):
"""取消发票申请"""
try:
user_id = session['user_id']
invoice = Invoice.query.filter_by(id=invoice_id, user_id=user_id).first()
if not invoice:
return jsonify({'code': 404, 'message': '发票不存在', 'data': None}), 404
if invoice.status != 'pending':
return jsonify({'code': 400, 'message': '只能取消待处理的发票申请', 'data': None}), 400
invoice.status = 'cancelled'
invoice.updated_at = beijing_now()
db.session.commit()
print(f"发票申请已取消: 用户{user_id}, 发票ID{invoice_id}")
return jsonify({
'code': 200,
'message': '发票申请已取消',
'data': None
})
except Exception as e:
db.session.rollback()
print(f"取消发票申请失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/<invoice_id>/download', methods=['GET'])
@login_required
def download_invoice(invoice_id):
"""下载电子发票"""
try:
user_id = session['user_id']
invoice = Invoice.query.filter_by(id=invoice_id, user_id=user_id).first()
if not invoice:
return jsonify({'code': 404, 'message': '发票不存在', 'data': None}), 404
if invoice.status != 'completed':
return jsonify({'code': 400, 'message': '发票尚未开具完成', 'data': None}), 400
# 如果有实际的发票文件URL可以重定向或返回文件
# 这里返回一个占位的 PDF 响应
if invoice.invoice_url:
# 实际项目中应该从存储服务获取文件
return redirect(invoice.invoice_url)
# 返回模拟的 PDF 内容(实际项目中应对接发票服务)
pdf_content = b'%PDF-1.4 mock invoice content'
response = make_response(pdf_content)
response.headers['Content-Type'] = 'application/pdf'
response.headers['Content-Disposition'] = f'attachment; filename="invoice_{invoice.invoice_no or invoice.id}.pdf"'
return response
except Exception as e:
print(f"下载发票失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
# ==================== 发票抬头模板 API ====================
@app.route('/api/invoice/title-templates', methods=['GET'])
@login_required
def get_invoice_title_templates():
"""获取发票抬头模板列表"""
try:
user_id = session['user_id']
templates = InvoiceTitleTemplate.query.filter_by(user_id=user_id)\
.order_by(InvoiceTitleTemplate.is_default.desc(), InvoiceTitleTemplate.created_at.desc()).all()
return jsonify({
'code': 200,
'message': 'success',
'data': [t.to_dict() for t in templates]
})
except Exception as e:
print(f"获取抬头模板列表失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/title-template', methods=['POST'])
@login_required
def save_invoice_title_template():
"""保存发票抬头模板"""
try:
user_id = session['user_id']
data = request.get_json()
title_type = data.get('titleType', 'personal')
title = data.get('title')
is_default = data.get('isDefault', False)
if not title:
return jsonify({'code': 400, 'message': '抬头名称不能为空', 'data': None}), 400
# 如果设为默认,取消其他模板的默认状态
if is_default:
InvoiceTitleTemplate.query.filter_by(user_id=user_id).update({'is_default': False})
template = InvoiceTitleTemplate(
user_id=user_id,
title_type=title_type,
title=title,
tax_number=data.get('taxNumber'),
company_address=data.get('companyAddress'),
company_phone=data.get('companyPhone'),
bank_name=data.get('bankName'),
bank_account=data.get('bankAccount'),
is_default=is_default
)
db.session.add(template)
db.session.commit()
print(f"发票抬头模板保存成功: 用户{user_id}, 模板ID{template.id}")
return jsonify({
'code': 200,
'message': '保存成功',
'data': template.to_dict()
})
except Exception as e:
db.session.rollback()
print(f"保存抬头模板失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/title-template/<template_id>', methods=['DELETE'])
@login_required
def delete_invoice_title_template(template_id):
"""删除发票抬头模板"""
try:
user_id = session['user_id']
template = InvoiceTitleTemplate.query.filter_by(id=template_id, user_id=user_id).first()
if not template:
return jsonify({'code': 404, 'message': '模板不存在', 'data': None}), 404
db.session.delete(template)
db.session.commit()
print(f"发票抬头模板删除成功: 用户{user_id}, 模板ID{template_id}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None
})
except Exception as e:
db.session.rollback()
print(f"删除抬头模板失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/invoice/title-template/<template_id>/default', methods=['POST'])
@login_required
def set_default_invoice_title_template(template_id):
"""设置默认发票抬头"""
try:
user_id = session['user_id']
template = InvoiceTitleTemplate.query.filter_by(id=template_id, user_id=user_id).first()
if not template:
return jsonify({'code': 404, 'message': '模板不存在', 'data': None}), 404
# 取消其他模板的默认状态
InvoiceTitleTemplate.query.filter_by(user_id=user_id).update({'is_default': False})
template.is_default = True
db.session.commit()
print(f"设置默认抬头成功: 用户{user_id}, 模板ID{template_id}")
return jsonify({
'code': 200,
'message': '设置成功',
'data': None
})
except Exception as e:
db.session.rollback()
print(f"设置默认抬头失败: {str(e)}")
return jsonify({'code': 500, 'message': str(e), 'data': None}), 500
@app.route('/api/auth/session', methods=['GET'])
def get_session_info():
"""获取当前登录用户信息"""