Compare commits

..

No commits in common. "127ed9659d7fa4dc85a16a77ce90e34fa0ca22a2" and "ba33c35dd849473a653e57449d33dfbe9952fb5e" have entirely different histories.

2 changed files with 42 additions and 22 deletions

View File

@ -6,30 +6,44 @@ from celery import shared_task
logger = logging.getLogger(__name__)
# 轮询间隔(秒):每次查完后重新入队,不占 worker 进程
POLL_INTERVAL = 5
@shared_task(ignore_result=True)
def poll_video_task(record_id):
"""Poll Volcano API once for a video generation task.
一次性任务：查一次 API，更新 DB，结束。
recover_stuck_tasks（beat 每30秒调度）统一驱动，不再自己 retry。
@shared_task(bind=True, max_retries=None, ignore_result=True)
def poll_video_task(self, record_id):
"""Poll Volcano API for a video generation task.
每次只执行一轮查询，查完通过 self.retry 重新入队。
这样 worker 不会被 sleep 占死，重启也不丢任务。
"""
from django.utils import timezone
from apps.generation.models import GenerationRecord
from utils.airdrama_client import query_task, map_status
# 防重复:同一 record 同一时刻只允许一个 poll 在执行
from django.core.cache import cache
lock_key = f'poll_lock:{record_id}'
if not cache.add(lock_key, '1', timeout=POLL_INTERVAL * 3):
logger.info('poll_video_task: record %s already being polled, skipping', record_id)
return
try:
record = GenerationRecord.objects.get(pk=record_id)
except GenerationRecord.DoesNotExist:
logger.warning('poll_video_task: record %s not found', record_id)
return
if record.status not in ('queued', 'processing'):
cache.delete(lock_key)
return
ark_task_id = record.ark_task_id
if not ark_task_id:
logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
cache.delete(lock_key)
return
if record.status not in ('queued', 'processing'):
logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
cache.delete(lock_key)
return
# Poll Volcano API
@ -37,13 +51,16 @@ def poll_video_task(record_id):
ark_resp = query_task(ark_task_id)
new_status = map_status(ark_resp.get('status', ''))
except Exception:
logger.exception('poll_video_task: API query failed for record=%s ark=%s', record_id, ark_task_id)
return
logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
cache.delete(lock_key)
raise self.retry(countdown=POLL_INTERVAL)
if new_status in ('queued', 'processing'):
# Still running — update status, then re-enqueue
record.status = new_status
record.save(update_fields=['status', 'updated_at'])
return
cache.delete(lock_key)
raise self.retry(countdown=POLL_INTERVAL)
# Terminal state reached — process result
record.status = new_status
@ -63,6 +80,7 @@ def poll_video_task(record_id):
'seed', 'completed_at',
])
cache.delete(lock_key)
logger.info(
'poll_video_task: record=%s ark=%s final_status=%s',
record_id, ark_task_id, new_status,
@ -113,27 +131,29 @@ def _handle_completed(record, ark_resp):
@shared_task(ignore_result=True)
def recover_stuck_tasks():
"""每30秒扫一次所有进行中的任务统一派发轮询。
poll_video_task 是一次性任务，不再自己 retry，由这里统一驱动。
"""
"""定时扫描卡在 processing/queued 超过 3 分钟的任务,重新派发轮询。"""
from datetime import timedelta
from django.utils import timezone
from apps.generation.models import GenerationRecord
active_records = GenerationRecord.objects.filter(
cutoff = timezone.now() - timedelta(minutes=3)
stuck_records = GenerationRecord.objects.filter(
status__in=('queued', 'processing'),
ark_task_id__isnull=False,
).exclude(ark_task_id='').values_list('id', flat=True)
updated_at__lt=cutoff,
).exclude(ark_task_id='')
count = 0
for record_id in active_records:
for record in stuck_records:
logger.warning('recover_stuck_tasks: re-dispatching record=%s ark=%s', record.id, record.ark_task_id)
try:
poll_video_task.delay(record_id)
poll_video_task.delay(record.id)
count += 1
except Exception:
logger.error('recover_stuck_tasks: failed to dispatch record=%s', record_id)
logger.error('recover_stuck_tasks: failed to dispatch record=%s', record.id)
if count:
logger.info('recover_stuck_tasks: dispatched %d active tasks', count)
logger.info('recover_stuck_tasks: re-dispatched %d stuck tasks', count)
def _handle_failed(record, ark_resp):

View File

@ -182,7 +182,7 @@ CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_BEAT_SCHEDULE = {
'recover-stuck-tasks': {
'task': 'apps.generation.tasks.recover_stuck_tasks',
'schedule': 30, # 每 30 秒
'schedule': 180, # 每 3 分钟
},
}