update pay ui
This commit is contained in:
232
stress_test/login_stress_test.py
Normal file
232
stress_test/login_stress_test.py
Normal file
@@ -0,0 +1,232 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
登录并发压力测试脚本
|
||||
|
||||
模拟大量用户同时请求微信登录二维码,测试服务器承受能力
|
||||
|
||||
使用方式:
|
||||
# 测试 100 个并发请求
|
||||
python login_stress_test.py --url https://valuefrontier.cn --concurrent 100
|
||||
|
||||
# 测试 500 个并发,持续 30 秒
|
||||
python login_stress_test.py --url https://valuefrontier.cn --concurrent 500 --duration 30
|
||||
|
||||
# 测试本地单进程 Flask
|
||||
python login_stress_test.py --url http://localhost:5001 --concurrent 100
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import time
|
||||
import statistics
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class LoginStressTest:
|
||||
def __init__(self, base_url, concurrent, duration, batch_size=50):
|
||||
self.base_url = base_url.rstrip('/')
|
||||
self.concurrent = concurrent
|
||||
self.duration = duration
|
||||
self.batch_size = batch_size
|
||||
|
||||
# 统计数据
|
||||
self.stats = {
|
||||
'total_requests': 0,
|
||||
'success': 0,
|
||||
'failed': 0,
|
||||
'timeout': 0,
|
||||
'response_times': [],
|
||||
'errors': defaultdict(int),
|
||||
'status_codes': defaultdict(int),
|
||||
}
|
||||
|
||||
async def request_qrcode(self, session, request_id):
|
||||
"""请求微信登录二维码"""
|
||||
url = f"{self.base_url}/api/auth/wechat/qrcode"
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
|
||||
elapsed = (time.time() - start_time) * 1000 # ms
|
||||
self.stats['response_times'].append(elapsed)
|
||||
self.stats['status_codes'][response.status] += 1
|
||||
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if data.get('success'):
|
||||
self.stats['success'] += 1
|
||||
return True, elapsed, None
|
||||
else:
|
||||
error = data.get('error', '未知错误')
|
||||
self.stats['failed'] += 1
|
||||
self.stats['errors'][error] += 1
|
||||
return False, elapsed, error
|
||||
else:
|
||||
self.stats['failed'] += 1
|
||||
self.stats['errors'][f'HTTP {response.status}'] += 1
|
||||
return False, elapsed, f'HTTP {response.status}'
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
self.stats['timeout'] += 1
|
||||
self.stats['errors']['Timeout'] += 1
|
||||
return False, elapsed, 'Timeout'
|
||||
except aiohttp.ClientError as e:
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
self.stats['failed'] += 1
|
||||
error_type = type(e).__name__
|
||||
self.stats['errors'][error_type] += 1
|
||||
return False, elapsed, error_type
|
||||
except Exception as e:
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
self.stats['failed'] += 1
|
||||
error_type = f'{type(e).__name__}: {str(e)[:50]}'
|
||||
self.stats['errors'][error_type] += 1
|
||||
return False, elapsed, error_type
|
||||
|
||||
async def run_batch(self, session, batch_id, count):
|
||||
"""运行一批并发请求"""
|
||||
tasks = []
|
||||
for i in range(count):
|
||||
request_id = f"batch{batch_id}_req{i}"
|
||||
tasks.append(self.request_qrcode(session, request_id))
|
||||
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
self.stats['total_requests'] += len(results)
|
||||
return results
|
||||
|
||||
async def run(self):
|
||||
"""运行压力测试"""
|
||||
print("=" * 70)
|
||||
print(f"🚀 微信登录并发压力测试")
|
||||
print(f" 目标: {self.base_url}")
|
||||
print(f" 并发数: {self.concurrent}")
|
||||
print(f" 持续时间: {self.duration} 秒")
|
||||
print(f" 批量大小: {self.batch_size}")
|
||||
print("=" * 70)
|
||||
|
||||
connector = aiohttp.TCPConnector(
|
||||
limit=self.concurrent, # 最大并发连接数
|
||||
limit_per_host=self.concurrent,
|
||||
ttl_dns_cache=300,
|
||||
)
|
||||
|
||||
async with aiohttp.ClientSession(connector=connector) as session:
|
||||
start_time = time.time()
|
||||
batch_id = 0
|
||||
|
||||
print(f"\n📡 开始发送请求...")
|
||||
|
||||
while time.time() - start_time < self.duration:
|
||||
batch_start = time.time()
|
||||
|
||||
# 发送一批请求
|
||||
await self.run_batch(session, batch_id, self.batch_size)
|
||||
|
||||
batch_elapsed = time.time() - batch_start
|
||||
total_elapsed = time.time() - start_time
|
||||
rps = self.stats['total_requests'] / total_elapsed if total_elapsed > 0 else 0
|
||||
|
||||
print(f" 批次 {batch_id + 1}: 总请求 {self.stats['total_requests']}, "
|
||||
f"成功 {self.stats['success']}, 失败 {self.stats['failed']}, "
|
||||
f"超时 {self.stats['timeout']}, RPS: {rps:.1f}")
|
||||
|
||||
batch_id += 1
|
||||
|
||||
# 控制请求速率,避免瞬间打满
|
||||
if batch_elapsed < 1.0:
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
total_time = time.time() - start_time
|
||||
|
||||
# 输出统计结果
|
||||
self.print_results(total_time)
|
||||
|
||||
def print_results(self, total_time):
|
||||
"""打印测试结果"""
|
||||
print("\n" + "=" * 70)
|
||||
print("📊 测试结果")
|
||||
print("=" * 70)
|
||||
|
||||
total = self.stats['total_requests']
|
||||
success = self.stats['success']
|
||||
failed = self.stats['failed']
|
||||
timeout = self.stats['timeout']
|
||||
|
||||
print(f"\n📈 请求统计:")
|
||||
print(f" 总请求数: {total}")
|
||||
print(f" 成功: {success} ({success/total*100:.1f}%)" if total > 0 else " 成功: 0")
|
||||
print(f" 失败: {failed} ({failed/total*100:.1f}%)" if total > 0 else " 失败: 0")
|
||||
print(f" 超时: {timeout} ({timeout/total*100:.1f}%)" if total > 0 else " 超时: 0")
|
||||
print(f" 总耗时: {total_time:.2f} 秒")
|
||||
print(f" RPS (请求/秒): {total/total_time:.2f}" if total_time > 0 else " RPS: N/A")
|
||||
|
||||
if self.stats['response_times']:
|
||||
times = self.stats['response_times']
|
||||
print(f"\n⏱️ 响应时间统计:")
|
||||
print(f" 最小: {min(times):.0f} ms")
|
||||
print(f" 最大: {max(times):.0f} ms")
|
||||
print(f" 平均: {statistics.mean(times):.0f} ms")
|
||||
print(f" 中位数: {statistics.median(times):.0f} ms")
|
||||
if len(times) >= 100:
|
||||
p95 = sorted(times)[int(len(times) * 0.95)]
|
||||
p99 = sorted(times)[int(len(times) * 0.99)]
|
||||
print(f" P95: {p95:.0f} ms")
|
||||
print(f" P99: {p99:.0f} ms")
|
||||
|
||||
if self.stats['status_codes']:
|
||||
print(f"\n📋 状态码分布:")
|
||||
for code, count in sorted(self.stats['status_codes'].items()):
|
||||
print(f" {code}: {count}")
|
||||
|
||||
if self.stats['errors']:
|
||||
print(f"\n❌ 错误分布:")
|
||||
for error, count in sorted(self.stats['errors'].items(), key=lambda x: -x[1]):
|
||||
print(f" {error}: {count}")
|
||||
|
||||
print("\n" + "=" * 70)
|
||||
|
||||
# 判断测试结果
|
||||
success_rate = success / total * 100 if total > 0 else 0
|
||||
avg_time = statistics.mean(self.stats['response_times']) if self.stats['response_times'] else 0
|
||||
|
||||
if success_rate >= 99 and avg_time < 1000:
|
||||
print("✅ 测试通过: 服务器表现优秀")
|
||||
elif success_rate >= 95 and avg_time < 3000:
|
||||
print("⚠️ 测试警告: 服务器有轻微压力")
|
||||
elif success_rate >= 90:
|
||||
print("🔶 测试警告: 服务器压力较大,建议优化")
|
||||
else:
|
||||
print("❌ 测试失败: 服务器无法承受当前并发,需要扩容或优化")
|
||||
|
||||
print("=" * 70)
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="微信登录并发压力测试")
|
||||
parser.add_argument("--url", default="https://valuefrontier.cn",
|
||||
help="目标URL (default: https://valuefrontier.cn)")
|
||||
parser.add_argument("--concurrent", "-c", type=int, default=100,
|
||||
help="并发数 (default: 100)")
|
||||
parser.add_argument("--duration", "-d", type=int, default=10,
|
||||
help="测试持续时间(秒) (default: 10)")
|
||||
parser.add_argument("--batch", "-b", type=int, default=50,
|
||||
help="每批请求数 (default: 50)")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
test = LoginStressTest(
|
||||
base_url=args.url,
|
||||
concurrent=args.concurrent,
|
||||
duration=args.duration,
|
||||
batch_size=args.batch
|
||||
)
|
||||
|
||||
await test.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user