diff --git a/backend/apps/generation/tasks.py b/backend/apps/generation/tasks.py
index fe6cc51..d24eded 100644
--- a/backend/apps/generation/tasks.py
+++ b/backend/apps/generation/tasks.py
@@ -79,10 +79,9 @@ def poll_video_task(self, record_id):
             continue  # retry on next interval
 
         if new_status in ('queued', 'processing'):
-            # Still running, update status and continue
-            if record.status != new_status:
-                record.status = new_status
-                record.save(update_fields=['status'])
+            # Still running, update status and touch updated_at
+            record.status = new_status
+            record.save(update_fields=['status', 'updated_at'])
             continue
 
         # Terminal state reached — process result
@@ -135,6 +134,33 @@ def _handle_completed(record, ark_resp):
     _release_freeze(record)
 
 
+@shared_task(ignore_result=True)
+def recover_stuck_tasks():
+    """定时扫描卡在 processing/queued 超过 10 分钟的任务,重新派发轮询。"""
+    from datetime import timedelta
+    from django.utils import timezone
+    from apps.generation.models import GenerationRecord
+
+    cutoff = timezone.now() - timedelta(minutes=10)
+    stuck_records = GenerationRecord.objects.filter(
+        status__in=('queued', 'processing'),
+        ark_task_id__isnull=False,
+        updated_at__lt=cutoff,  # updated_at 超过 10 分钟没更新,说明没有 worker 在轮询
+    ).exclude(ark_task_id='')
+
+    count = 0
+    for record in stuck_records:
+        logger.warning('recover_stuck_tasks: re-dispatching record=%s ark=%s', record.id, record.ark_task_id)
+        try:
+            poll_video_task.delay(record.id)
+            count += 1
+        except Exception:
+            logger.error('recover_stuck_tasks: failed to dispatch record=%s', record.id)
+
+    if count:
+        logger.info('recover_stuck_tasks: re-dispatched %d stuck tasks', count)
+
+
 def _handle_failed(record, ark_resp):
     """Process a failed task: record error and release frozen amount."""
     from utils.airdrama_client import ERROR_MESSAGES
diff --git a/backend/apps/generation/views.py b/backend/apps/generation/views.py
index da62652..fa3f40e 100644
--- a/backend/apps/generation/views.py
+++ b/backend/apps/generation/views.py
@@ -372,12 +372,19 @@ def video_generate_view(request):
         record.ark_task_id = ark_task_id
         record.status = 'processing'
         record.save(update_fields=['ark_task_id', 'status'])
-        # 触发后端异步轮询(连不上 Redis 时静默降级,前端轮询兜底)
+        # 触发后端异步轮询
         try:
             from apps.generation.tasks import poll_video_task
             poll_video_task.delay(record.id)
         except Exception:
-            logger.debug('Celery not available, falling back to frontend polling')
+            logger.warning('Celery dispatch failed for record %s, retrying once...', record.id)
+            import time
+            time.sleep(1)
+            try:
+                from apps.generation.tasks import poll_video_task as _poll
+                _poll.delay(record.id)
+            except Exception:
+                logger.error('Celery dispatch failed twice for record %s, relying on recovery task', record.id)
     except Exception as e:
         logger.exception('AirDrama API create task failed')
         record.status = 'failed'
diff --git a/backend/config/settings.py b/backend/config/settings.py
index 4e99e8b..58456fc 100644
--- a/backend/config/settings.py
+++ b/backend/config/settings.py
@@ -173,12 +173,18 @@ CSRF_TRUSTED_ORIGINS = [o for o in CORS_ALLOWED_ORIGINS if o.startswith('https:/
 # ──────────────────────────────────────────────
 # Celery (async task queue)
 # ──────────────────────────────────────────────
-CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://:vAhRnAA6VMco@r-7xvat0vez5clwbzk5vpd.redis.rds.aliyuncs.com:6379/8')
+CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://:vAhRnAA6VMco@redis-cngzyc2r77ka16g7a.redis.ivolces.com:6379/0')
 CELERY_RESULT_BACKEND = CELERY_BROKER_URL
 CELERY_ACCEPT_CONTENT = ['json']
 CELERY_TASK_SERIALIZER = 'json'
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_TIMEZONE = 'Asia/Shanghai'
+CELERY_BEAT_SCHEDULE = {
+    'recover-stuck-tasks': {
+        'task': 'apps.generation.tasks.recover_stuck_tasks',
+        'schedule': 600,  # 每 10 分钟
+    },
+}
 
 LANGUAGE_CODE = 'zh-hans'
 TIME_ZONE = 'Asia/Shanghai'
diff --git a/k8s/backend-deployment.yaml b/k8s/backend-deployment.yaml
index 2644741..b5d4fcd 100644
--- a/k8s/backend-deployment.yaml
+++ b/k8s/backend-deployment.yaml
@@ -56,7 +56,7 @@ spec:
           value: "3306"
         # Redis (Celery broker)
         - name: REDIS_URL
-          value: "redis://:vAhRnAA6VMco@r-7xvat0vez5clwbzk5vpd.redis.rds.aliyuncs.com:6379/8"
+          value: "redis://:vAhRnAA6VMco@redis-cngzyc2r77ka16g7a.redis.ivolces.com:6379/0"
         # CORS
         - name: CORS_ALLOWED_ORIGINS
           value: "https://airflow-studio.airlabs.art"
diff --git a/k8s/celery-deployment.yaml b/k8s/celery-deployment.yaml
index 49b6f0f..f7de818 100644
--- a/k8s/celery-deployment.yaml
+++ b/k8s/celery-deployment.yaml
@@ -20,7 +20,7 @@ spec:
       - name: celery-worker
         image: ${CI_REGISTRY_IMAGE}/video-backend:latest
         imagePullPolicy: Always
-        command: ["celery", "-A", "config", "worker", "--loglevel=info", "--concurrency=4"]
+        command: ["celery", "-A", "config", "worker", "--loglevel=info", "--concurrency=4", "-B"]
         env:
         - name: USE_MYSQL
           value: "true"
@@ -35,7 +35,7 @@ spec:
             key: DJANGO_SECRET_KEY
         # Redis
         - name: REDIS_URL
-          value: "redis://:vAhRnAA6VMco@r-7xvat0vez5clwbzk5vpd.redis.rds.aliyuncs.com:6379/8"
+          value: "redis://:vAhRnAA6VMco@redis-cngzyc2r77ka16g7a.redis.ivolces.com:6379/0"
         # Database (Aliyun RDS)
         - name: DB_HOST
           valueFrom: