""" 临时替换 airdrama_client,让 query_task 始终返回 running。 worker 启动时会 import 这个 mock 版本。 """ import os import time import redis # 用 Redis 做跨进程计数器 _redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379/1') _r = redis.from_url(_redis_url) COUNTER_KEY = 'bench:poll_count' ACTIVE_KEY = 'bench:active' PEAK_KEY = 'bench:peak' TASKS_KEY = 'bench:tasks_seen' def query_task(task_id): """始终返回 running,通过 Redis 统计并发""" pipe = _r.pipeline() pipe.incr(COUNTER_KEY) pipe.incr(ACTIVE_KEY) pipe.sadd(TASKS_KEY, task_id) pipe.execute() # 检查并更新峰值 active = int(_r.get(ACTIVE_KEY) or 0) peak = int(_r.get(PEAK_KEY) or 0) if active > peak: _r.set(PEAK_KEY, active) time.sleep(0.2) # 模拟 200ms 网络延迟 _r.decr(ACTIVE_KEY) return {'status': 'running'} def map_status(ark_status): mapping = { 'running': 'processing', 'submitted': 'queued', 'queued': 'queued', 'succeeded': 'completed', 'failed': 'failed', } return mapping.get(ark_status, 'processing') def extract_video_url(resp): return None class AirDramaAPIError(Exception): def __init__(self, code, message, status_code=400): self.code = code self.api_message = message self.user_message = message super().__init__(f'{code}: {message}') ERROR_MESSAGES = {} def create_task(**kwargs): """mock create_task""" return {'id': 'mock-task-id'} def download_video(url): return b''