video-shuoshan/backend/tests/test_poll_concurrency.py
zyc 1a2bd982af
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 19s
add test
2026-04-04 12:46:26 +08:00

184 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery poll_video_task 并发压测(两步执行)
步骤 1启动 workermock 火山 API
步骤 2派发任务 + 监控
用法:
cd backend && source venv/bin/activate
# 终端 1启动 mock worker
python tests/test_poll_concurrency.py worker
# 终端 2派发 + 监控
python tests/test_poll_concurrency.py bench --tasks 100 --duration 30
"""
import argparse
import os
import sys
import time
# 公共环境变量
REDIS_URL = os.environ.get('REDIS_URL',
'redis://zyc:Zyc188208@redis-shzlsczo52dft8mia.redis.volces.com:6379/1')
os.environ['REDIS_URL'] = REDIS_URL
os.environ['USE_MYSQL'] = 'true'
os.environ.setdefault('DB_HOST', 'mysql-8351f937d637-public.rds.volces.com')
os.environ.setdefault('DB_NAME', 'video_auto')
os.environ.setdefault('DB_USER', 'zyc')
os.environ.setdefault('DB_PASSWORD', 'Zyc188208')
os.environ.setdefault('DB_PORT', '3306')
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
def cmd_worker(args):
"""启动 worker用 mock 替换真实 airdrama_client"""
# gevent monkey-patch 必须在所有 import 之前
from gevent import monkey
monkey.patch_all()
# 用 mock 模块替换真实 airdrama_client
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
import mock_airdrama
sys.modules['utils.airdrama_client'] = mock_airdrama
import django
django.setup()
print(f'[worker] 启动中... (mock 火山 API, concurrency={args.concurrency})')
print(f'[worker] Redis: {REDIS_URL}')
from config.celery import app
app.Worker(
pool='gevent',
concurrency=args.concurrency,
loglevel='INFO',
without_heartbeat=True,
without_mingle=True,
without_gossip=True,
).start()
def cmd_bench(args):
"""派发任务 + 监控"""
import django
django.setup()
import redis as redis_lib
r = redis_lib.from_url(REDIS_URL)
from apps.accounts.models import User, Team
from apps.generation.models import GenerationRecord
from apps.generation.tasks import poll_video_task
num_tasks = args.tasks
duration = args.duration
print(f'\n{"="*60}')
print(f' Celery gevent 轮询并发压测')
print(f' 任务数: {num_tasks}')
print(f' 观察时长: {duration}')
print(f' Redis: {REDIS_URL}')
print(f'{"="*60}\n')
# 清空计数器
for key in ['bench:poll_count', 'bench:active', 'bench:peak', 'bench:tasks_seen']:
r.delete(key)
# 准备测试数据
team, _ = Team.objects.get_or_create(name='压测团队', defaults={'total_seconds_pool': 999999})
user, _ = User.objects.get_or_create(username='bench_user', defaults={
'email': 'bench@test.com', 'team': team,
})
GenerationRecord.objects.filter(prompt__startswith='压测任务').delete()
records = []
for i in range(num_tasks):
record = GenerationRecord.objects.create(
user=user,
prompt=f'压测任务 {i}',
mode='universal',
model='seedance_2.0',
aspect_ratio='16:9',
duration=5,
status='processing',
ark_task_id=f'bench-{i:04d}',
)
records.append(record)
print(f'[准备] 已创建 {num_tasks} 个测试记录')
# 清空队列
r.delete('celery')
print(f'[准备] 已清空 Redis 队列\n')
# 派发
print(f'[派发] 正在派发 {num_tasks} 个轮询任务...')
t0 = time.time()
for record in records:
poll_video_task.delay(record.id)
print(f'[派发] 完成,耗时 {time.time()-t0:.1f}\n')
# 监控
print(f'[监控] 开始观察 {duration} 秒...\n')
print(f' {"时间":>6s} {"总查询":>8s} {"当前并发":>8s} {"峰值并发":>8s} {"QPS":>8s} {"任务覆盖":>10s}')
print(f' {"-"*6} {"-"*8} {"-"*8} {"-"*8} {"-"*8} {"-"*10}')
last_count = 0
for sec in range(1, duration + 1):
time.sleep(1)
ct = int(r.get('bench:poll_count') or 0)
ca = int(r.get('bench:active') or 0)
cp = int(r.get('bench:peak') or 0)
tp = r.scard('bench:tasks_seen')
qps = ct - last_count
last_count = ct
print(f' {sec:>5d}s {ct:>8d} {ca:>8d} {cp:>8d} {qps:>8d} {tp:>9d}/{num_tasks}')
# 结果
ft = int(r.get('bench:poll_count') or 0)
fp = int(r.get('bench:peak') or 0)
tp = r.scard('bench:tasks_seen')
print(f'\n{"="*60}')
print(f' 测试结果')
print(f'{"="*60}')
print(f' 总查询次数: {ft}')
print(f' 平均 QPS: {ft / duration:.1f}')
print(f' 峰值并发查询: {fp}')
print(f' 任务覆盖率: {tp}/{num_tasks} ({tp*100//num_tasks}%)')
print(f'{"="*60}\n')
if tp == num_tasks:
print(f' PASS: 所有 {num_tasks} 个任务都被成功轮询')
else:
print(f' WARNING: 只有 {tp}/{num_tasks} 个任务被轮询到')
# 清理(只清 Redis 计数器DB 记录保留给 worker 查询)
# 测试结束后手动清理:
# python -c "import os,django;os.environ['DJANGO_SETTINGS_MODULE']='config.settings';os.environ['USE_MYSQL']='true';os.environ['DB_HOST']='mysql-8351f937d637-public.rds.volces.com';os.environ['DB_NAME']='video_auto';os.environ['DB_USER']='zyc';os.environ['DB_PASSWORD']='Zyc188208';django.setup();from apps.generation.models import GenerationRecord;print(GenerationRecord.objects.filter(prompt__startswith='压测任务').delete())"
for key in ['bench:poll_count', 'bench:active', 'bench:peak', 'bench:tasks_seen']:
r.delete(key)
print(f' 已清理 Redis 计数器DB 记录保留给 worker')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Celery 轮询并发压测')
sub = parser.add_subparsers(dest='cmd')
p_worker = sub.add_parser('worker', help='启动 mock worker')
p_worker.add_argument('--concurrency', type=int, default=200)
p_bench = sub.add_parser('bench', help='派发任务 + 监控')
p_bench.add_argument('--tasks', type=int, default=100)
p_bench.add_argument('--duration', type=int, default=30)
args = parser.parse_args()
if args.cmd == 'worker':
cmd_worker(args)
elif args.cmd == 'bench':
cmd_bench(args)
else:
parser.print_help()