fix: 添加 Redis 分布式锁防止 poll_video_task 重复派发
Some checks failed
Build and Deploy / build-and-deploy (push) Failing after 6m17s

recover_stuck_tasks 在 API 超时 >3 分钟时可能重复派发同一任务,
导致重复扣费风险。通过 cache.add 实现互斥锁保护。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
zyc 2026-04-04 14:21:38 +08:00
parent 55c26fb1f5
commit ca6f2a0346

View File

@@ -21,19 +21,29 @@ def poll_video_task(self, record_id):
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status
# 防重复:同一 record 同一时刻只允许一个 poll 在执行
from django.core.cache import cache
lock_key = f'poll_lock:{record_id}'
if not cache.add(lock_key, '1', timeout=POLL_INTERVAL * 3):
logger.info('poll_video_task: record %s already being polled, skipping', record_id)
return
    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
cache.delete(lock_key)
        return
    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
cache.delete(lock_key)
        return
    if record.status not in ('queued', 'processing'):
        logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
cache.delete(lock_key)
        return
    # Poll Volcano API
@@ -42,12 +52,14 @@ def poll_video_task(self, record_id):
        new_status = map_status(ark_resp.get('status', ''))
    except Exception:
        logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
cache.delete(lock_key)
        raise self.retry(countdown=POLL_INTERVAL)
    if new_status in ('queued', 'processing'):
        # Still running — update status, then re-enqueue
        record.status = new_status
        record.save(update_fields=['status', 'updated_at'])
cache.delete(lock_key)
        raise self.retry(countdown=POLL_INTERVAL)
    # Terminal state reached — process result