zyc 6853b08fc9
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m16s
refactor: 切换 Celery broker 为火山引擎 Redis + 僵尸任务自动恢复
- Redis 从阿里云切换到火山引擎(同区域低延迟)
- delay() 失败改为 warning 日志 + 重试一次(不再静默吞异常)
- 新增 recover_stuck_tasks 定时任务,每10分钟扫描卡住的任务重新派发
- 轮询时每次 touch updated_at,防止活跃任务被误判为僵尸
- Celery worker 启用内嵌 Beat 调度器(-B 参数)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 10:26:04 +08:00

182 lines
6.4 KiB
Python

"""Celery tasks for async video generation polling."""
import logging
import time
from celery import shared_task
logger = logging.getLogger(__name__)
# Progressive polling intervals:
#   first 2 minutes:  every 5 seconds  (24 polls)
#   2-10 minutes:     every 15 seconds (32 polls)
#   after 10 minutes: every 30 seconds (no upper bound)
# Each entry is (threshold_seconds, interval_seconds); a threshold of None
# marks the open-ended final tier. poll_video_task compares the thresholds
# against its accumulated sleep time, not against wall-clock time.
POLL_SCHEDULE = [
(120, 5), # 0-120s: every 5s
(600, 15), # 120-600s: every 15s
(None, 30), # 600s+: every 30s
]
@shared_task(bind=True, max_retries=0, ignore_result=True)
def poll_video_task(self, record_id):
    """Poll the Volcano API for a video generation task until it is terminal.

    This is the server-side counterpart to the frontend polling. It runs
    independently of the browser: even if the user closes the page, this
    task keeps polling until the mapped status is no longer ``queued`` or
    ``processing``.

    Each iteration sleeps for the interval selected from ``POLL_SCHEDULE``,
    re-reads the record (the frontend may have resolved it in the meantime),
    queries the API, and either touches the record (still running, or the
    query failed) or finalises it (terminal status).

    Args:
        record_id: Primary key of the ``GenerationRecord`` to poll.

    Returns:
        None. The task exits early when the record is missing, has no
        ``ark_task_id``, or is already outside the active states.
    """
    from django.utils import timezone
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status

    active_states = ('queued', 'processing')

    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
        return

    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
        return
    if record.status not in active_states:
        logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
        return

    # Accumulated sleep time in seconds; time spent in API/DB calls is not counted.
    elapsed = 0
    logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)

    while True:
        # Pick the first tier whose threshold has not been passed yet; the
        # open-ended (None) tier matches everything, the default is a safety net.
        interval = next(
            (
                iv
                for threshold, iv in POLL_SCHEDULE
                if threshold is None or elapsed < threshold
            ),
            POLL_SCHEDULE[-1][1],
        )
        time.sleep(interval)
        elapsed += interval

        # Re-fetch the record to check whether the frontend already updated it.
        try:
            record.refresh_from_db()
        except GenerationRecord.DoesNotExist:
            logger.info('poll_video_task: record %s deleted during polling', record_id)
            return
        if record.status not in active_states:
            logger.info('poll_video_task: record %s resolved by frontend: %s', record_id, record.status)
            return

        # Poll the Volcano API.
        try:
            ark_resp = query_task(ark_task_id)
            new_status = map_status(ark_resp.get('status', ''))
        except Exception:
            logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
            # Touch updated_at on failed polls too (same mechanism as the
            # still-running branch below): this worker is alive, so
            # recover_stuck_tasks must not see the record as abandoned and
            # dispatch a duplicate poller during an API outage.
            record.save(update_fields=['updated_at'])
            continue  # retry on next interval

        if new_status in active_states:
            # Still running: store the latest status and touch updated_at.
            record.status = new_status
            record.save(update_fields=['status', 'updated_at'])
            continue

        # Terminal state reached: process the result.
        # NOTE(review): the status check above and the settlement below are
        # not atomic; if the frontend path can settle the same record,
        # confirm that _settle_payment/_release_freeze tolerate being called twice.
        record.status = new_status

        returned_seed = ark_resp.get('seed')
        if returned_seed is not None:
            record.seed = returned_seed

        if new_status == 'completed':
            _handle_completed(record, ark_resp)
        elif new_status == 'failed':
            _handle_failed(record, ark_resp)

        record.completed_at = timezone.now()
        record.save(update_fields=[
            'status', 'result_url', 'error_message', 'raw_error',
            'seed', 'completed_at', 'updated_at',
        ])
        logger.info(
            'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
            record_id, ark_task_id, new_status, elapsed,
        )
        return
def _handle_completed(record, ark_resp):
    """Process a completed task: persist the video to TOS and settle payment.

    Sets ``record.result_url`` in memory; saving the record is left to the
    caller. Billing is then delegated to ``_settle_payment`` when the
    response reports a positive token count, otherwise to ``_release_freeze``.

    Args:
        record: The generation record being finalised.
        ark_resp: Parsed API response for the task (dict-like).
    """
    from utils.airdrama_client import extract_video_url

    video_url = extract_video_url(ark_resp)
    if video_url:
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # Best effort: if the copy to TOS fails, keep the upstream URL
            # so the result is not lost.
            logger.exception('poll_video_task: failed to persist video to TOS')
            record.result_url = video_url
    else:
        # Make the anomaly visible; result_url is left as it was.
        logger.warning(
            'poll_video_task: record %s completed without a video url', record.id,
        )

    # Settlement: charge according to the tokens actually used.
    usage = ark_resp.get('usage')
    # `or 0` also covers an explicit null total_tokens, which would otherwise
    # raise TypeError in the comparison below.
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)
@shared_task(ignore_result=True)
def recover_stuck_tasks():
    """Re-dispatch polling for records stuck in queued/processing.

    Selects records that are still ``queued`` or ``processing``, have a
    non-empty ``ark_task_id``, and whose ``updated_at`` is more than
    10 minutes old, then enqueues ``poll_video_task`` for each of them.
    A dispatch failure for one record is logged with its traceback and
    does not stop the scan.
    """
    from datetime import timedelta
    from django.utils import timezone
    from apps.generation.models import GenerationRecord

    cutoff = timezone.now() - timedelta(minutes=10)
    stuck_records = GenerationRecord.objects.filter(
        status__in=('queued', 'processing'),
        ark_task_id__isnull=False,
        # No update for over 10 minutes means no worker is polling this record.
        updated_at__lt=cutoff,
    ).exclude(ark_task_id='')

    count = 0
    for record in stuck_records:
        logger.warning('recover_stuck_tasks: re-dispatching record=%s ark=%s', record.id, record.ark_task_id)
        try:
            poll_video_task.delay(record.id)
        except Exception:
            # logger.exception keeps the traceback so broker problems can be diagnosed.
            logger.exception('recover_stuck_tasks: failed to dispatch record=%s', record.id)
            continue
        count += 1
        # Touch updated_at so that a backlog longer than the cutoff does not
        # make the next scan enqueue a second poller for the same record.
        GenerationRecord.objects.filter(pk=record.pk).update(updated_at=timezone.now())

    if count:
        logger.info('recover_stuck_tasks: re-dispatched %d stuck tasks', count)
def _handle_failed(record, ark_resp):
    """Process a failed task: record the error and release the frozen amount.

    Sets ``record.error_message`` (the ``ERROR_MESSAGES`` entry for the error
    code, falling back to the raw message) and ``record.raw_error`` in
    memory; saving the record is left to the caller. Billing is then
    delegated to ``_settle_payment`` when the response reports a positive
    token count, otherwise to ``_release_freeze``.

    Args:
        record: The generation record being finalised.
        ark_resp: Parsed API response for the task (dict-like).
    """
    from utils.airdrama_client import ERROR_MESSAGES

    error = ark_resp.get('error')
    if isinstance(error, dict):
        # `or ''` normalises explicit nulls so None never reaches the record.
        code = error.get('code') or ''
        raw_msg = error.get('message') or ''
    else:
        code = ''
        # A missing/null error must not be stored as the literal text 'None'.
        raw_msg = str(error) if error else ''

    record.error_message = ERROR_MESSAGES.get(code, raw_msg)
    record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    usage = ark_resp.get('usage')
    # `or 0` also covers an explicit null total_tokens, which would otherwise
    # raise TypeError in the comparison below.
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)