72 lines
1.6 KiB
Python
72 lines
1.6 KiB
Python
"""
Temporary replacement for airdrama_client that makes query_task always return running.
The worker imports this mock version when it starts.
"""
|
||
import os
|
||
import time
|
||
import redis
|
||
|
||
# Redis key names used for the benchmark bookkeeping.
COUNTER_KEY = 'bench:poll_count'  # incremented once per query_task call
ACTIVE_KEY = 'bench:active'       # incremented on entry to query_task, decremented on exit
PEAK_KEY = 'bench:peak'           # highest ACTIVE_KEY value observed so far
TASKS_KEY = 'bench:tasks_seen'    # set of task ids passed to query_task

# Counters live in Redis so that they are shared across processes.
_redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/1')
_r = redis.from_url(_redis_url)
|
||
|
||
|
||
# Server-side compare-and-set for the peak counter: the stored value is
# replaced by ARGV[1] only when ARGV[1] is larger. Doing this inside one
# script keeps concurrent callers from overwriting a higher peak with a
# lower one, which a separate GET followed by SET allows.
_UPDATE_PEAK_LUA = """
local peak = tonumber(redis.call('GET', KEYS[1]) or '0')
if tonumber(ARGV[1]) > peak then
    redis.call('SET', KEYS[1], ARGV[1])
end
return 0
"""


def query_task(task_id):
    """Mock poll that always reports ``running`` and records stats in Redis.

    Each call increments the total poll counter and the in-flight counter,
    adds ``task_id`` to the set of seen tasks, raises the stored peak when
    the in-flight count exceeds it, sleeps 200 ms to imitate network
    latency, and finally decrements the in-flight counter.

    Args:
        task_id: Identifier of the task being polled; stored in the
            ``TASKS_KEY`` set.

    Returns:
        The dict ``{'status': 'running'}``.
    """
    pipe = _r.pipeline()
    pipe.incr(COUNTER_KEY)
    pipe.incr(ACTIVE_KEY)
    pipe.sadd(TASKS_KEY, task_id)
    # INCR replies with the post-increment value, so the second reply is the
    # in-flight count as of this call; no extra GET round trip is needed.
    _, active, _ = pipe.execute()

    try:
        # Update the peak atomically on the server.
        _r.eval(_UPDATE_PEAK_LUA, 1, PEAK_KEY, active)

        time.sleep(0.2)  # simulate 200 ms of network latency
    finally:
        # Always release the in-flight slot, even if the peak update or the
        # sleep is interrupted; otherwise the active counter drifts upward.
        _r.decr(ACTIVE_KEY)

    return {'status': 'running'}
|
||
|
||
|
||
def map_status(ark_status):
    """Translate an incoming status string into the local status name.

    Args:
        ark_status: Status value to translate.

    Returns:
        The mapped status string, or ``'processing'`` when ``ark_status``
        is not one of the known values.
    """
    known = dict(
        running='processing',
        submitted='queued',
        queued='queued',
        succeeded='completed',
        failed='failed',
    )
    try:
        return known[ark_status]
    except KeyError:
        # Anything unrecognized is reported as still in progress.
        return 'processing'
|
||
|
||
|
||
def extract_video_url(resp):
    """Mock extractor: ignores ``resp`` and always returns ``None``."""
    return None
|
||
|
||
|
||
class AirDramaAPIError(Exception):
    """Exception carrying an error code, a message and a status code.

    Attributes:
        code: The error code passed to the constructor.
        api_message: The message passed to the constructor.
        user_message: Same value as ``api_message``.
        status_code: The status code passed to the constructor (default 400).
    """

    def __init__(self, code, message, status_code=400):
        """Store the error details and build the ``'<code>: <message>'`` text.

        Args:
            code: Error code.
            message: Error message; used for both ``api_message`` and
                ``user_message``.
            status_code: Status code associated with the error.
        """
        self.code = code
        self.api_message = message
        self.user_message = message
        # Previously accepted but dropped; keep it so handlers can read it.
        self.status_code = status_code
        super().__init__(f'{code}: {message}')
|
||
|
||
|
||
# Empty placeholder mapping. Presumably mirrors a name exported by the real
# airdrama_client so that imports keep working — confirm against that module.
ERROR_MESSAGES = {}
|
||
|
||
|
||
def create_task(**kwargs):
    """Mock create_task: ignores all keyword arguments.

    Returns:
        A new dict ``{'id': 'mock-task-id'}`` on every call.
    """
    return dict(id='mock-task-id')
|
||
|
||
|
||
def download_video(url):
    """Mock download: ignores ``url`` and returns an empty bytes object."""
    return bytes()
|