From ca6f2a0346191a28722677e1a1f7296edc025415 Mon Sep 17 00:00:00 2001
From: zyc <1439655764@qq.com>
Date: Sat, 4 Apr 2026 14:21:38 +0800
Subject: [PATCH] =?UTF-8?q?fix:=20=E6=B7=BB=E5=8A=A0=20Redis=20=E5=88=86?=
 =?UTF-8?q?=E5=B8=83=E5=BC=8F=E9=94=81=E9=98=B2=E6=AD=A2=20poll=5Fvideo=5F?=
 =?UTF-8?q?task=20=E9=87=8D=E5=A4=8D=E6=B4=BE=E5=8F=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

recover_stuck_tasks 在 API 超时 >3 分钟时可能重复派发同一任务,
导致重复扣费风险。通过 cache.add 实现互斥锁保护。

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 backend/apps/generation/tasks.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/backend/apps/generation/tasks.py b/backend/apps/generation/tasks.py
index 9e7384c..522001e 100644
--- a/backend/apps/generation/tasks.py
+++ b/backend/apps/generation/tasks.py
@@ -21,19 +21,29 @@ def poll_video_task(self, record_id):
     from apps.generation.models import GenerationRecord
     from utils.airdrama_client import query_task, map_status
 
+    # 防重复:同一 record 同一时刻只允许一个 poll 在执行
+    from django.core.cache import cache
+    lock_key = f'poll_lock:{record_id}'
+    if not cache.add(lock_key, '1', timeout=POLL_INTERVAL * 3):
+        logger.info('poll_video_task: record %s already being polled, skipping', record_id)
+        return
+
     try:
         record = GenerationRecord.objects.get(pk=record_id)
     except GenerationRecord.DoesNotExist:
         logger.warning('poll_video_task: record %s not found', record_id)
+        cache.delete(lock_key)
         return
 
     ark_task_id = record.ark_task_id
     if not ark_task_id:
         logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
+        cache.delete(lock_key)
         return
 
     if record.status not in ('queued', 'processing'):
         logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
+        cache.delete(lock_key)
         return
 
     # Poll Volcano API
@@ -42,12 +42,14 @@ def poll_video_task(self, record_id):
         new_status = map_status(ark_resp.get('status', ''))
     except Exception:
         logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
+        cache.delete(lock_key)
         raise self.retry(countdown=POLL_INTERVAL)
 
     if new_status in ('queued', 'processing'):
         # Still running — update status, then re-enqueue
         record.status = new_status
         record.save(update_fields=['status', 'updated_at'])
+        cache.delete(lock_key)
         raise self.retry(countdown=POLL_INTERVAL)
 
     # Terminal state reached — process result