diff --git a/stress_test/login_stress_test.py b/stress_test/login_stress_test.py new file mode 100644 index 00000000..39f86451 --- /dev/null +++ b/stress_test/login_stress_test.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +登录并发压力测试脚本 + +模拟大量用户同时请求微信登录二维码,测试服务器承受能力 + +使用方式: + # 测试 100 个并发请求 + python login_stress_test.py --url https://valuefrontier.cn --concurrent 100 + + # 测试 500 个并发,持续 30 秒 + python login_stress_test.py --url https://valuefrontier.cn --concurrent 500 --duration 30 + + # 测试本地单进程 Flask + python login_stress_test.py --url http://localhost:5001 --concurrent 100 +""" + +import argparse +import asyncio +import aiohttp +import time +import statistics +from datetime import datetime +from collections import defaultdict + + +class LoginStressTest: + def __init__(self, base_url, concurrent, duration, batch_size=50): + self.base_url = base_url.rstrip('/') + self.concurrent = concurrent + self.duration = duration + self.batch_size = batch_size + + # 统计数据 + self.stats = { + 'total_requests': 0, + 'success': 0, + 'failed': 0, + 'timeout': 0, + 'response_times': [], + 'errors': defaultdict(int), + 'status_codes': defaultdict(int), + } + + async def request_qrcode(self, session, request_id): + """请求微信登录二维码""" + url = f"{self.base_url}/api/auth/wechat/qrcode" + start_time = time.time() + + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: + elapsed = (time.time() - start_time) * 1000 # ms + self.stats['response_times'].append(elapsed) + self.stats['status_codes'][response.status] += 1 + + if response.status == 200: + data = await response.json() + if data.get('success'): + self.stats['success'] += 1 + return True, elapsed, None + else: + error = data.get('error', '未知错误') + self.stats['failed'] += 1 + self.stats['errors'][error] += 1 + return False, elapsed, error + else: + self.stats['failed'] += 1 + self.stats['errors'][f'HTTP {response.status}'] += 1 + return False, elapsed, f'HTTP {response.status}' + + except asyncio.TimeoutError: + elapsed = (time.time() - start_time) * 1000 + self.stats['timeout'] += 1 + self.stats['errors']['Timeout'] += 1 + return False, elapsed, 'Timeout' + except aiohttp.ClientError as e: + elapsed = (time.time() - start_time) * 1000 + self.stats['failed'] += 1 + error_type = type(e).__name__ + self.stats['errors'][error_type] += 1 + return False, elapsed, error_type + except Exception as e: + elapsed = (time.time() - start_time) * 1000 + self.stats['failed'] += 1 + error_type = f'{type(e).__name__}: {str(e)[:50]}' + self.stats['errors'][error_type] += 1 + return False, elapsed, error_type + + async def run_batch(self, session, batch_id, count): + """运行一批并发请求""" + tasks = [] + for i in range(count): + request_id = f"batch{batch_id}_req{i}" + tasks.append(self.request_qrcode(session, request_id)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + self.stats['total_requests'] += len(results) + return results + + async def run(self): + """运行压力测试""" + print("=" * 70) + print(f"🚀 微信登录并发压力测试") + print(f" 目标: {self.base_url}") + print(f" 并发数: {self.concurrent}") + print(f" 持续时间: {self.duration} 秒") + print(f" 批量大小: {self.batch_size}") + print("=" * 70) + + connector = aiohttp.TCPConnector( + limit=self.concurrent, # 最大并发连接数 + limit_per_host=self.concurrent, + ttl_dns_cache=300, + ) + + async with aiohttp.ClientSession(connector=connector) as session: + start_time = time.time() + batch_id = 0 + + print(f"\n📡 开始发送请求...") + + while time.time() - start_time < self.duration: + batch_start = time.time() + + # 发送一批请求 + await self.run_batch(session, batch_id, self.batch_size) + + batch_elapsed = time.time() - batch_start + total_elapsed = time.time() - start_time + rps = self.stats['total_requests'] / total_elapsed if total_elapsed > 0 else 0 + + print(f" 批次 {batch_id + 1}: 总请求 {self.stats['total_requests']}, " + f"成功 {self.stats['success']}, 失败 {self.stats['failed']}, " + f"超时 {self.stats['timeout']}, RPS: {rps:.1f}") + + batch_id += 1 + + # 控制请求速率,避免瞬间打满 + if batch_elapsed < 1.0: + await asyncio.sleep(0.1) + + total_time = time.time() - start_time + + # 输出统计结果 + self.print_results(total_time) + + def print_results(self, total_time): + """打印测试结果""" + print("\n" + "=" * 70) + print("📊 测试结果") + print("=" * 70) + + total = self.stats['total_requests'] + success = self.stats['success'] + failed = self.stats['failed'] + timeout = self.stats['timeout'] + + print(f"\n📈 请求统计:") + print(f" 总请求数: {total}") + print(f" 成功: {success} ({success/total*100:.1f}%)" if total > 0 else " 成功: 0") + print(f" 失败: {failed} ({failed/total*100:.1f}%)" if total > 0 else " 失败: 0") + print(f" 超时: {timeout} ({timeout/total*100:.1f}%)" if total > 0 else " 超时: 0") + print(f" 总耗时: {total_time:.2f} 秒") + print(f" RPS (请求/秒): {total/total_time:.2f}" if total_time > 0 else " RPS: N/A") + + if self.stats['response_times']: + times = self.stats['response_times'] + print(f"\n⏱️ 响应时间统计:") + print(f" 最小: {min(times):.0f} ms") + print(f" 最大: {max(times):.0f} ms") + print(f" 平均: {statistics.mean(times):.0f} ms") + print(f" 中位数: {statistics.median(times):.0f} ms") + if len(times) >= 100: + p95 = sorted(times)[int(len(times) * 0.95)] + p99 = sorted(times)[int(len(times) * 0.99)] + print(f" P95: {p95:.0f} ms") + print(f" P99: {p99:.0f} ms") + + if self.stats['status_codes']: + print(f"\n📋 状态码分布:") + for code, count in sorted(self.stats['status_codes'].items()): + print(f" {code}: {count}") + + if self.stats['errors']: + print(f"\n❌ 错误分布:") + for error, count in sorted(self.stats['errors'].items(), key=lambda x: -x[1]): + print(f" {error}: {count}") + + print("\n" + "=" * 70) + + # 判断测试结果 + success_rate = success / total * 100 if total > 0 else 0 + avg_time = statistics.mean(self.stats['response_times']) if self.stats['response_times'] else 0 + + if success_rate >= 99 and avg_time < 1000: + print("✅ 测试通过: 服务器表现优秀") + elif success_rate >= 95 and avg_time < 3000: + print("⚠️ 测试警告: 服务器有轻微压力") + elif success_rate >= 90: + print("🔶 测试警告: 服务器压力较大,建议优化") + else: + print("❌ 测试失败: 服务器无法承受当前并发,需要扩容或优化") + + print("=" * 70) + + +async def main(): + parser = argparse.ArgumentParser(description="微信登录并发压力测试") + parser.add_argument("--url", default="https://valuefrontier.cn", + help="目标URL (default: https://valuefrontier.cn)") + parser.add_argument("--concurrent", "-c", type=int, default=100, + help="并发数 (default: 100)") + parser.add_argument("--duration", "-d", type=int, default=10, + help="测试持续时间(秒) (default: 10)") + parser.add_argument("--batch", "-b", type=int, default=50, + help="每批请求数 (default: 50)") + + args = parser.parse_args() + + test = LoginStressTest( + base_url=args.url, + concurrent=args.concurrent, + duration=args.duration, + batch_size=args.batch + ) + + await test.run() + + +if __name__ == "__main__": + asyncio.run(main())