diff --git a/backend/apps/generation/tasks.py b/backend/apps/generation/tasks.py index 9e7384c..522001e 100644 --- a/backend/apps/generation/tasks.py +++ b/backend/apps/generation/tasks.py @@ -21,19 +21,29 @@ def poll_video_task(self, record_id): from apps.generation.models import GenerationRecord from utils.airdrama_client import query_task, map_status + # 防重复:同一 record 同一时刻只允许一个 poll 在执行 + from django.core.cache import cache + lock_key = f'poll_lock:{record_id}' + if not cache.add(lock_key, '1', timeout=POLL_INTERVAL * 3): + logger.info('poll_video_task: record %s already being polled, skipping', record_id) + return + try: record = GenerationRecord.objects.get(pk=record_id) except GenerationRecord.DoesNotExist: logger.warning('poll_video_task: record %s not found', record_id) + cache.delete(lock_key) return ark_task_id = record.ark_task_id if not ark_task_id: logger.warning('poll_video_task: record %s has no ark_task_id', record_id) + cache.delete(lock_key) return if record.status not in ('queued', 'processing'): logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status) + cache.delete(lock_key) return # Poll Volcano API @@ -42,12 +52,14 @@ def poll_video_task(self, record_id): new_status = map_status(ark_resp.get('status', '')) except Exception: logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id) + cache.delete(lock_key) raise self.retry(countdown=POLL_INTERVAL) if new_status in ('queued', 'processing'): # Still running — update status, then re-enqueue record.status = new_status record.save(update_fields=['status', 'updated_at']) + cache.delete(lock_key) raise self.retry(countdown=POLL_INTERVAL) # Terminal state reached — process result