From ded5c4c44f9542ae4e5dabbc93e5431caa0e61b4 Mon Sep 17 00:00:00 2001 From: zyc <1439655764@qq.com> Date: Sat, 4 Apr 2026 20:13:23 +0800 Subject: [PATCH] fix bug --- backend/apps/generation/tasks.py | 62 +++++++++++--------------------- backend/config/settings.py | 2 +- 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/backend/apps/generation/tasks.py b/backend/apps/generation/tasks.py index 50d143e..8f98f1f 100644 --- a/backend/apps/generation/tasks.py +++ b/backend/apps/generation/tasks.py @@ -6,44 +6,30 @@ from celery import shared_task logger = logging.getLogger(__name__) -# 轮询间隔(秒):每次查完后重新入队,不占 worker 进程 -POLL_INTERVAL = 5 +@shared_task(ignore_result=True) +def poll_video_task(record_id): + """Poll Volcano API once for a video generation task. -@shared_task(bind=True, max_retries=None, ignore_result=True) -def poll_video_task(self, record_id): - """Poll Volcano API for a video generation task. - - 每次只执行一轮查询,查完通过 self.retry 重新入队。 - 这样 worker 不会被 sleep 占死,重启也不丢任务。 + 一次性任务:查一次 API,更新 DB,结束。 + 由 recover_stuck_tasks(beat 每30秒调度)统一驱动,不再自己 retry。 """ from django.utils import timezone from apps.generation.models import GenerationRecord from utils.airdrama_client import query_task, map_status - # 防重复:同一 record 同一时刻只允许一个 poll 在执行 - from django.core.cache import cache - lock_key = f'poll_lock:{record_id}' - if not cache.add(lock_key, '1', timeout=POLL_INTERVAL * 3): - logger.info('poll_video_task: record %s already being polled, skipping', record_id) - return - try: record = GenerationRecord.objects.get(pk=record_id) except GenerationRecord.DoesNotExist: logger.warning('poll_video_task: record %s not found', record_id) - cache.delete(lock_key) + return + + if record.status not in ('queued', 'processing'): return ark_task_id = record.ark_task_id if not ark_task_id: logger.warning('poll_video_task: record %s has no ark_task_id', record_id) - cache.delete(lock_key) - return - - if record.status not in ('queued', 'processing'): - logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status) - cache.delete(lock_key) return # Poll Volcano API @@ -51,16 +37,13 @@ def poll_video_task(self, record_id): ark_resp = query_task(ark_task_id) new_status = map_status(ark_resp.get('status', '')) except Exception: - logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id) - cache.delete(lock_key) - raise self.retry(countdown=POLL_INTERVAL) + logger.exception('poll_video_task: API query failed for record=%s ark=%s', record_id, ark_task_id) + return if new_status in ('queued', 'processing'): - # Still running — update status, then re-enqueue record.status = new_status record.save(update_fields=['status', 'updated_at']) - cache.delete(lock_key) - raise self.retry(countdown=POLL_INTERVAL) + return # Terminal state reached — process result record.status = new_status @@ -80,7 +63,6 @@ def poll_video_task(self, record_id): 'seed', 'completed_at', ]) - cache.delete(lock_key) logger.info( 'poll_video_task: record=%s ark=%s final_status=%s', record_id, ark_task_id, new_status, @@ -131,29 +113,27 @@ def _handle_completed(record, ark_resp): @shared_task(ignore_result=True) def recover_stuck_tasks(): - """定时扫描卡在 processing/queued 超过 3 分钟的任务,重新派发轮询。""" - from datetime import timedelta - from django.utils import timezone + """每30秒扫一次所有进行中的任务,统一派发轮询。 + + poll_video_task 是一次性任务,不再自己 retry,由这里统一驱动。 + """ from apps.generation.models import GenerationRecord - cutoff = timezone.now() - timedelta(minutes=3) - stuck_records = GenerationRecord.objects.filter( + active_records = GenerationRecord.objects.filter( status__in=('queued', 'processing'), ark_task_id__isnull=False, - updated_at__lt=cutoff, - ).exclude(ark_task_id='') + ).exclude(ark_task_id='').values_list('id', flat=True) count = 0 - for record in stuck_records: - logger.warning('recover_stuck_tasks: re-dispatching record=%s ark=%s', record.id, record.ark_task_id) + for record_id in active_records: try: - poll_video_task.delay(record.id) + poll_video_task.delay(record_id) count += 1 except Exception: - logger.error('recover_stuck_tasks: failed to dispatch record=%s', record.id) + logger.error('recover_stuck_tasks: failed to dispatch record=%s', record_id) if count: - logger.info('recover_stuck_tasks: re-dispatched %d stuck tasks', count) + logger.info('recover_stuck_tasks: dispatched %d active tasks', count) def _handle_failed(record, ark_resp): diff --git a/backend/config/settings.py b/backend/config/settings.py index b9a1127..d627a94 100644 --- a/backend/config/settings.py +++ b/backend/config/settings.py @@ -182,7 +182,7 @@ CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_BEAT_SCHEDULE = { 'recover-stuck-tasks': { 'task': 'apps.generation.tasks.recover_stuck_tasks', - 'schedule': 180, # 每 3 分钟 + 'schedule': 30, # 每 30 秒 }, }