#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Concurrent login stress-test script.

Simulates many users requesting the WeChat login QR code at the same time
in order to gauge how much load the server can take.

Usage:
    # Test with 100 concurrent requests
    python login_stress_test.py --url https://valuefrontier.cn --concurrent 100

    # Test with 500 concurrent requests for 30 seconds
    python login_stress_test.py --url https://valuefrontier.cn --concurrent 500 --duration 30

    # Test a local single-process Flask server
    python login_stress_test.py --url http://localhost:5001 --concurrent 100
"""

import argparse
import asyncio
import statistics
import time
from collections import defaultdict
from datetime import datetime

import aiohttp


class LoginStressTest:
    """Fire batches of QR-code requests at a server and collect statistics.

    Requests are launched in batches of ``batch_size`` and each batch is
    awaited before the next one starts, so at most ``batch_size`` requests
    are in flight at once.  ``concurrent`` is passed to the aiohttp
    connector as its connection limit.
    """

    def __init__(self, base_url, concurrent, duration, batch_size=50):
        """Store the test configuration and reset the counters.

        Args:
            base_url: Server root URL; a trailing ``/`` is stripped.
            concurrent: Connection limit handed to ``aiohttp.TCPConnector``.
            duration: How long to keep starting new batches, in seconds.
            batch_size: Number of requests launched together per batch.
        """
        self.base_url = base_url.rstrip('/')
        self.concurrent = concurrent
        self.duration = duration
        self.batch_size = batch_size

        # Aggregated statistics, updated by every request.
        self.stats = {
            'total_requests': 0,
            'success': 0,
            'failed': 0,
            'timeout': 0,
            'response_times': [],
            'errors': defaultdict(int),
            'status_codes': defaultdict(int),
        }

    async def request_qrcode(self, session, request_id):
        """Request the WeChat login QR code once and record the outcome.

        Args:
            session: The ``aiohttp.ClientSession`` used to send the request.
            request_id: Label for the request; currently unused.

        Returns:
            A ``(ok, elapsed_ms, error)`` tuple where ``error`` is ``None``
            on success and a short description otherwise.
        """
        url = f"{self.base_url}/api/auth/wechat/qrcode"
        # perf_counter is monotonic, so the measured latency cannot be
        # distorted by wall-clock adjustments during the test.
        start_time = time.perf_counter()

        def elapsed_ms():
            return (time.perf_counter() - start_time) * 1000

        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                # Taken as soon as the response object is available, i.e.
                # before the body is read below.
                elapsed = elapsed_ms()
                self.stats['response_times'].append(elapsed)
                self.stats['status_codes'][response.status] += 1

                if response.status == 200:
                    data = await response.json()
                    # Two response shapes are accepted:
                    # {"success": true} or {"code": 0}
                    if data.get('success') or data.get('code') == 0:
                        self.stats['success'] += 1
                        return True, elapsed, None
                    else:
                        error = data.get('error') or data.get('message') or '未知错误'
                        self.stats['failed'] += 1
                        self.stats['errors'][error] += 1
                        return False, elapsed, error
                else:
                    self.stats['failed'] += 1
                    self.stats['errors'][f'HTTP {response.status}'] += 1
                    return False, elapsed, f'HTTP {response.status}'

        except asyncio.TimeoutError:
            # Timeouts are counted separately from other failures.
            elapsed = elapsed_ms()
            self.stats['timeout'] += 1
            self.stats['errors']['Timeout'] += 1
            return False, elapsed, 'Timeout'

        except aiohttp.ClientError as e:
            elapsed = elapsed_ms()
            self.stats['failed'] += 1
            error_type = type(e).__name__
            self.stats['errors'][error_type] += 1
            return False, elapsed, error_type

        except Exception as e:
            # Anything else (e.g. a malformed body) is still a failed
            # request; keep the message short so the error table stays
            # readable.
            elapsed = elapsed_ms()
            self.stats['failed'] += 1
            error_type = f'{type(e).__name__}: {str(e)[:50]}'
            self.stats['errors'][error_type] += 1
            return False, elapsed, error_type

    async def run_batch(self, session, batch_id, count):
        """Launch ``count`` requests together and wait for all of them.

        Args:
            session: The ``aiohttp.ClientSession`` shared by the requests.
            batch_id: Index of this batch, used to label the requests.
            count: Number of requests to launch.

        Returns:
            The list of per-request results from ``asyncio.gather``.
        """
        tasks = [
            self.request_qrcode(session, f"batch{batch_id}_req{i}")
            for i in range(count)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)
        self.stats['total_requests'] += len(results)

        return results

    async def run(self):
        """Run the stress test for ``duration`` seconds, then print a report."""
        print("=" * 70)
        print("🚀 微信登录并发压力测试")
        print(f" 目标: {self.base_url}")
        print(f" 并发数: {self.concurrent}")
        print(f" 持续时间: {self.duration} 秒")
        print(f" 批量大小: {self.batch_size}")
        print("=" * 70)

        connector = aiohttp.TCPConnector(
            limit=self.concurrent,  # maximum number of simultaneous connections
            limit_per_host=self.concurrent,
            ttl_dns_cache=300,
        )

        async with aiohttp.ClientSession(connector=connector) as session:
            start_time = time.perf_counter()
            batch_id = 0

            print("\n📡 开始发送请求...")

            while time.perf_counter() - start_time < self.duration:
                batch_start = time.perf_counter()

                # Send one batch and wait for it to finish.
                await self.run_batch(session, batch_id, self.batch_size)

                now = time.perf_counter()
                batch_elapsed = now - batch_start
                total_elapsed = now - start_time
                rps = self.stats['total_requests'] / total_elapsed if total_elapsed > 0 else 0

                print(f" 批次 {batch_id + 1}: 总请求 {self.stats['total_requests']}, "
                      f"成功 {self.stats['success']}, 失败 {self.stats['failed']}, "
                      f"超时 {self.stats['timeout']}, RPS: {rps:.1f}")

                batch_id += 1

                # Throttle slightly when a batch returns quickly so the
                # target is not saturated instantly.
                if batch_elapsed < 1.0:
                    await asyncio.sleep(0.1)

            total_time = time.perf_counter() - start_time

        # Print the final statistics.
        self.print_results(total_time)

    def print_results(self, total_time):
        """Print the request counts, latency statistics and a verdict.

        Args:
            total_time: Total duration of the test run, in seconds.
        """
        print("\n" + "=" * 70)
        print("📊 测试结果")
        print("=" * 70)

        total = self.stats['total_requests']
        success = self.stats['success']
        failed = self.stats['failed']
        timeout = self.stats['timeout']
        times = self.stats['response_times']

        print("\n📈 请求统计:")
        print(f" 总请求数: {total}")
        print(f" 成功: {success} ({success/total*100:.1f}%)" if total > 0 else " 成功: 0")
        print(f" 失败: {failed} ({failed/total*100:.1f}%)" if total > 0 else " 失败: 0")
        print(f" 超时: {timeout} ({timeout/total*100:.1f}%)" if total > 0 else " 超时: 0")
        print(f" 总耗时: {total_time:.2f} 秒")
        print(f" RPS (请求/秒): {total/total_time:.2f}" if total_time > 0 else " RPS: N/A")

        if times:
            print("\n⏱️ 响应时间统计:")
            print(f" 最小: {min(times):.0f} ms")
            print(f" 最大: {max(times):.0f} ms")
            print(f" 平均: {statistics.mean(times):.0f} ms")
            print(f" 中位数: {statistics.median(times):.0f} ms")

            # Percentiles are only reported with at least 100 samples.
            if len(times) >= 100:
                ordered = sorted(times)  # sort once for both percentiles
                p95 = ordered[int(len(ordered) * 0.95)]
                p99 = ordered[int(len(ordered) * 0.99)]
                print(f" P95: {p95:.0f} ms")
                print(f" P99: {p99:.0f} ms")

        if self.stats['status_codes']:
            print("\n📋 状态码分布:")
            for code, count in sorted(self.stats['status_codes'].items()):
                print(f" {code}: {count}")

        if self.stats['errors']:
            print("\n❌ 错误分布:")
            # Most frequent errors first.
            for error, count in sorted(self.stats['errors'].items(), key=lambda x: -x[1]):
                print(f" {error}: {count}")

        print("\n" + "=" * 70)

        # Overall verdict based on success rate and mean latency.
        success_rate = success / total * 100 if total > 0 else 0
        avg_time = statistics.mean(times) if times else 0

        if success_rate >= 99 and avg_time < 1000:
            print("✅ 测试通过: 服务器表现优秀")
        elif success_rate >= 95 and avg_time < 3000:
            print("⚠️ 测试警告: 服务器有轻微压力")
        elif success_rate >= 90:
            print("🔶 测试警告: 服务器压力较大,建议优化")
        else:
            print("❌ 测试失败: 服务器无法承受当前并发,需要扩容或优化")

        print("=" * 70)


async def main():
    """Parse the command-line arguments and run one stress test."""
    parser = argparse.ArgumentParser(description="微信登录并发压力测试")
    parser.add_argument("--url", default="https://valuefrontier.cn",
                        help="目标URL (default: https://valuefrontier.cn)")
    parser.add_argument("--concurrent", "-c", type=int, default=100,
                        help="并发数 (default: 100)")
    parser.add_argument("--duration", "-d", type=int, default=10,
                        help="测试持续时间(秒) (default: 10)")
    parser.add_argument("--batch", "-b", type=int, default=50,
                        help="每批请求数 (default: 50)")

    args = parser.parse_args()

    # A non-positive batch size would make the main loop spin for the whole
    # duration without sending a single request.
    if args.batch < 1:
        parser.error("--batch 必须为正整数")

    test = LoginStressTest(
        base_url=args.url,
        concurrent=args.concurrent,
        duration=args.duration,
        batch_size=args.batch
    )

    await test.run()


if __name__ == "__main__":
    asyncio.run(main())