""" Celery poll_video_task 并发压测(两步执行) 步骤 1:启动 worker(mock 火山 API) 步骤 2:派发任务 + 监控 用法: cd backend && source venv/bin/activate # 终端 1:启动 mock worker python tests/test_poll_concurrency.py worker # 终端 2:派发 + 监控 python tests/test_poll_concurrency.py bench --tasks 100 --duration 30 """ import argparse import os import sys import time # 公共环境变量 REDIS_URL = os.environ.get('REDIS_URL', 'redis://zyc:Zyc188208@redis-shzlsczo52dft8mia.redis.volces.com:6379/1') os.environ['REDIS_URL'] = REDIS_URL os.environ['USE_MYSQL'] = 'true' os.environ.setdefault('DB_HOST', 'mysql-8351f937d637-public.rds.volces.com') os.environ.setdefault('DB_NAME', 'video_auto') os.environ.setdefault('DB_USER', 'zyc') os.environ.setdefault('DB_PASSWORD', 'Zyc188208') os.environ.setdefault('DB_PORT', '3306') sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') def cmd_worker(args): """启动 worker,用 mock 替换真实 airdrama_client""" # gevent monkey-patch 必须在所有 import 之前 from gevent import monkey monkey.patch_all() # 用 mock 模块替换真实 airdrama_client sys.path.insert(0, os.path.join(os.path.dirname(__file__))) import mock_airdrama sys.modules['utils.airdrama_client'] = mock_airdrama import django django.setup() print(f'[worker] 启动中... (mock 火山 API, concurrency={args.concurrency})') print(f'[worker] Redis: {REDIS_URL}') from config.celery import app app.Worker( pool='gevent', concurrency=args.concurrency, loglevel='INFO', without_heartbeat=True, without_mingle=True, without_gossip=True, ).start() def cmd_bench(args): """派发任务 + 监控""" import django django.setup() import redis as redis_lib r = redis_lib.from_url(REDIS_URL) from apps.accounts.models import User, Team from apps.generation.models import GenerationRecord from apps.generation.tasks import poll_video_task num_tasks = args.tasks duration = args.duration print(f'\n{"="*60}') print(f' Celery gevent 轮询并发压测') print(f' 任务数: {num_tasks}') print(f' 观察时长: {duration} 秒') print(f' Redis: {REDIS_URL}') print(f'{"="*60}\n') # 清空计数器 for key in ['bench:poll_count', 'bench:active', 'bench:peak', 'bench:tasks_seen']: r.delete(key) # 准备测试数据 team, _ = Team.objects.get_or_create(name='压测团队', defaults={'total_seconds_pool': 999999}) user, _ = User.objects.get_or_create(username='bench_user', defaults={ 'email': 'bench@test.com', 'team': team, }) GenerationRecord.objects.filter(prompt__startswith='压测任务').delete() records = [] for i in range(num_tasks): record = GenerationRecord.objects.create( user=user, prompt=f'压测任务 {i}', mode='universal', model='seedance_2.0', aspect_ratio='16:9', duration=5, status='processing', ark_task_id=f'bench-{i:04d}', ) records.append(record) print(f'[准备] 已创建 {num_tasks} 个测试记录') # 清空队列 r.delete('celery') print(f'[准备] 已清空 Redis 队列\n') # 派发 print(f'[派发] 正在派发 {num_tasks} 个轮询任务...') t0 = time.time() for record in records: poll_video_task.delay(record.id) print(f'[派发] 完成,耗时 {time.time()-t0:.1f} 秒\n') # 监控 print(f'[监控] 开始观察 {duration} 秒...\n') print(f' {"时间":>6s} {"总查询":>8s} {"当前并发":>8s} {"峰值并发":>8s} {"QPS":>8s} {"任务覆盖":>10s}') print(f' {"-"*6} {"-"*8} {"-"*8} {"-"*8} {"-"*8} {"-"*10}') last_count = 0 for sec in range(1, duration + 1): time.sleep(1) ct = int(r.get('bench:poll_count') or 0) ca = int(r.get('bench:active') or 0) cp = int(r.get('bench:peak') or 0) tp = r.scard('bench:tasks_seen') qps = ct - last_count last_count = ct print(f' {sec:>5d}s {ct:>8d} {ca:>8d} {cp:>8d} {qps:>8d} {tp:>9d}/{num_tasks}') # 结果 ft = int(r.get('bench:poll_count') or 0) fp = int(r.get('bench:peak') or 0) tp = r.scard('bench:tasks_seen') print(f'\n{"="*60}') print(f' 测试结果') print(f'{"="*60}') print(f' 总查询次数: {ft}') print(f' 平均 QPS: {ft / duration:.1f}') print(f' 峰值并发查询: {fp}') print(f' 任务覆盖率: {tp}/{num_tasks} ({tp*100//num_tasks}%)') print(f'{"="*60}\n') if tp == num_tasks: print(f' PASS: 所有 {num_tasks} 个任务都被成功轮询') else: print(f' WARNING: 只有 {tp}/{num_tasks} 个任务被轮询到') # 清理(只清 Redis 计数器,DB 记录保留给 worker 查询) # 测试结束后手动清理: # python -c "import os,django;os.environ['DJANGO_SETTINGS_MODULE']='config.settings';os.environ['USE_MYSQL']='true';os.environ['DB_HOST']='mysql-8351f937d637-public.rds.volces.com';os.environ['DB_NAME']='video_auto';os.environ['DB_USER']='zyc';os.environ['DB_PASSWORD']='Zyc188208';django.setup();from apps.generation.models import GenerationRecord;print(GenerationRecord.objects.filter(prompt__startswith='压测任务').delete())" for key in ['bench:poll_count', 'bench:active', 'bench:peak', 'bench:tasks_seen']: r.delete(key) print(f' 已清理 Redis 计数器(DB 记录保留给 worker)') if __name__ == '__main__': parser = argparse.ArgumentParser(description='Celery 轮询并发压测') sub = parser.add_subparsers(dest='cmd') p_worker = sub.add_parser('worker', help='启动 mock worker') p_worker.add_argument('--concurrency', type=int, default=200) p_bench = sub.add_parser('bench', help='派发任务 + 监控') p_bench.add_argument('--tasks', type=int, default=100) p_bench.add_argument('--duration', type=int, default=30) args = parser.parse_args() if args.cmd == 'worker': cmd_worker(args) elif args.cmd == 'bench': cmd_bench(args) else: parser.print_help()