seaislee1209 7a358ea9ef
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m25s
fix: v0.14.3 GenerationRecord 加 updated_at + 轮询改固定5秒
- GenerationRecord 加 updated_at 字段(之前只在 QuotaConfig 上,Celery 查 GenerationRecord 报 FieldError)
- 后端轮询间隔从渐进式(5s→15s→30s)改为全程固定 5 秒(RPM 12000 足够,400 并发仅用 40%)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 18:33:29 +08:00

168 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Celery tasks for async video generation polling."""
import logging
import time
from celery import shared_task
logger = logging.getLogger(name=__name__)

# Fixed polling interval in seconds, used for the whole lifetime of a task
# (no progressive back-off). Rationale from the original note: an RPM quota
# of 12000 is sufficient — 400 concurrent tasks use only about 40% of it.
POLL_INTERVAL = 5
# Statuses in which a record is still waiting on the provider.
_ACTIVE_STATUSES = ('queued', 'processing')


def _update_if_active(record_id, **fields):
    """Write *fields* plus a fresh ``updated_at`` only while the record is still active.

    The UPDATE is filtered on ``status IN _ACTIVE_STATUSES``. It therefore never
    overwrites a terminal status that another writer (e.g. the frontend polling
    path) stored while this worker was waiting on the API.

    Args:
        record_id: primary key of the GenerationRecord.
        **fields: extra column values to write (e.g. ``status='processing'``).

    Returns:
        True if a row was updated. False if the record no longer exists or is
        no longer queued/processing.
    """
    from django.utils import timezone
    from apps.generation.models import GenerationRecord

    # QuerySet.update() bypasses save()/auto_now, so updated_at is set explicitly.
    updated = GenerationRecord.objects.filter(
        pk=record_id, status__in=_ACTIVE_STATUSES,
    ).update(updated_at=timezone.now(), **fields)
    return updated > 0


@shared_task(bind=True, max_retries=0, ignore_result=True)
def poll_video_task(self, record_id):
    """Poll the Volcano API for a video generation task until it reaches a terminal state.

    This is the server-side counterpart to the frontend polling. It runs
    independently of the browser: even if the user closes the page, this task
    keeps polling every POLL_INTERVAL seconds until the mapped status leaves
    queued/processing, or until the record is resolved elsewhere or deleted.

    While polling, ``updated_at`` is refreshed on every iteration, including
    iterations where the API call fails. ``recover_stuck_tasks`` uses a stale
    ``updated_at`` as its signal that no worker is polling the record.

    Args:
        self: bound Celery task instance (unused).
        record_id: primary key of the GenerationRecord to poll.
    """
    from django.utils import timezone
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status

    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
        return

    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
        return
    if record.status not in _ACTIVE_STATUSES:
        logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
        return

    elapsed = 0
    logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)
    while True:
        time.sleep(POLL_INTERVAL)
        elapsed += POLL_INTERVAL

        # Re-fetch the record to see whether the frontend already resolved it.
        try:
            record.refresh_from_db()
        except GenerationRecord.DoesNotExist:
            logger.info('poll_video_task: record %s deleted during polling', record_id)
            return
        if record.status not in _ACTIVE_STATUSES:
            logger.info('poll_video_task: record %s resolved by frontend: %s', record_id, record.status)
            return

        # Poll the Volcano API.
        try:
            ark_resp = query_task(ark_task_id)
            new_status = map_status(ark_resp.get('status', ''))
        except Exception:
            logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
            # Heartbeat: this worker is alive even though the API failed.
            # Without this, a long outage makes the record look abandoned and
            # recover_stuck_tasks would dispatch duplicate pollers for it.
            _update_if_active(record_id)
            continue  # retry on next interval

        if new_status in _ACTIVE_STATUSES:
            # Still running: record the status and touch updated_at. The
            # conditional update cannot clobber a terminal status written by
            # the frontend during query_task(). If that happened, the refresh
            # at the top of the next iteration detects it and returns.
            _update_if_active(record_id, status=new_status)
            continue

        # Terminal state reached — process the result.
        # NOTE(review): if the frontend resolves the record between the refresh
        # above and this point, both paths may run the settlement helpers.
        # Closing that race needs a shared guard with the view code — confirm.
        # NOTE(review): only 'completed' and 'failed' trigger settlement or
        # freeze release. Confirm map_status cannot return another terminal
        # value, which would leave the frozen amount untouched.
        record.status = new_status

        returned_seed = ark_resp.get('seed')
        if returned_seed is not None:
            record.seed = returned_seed

        if new_status == 'completed':
            _handle_completed(record, ark_resp)
        elif new_status == 'failed':
            _handle_failed(record, ark_resp)

        record.completed_at = timezone.now()
        record.save(update_fields=[
            'status', 'result_url', 'error_message', 'raw_error',
            'seed', 'completed_at', 'updated_at',
        ])
        logger.info(
            'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
            record_id, ark_task_id, new_status, elapsed,
        )
        return
def _handle_completed(record, ark_resp):
    """Process a completed task: persist the video to TOS and settle payment.

    Mutates *record* in memory (``result_url``) and does not save it itself.
    If ``usage.total_tokens`` in the response is positive, ``_settle_payment``
    is called with that count; otherwise ``_release_freeze`` is called.

    Args:
        record: the GenerationRecord being finalized.
        ark_resp: the task-query response; read via ``.get`` as a mapping.
    """
    from utils.airdrama_client import extract_video_url

    video_url = extract_video_url(ark_resp)
    if video_url:
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # Best effort: keep the URL returned by the API rather than
            # losing the result when the TOS copy fails.
            logger.exception('poll_video_task: failed to persist video to TOS')
            record.result_url = video_url

    # Settlement: charge by the actual number of tokens used.
    usage = ark_resp.get('usage')
    # `or 0` also covers an explicit null total_tokens. Otherwise the `> 0`
    # comparison raises TypeError after the upload and the record stays active.
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)
@shared_task(ignore_result=True)
def recover_stuck_tasks():
    """Re-dispatch polling for records stuck in queued/processing for over 10 minutes.

    Selects records that have a non-empty ``ark_task_id``, are still queued or
    processing, and whose ``updated_at`` is older than 10 minutes. A stale
    ``updated_at`` is taken to mean that no worker is polling the record, since
    ``poll_video_task`` refreshes it while polling. A new ``poll_video_task`` is
    queued for each such record.
    """
    from datetime import timedelta
    from django.utils import timezone
    from apps.generation.models import GenerationRecord

    cutoff = timezone.now() - timedelta(minutes=10)
    # Only the two columns used below are fetched, not full model instances.
    stuck = (
        GenerationRecord.objects.filter(
            status__in=('queued', 'processing'),
            ark_task_id__isnull=False,
            updated_at__lt=cutoff,  # not updated for 10 minutes: no worker is polling it
        )
        .exclude(ark_task_id='')
        .values_list('id', 'ark_task_id')
    )

    dispatched = 0
    for record_id, ark_task_id in stuck:
        logger.warning('recover_stuck_tasks: re-dispatching record=%s ark=%s', record_id, ark_task_id)
        try:
            poll_video_task.delay(record_id)
        except Exception:
            # Keep going with the other records, but preserve the traceback
            # so broker/dispatch failures can be diagnosed.
            logger.exception('recover_stuck_tasks: failed to dispatch record=%s', record_id)
        else:
            dispatched += 1

    if dispatched:
        logger.info('recover_stuck_tasks: re-dispatched %d stuck tasks', dispatched)
def _handle_failed(record, ark_resp):
    """Process a failed task: record the error and settle or release the frozen amount.

    Sets ``record.error_message`` to the ``ERROR_MESSAGES`` entry for the error
    code, falling back to the raw message. ``record.raw_error`` is set to
    ``"<code>: <message>"`` when a code is present, else to the raw message.
    *record* is mutated in memory only. If ``usage.total_tokens`` is positive,
    ``_settle_payment`` is called; otherwise ``_release_freeze`` is called.

    Args:
        record: the GenerationRecord being finalized.
        ark_resp: the task-query response; read via ``.get`` as a mapping.
    """
    from utils.airdrama_client import ERROR_MESSAGES

    error = ark_resp.get('error')
    if error is None:
        # A null/missing error must not turn into the literal string 'None'.
        error = {}
    if isinstance(error, dict):
        code = error.get('code', '')
        # A null message would otherwise store None / "code: None".
        raw_msg = error.get('message') or ''
    else:
        code = ''
        raw_msg = str(error)

    record.error_message = ERROR_MESSAGES.get(code, raw_msg)
    record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    usage = ark_resp.get('usage')
    # `or 0` also covers an explicit null total_tokens (None > 0 raises TypeError).
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)