"""Celery tasks for async video generation polling."""
import logging
import time

from celery import shared_task

logger = logging.getLogger(__name__)

# Progressive polling schedule as (elapsed-seconds upper bound, interval) pairs:
#   first 2 minutes:  every 5 seconds  (24 polls)
#   2-10 minutes:     every 15 seconds (32 polls)
#   after 10 minutes: every 30 seconds (unbounded)
# A threshold of None means "no upper bound" and must come last.
POLL_SCHEDULE = [
    (120, 5),    # 0-120s: every 5s
    (600, 15),   # 120-600s: every 15s
    (None, 30),  # 600s+: every 30s
]

# Statuses that mean the generation task is still running.
_ACTIVE_STATUSES = ('queued', 'processing')


def _poll_interval(elapsed):
    """Return the sleep interval (seconds) for the given elapsed polling time."""
    for threshold, interval in POLL_SCHEDULE:
        if threshold is None or elapsed < threshold:
            return interval
    # Only reached if the schedule has no open-ended (None) entry.
    return POLL_SCHEDULE[-1][1]


@shared_task(bind=True, max_retries=0, ignore_result=True)
def poll_video_task(self, record_id):
    """Poll Volcano API for a video generation task until it reaches a terminal state.

    This is the server-side counterpart to the frontend polling. It runs
    independently of the browser — even if the user closes the page, this
    task keeps polling until Volcano returns completed or failed.

    Args:
        record_id: Primary key of the ``GenerationRecord`` to poll for.

    Returns:
        None. The outcome is written to the record. The task exits early
        when the record is missing, has no ``ark_task_id``, or is already
        outside the queued/processing states.
    """
    # Imported lazily inside the task rather than at module import time
    # (presumably so the module can load before Django apps are ready —
    # TODO confirm).
    from django.utils import timezone
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status

    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
        return

    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
        return

    if record.status not in _ACTIVE_STATUSES:
        logger.info('poll_video_task: record %s already in terminal state: %s',
                    record_id, record.status)
        return

    # Accumulated sleep time in seconds; time spent in API/DB calls is not counted.
    elapsed = 0
    logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)

    # NOTE(review): this loop has no upper time bound and occupies a worker
    # slot for as long as the remote task stays non-terminal or the API
    # keeps failing.
    while True:
        interval = _poll_interval(elapsed)
        time.sleep(interval)
        elapsed += interval

        # Re-fetch record to check if frontend already updated it
        try:
            record.refresh_from_db()
        except GenerationRecord.DoesNotExist:
            logger.info('poll_video_task: record %s deleted during polling', record_id)
            return

        if record.status not in _ACTIVE_STATUSES:
            logger.info('poll_video_task: record %s resolved by frontend: %s',
                        record_id, record.status)
            return

        # Poll Volcano API
        try:
            ark_resp = query_task(ark_task_id)
            new_status = map_status(ark_resp.get('status', ''))
        except Exception:
            logger.exception('poll_video_task: API query failed for %s, will retry',
                             ark_task_id)
            continue  # retry on next interval

        if new_status in _ACTIVE_STATUSES:
            # Still running: persist a status transition (if any) and keep polling.
            if record.status != new_status:
                record.status = new_status
                record.save(update_fields=['status'])
            continue

        # Terminal state reached — process result
        record.status = new_status

        # Save seed
        returned_seed = ark_resp.get('seed')
        if returned_seed is not None:
            record.seed = returned_seed

        if new_status == 'completed':
            _handle_completed(record, ark_resp)
        elif new_status == 'failed':
            _handle_failed(record, ark_resp)
        else:
            # Neither handler ran, so nothing was settled or released here.
            logger.warning(
                'poll_video_task: record %s reached unhandled terminal status %s',
                record_id, new_status,
            )

        record.completed_at = timezone.now()
        record.save(update_fields=[
            'status', 'result_url', 'error_message', 'raw_error',
            'seed', 'completed_at',
        ])

        logger.info(
            'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
            record_id, ark_task_id, new_status, elapsed,
        )
        return


def _total_tokens(ark_resp):
    """Return ``usage.total_tokens`` from the API response, or 0 if absent/null."""
    usage = ark_resp.get('usage')
    if not isinstance(usage, dict):
        return 0
    # ``or 0`` covers an explicit null value, which would otherwise break
    # the ``> 0`` comparison in the caller.
    return usage.get('total_tokens') or 0


def _settle_or_release(record, ark_resp):
    """Settle payment by actual token usage, or release the freeze if none was used."""
    total_tokens = _total_tokens(ark_resp)
    # View helpers are imported lazily (presumably to avoid a circular
    # import with apps.generation.views — TODO confirm).
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)


def _handle_completed(record, ark_resp):
    """Process a completed task: persist video to TOS and settle payment.

    Sets ``record.result_url`` (when the response carries a video URL) but
    does not save the record; the caller persists it.
    """
    from utils.airdrama_client import extract_video_url

    video_url = extract_video_url(ark_resp)
    if video_url:
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # Best effort: fall back to the URL returned by the API when
            # the upload (or its import) fails.
            logger.exception('poll_video_task: failed to persist video to TOS')
            record.result_url = video_url

    # Settlement: charge by the actual number of tokens used.
    _settle_or_release(record, ark_resp)


def _handle_failed(record, ark_resp):
    """Process a failed task: record error and release frozen amount.

    Sets ``record.error_message`` and ``record.raw_error`` but does not save
    the record; the caller persists it. If the response still reports token
    usage, payment is settled instead of released.
    """
    from utils.airdrama_client import ERROR_MESSAGES

    # Treat a missing or null ``error`` as an empty error object so the
    # stored message is never the literal string 'None'.
    error = ark_resp.get('error') or {}
    if isinstance(error, dict):
        code = error.get('code') or ''
        raw_msg = error.get('message') or ''
    else:
        code = ''
        raw_msg = str(error)

    # Prefer the mapped message for known codes; otherwise keep the raw one.
    record.error_message = ERROR_MESSAGES.get(code, raw_msg)
    record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    _settle_or_release(record, ark_resp)