diff --git a/.gitea/workflows/deploy.yaml b/.gitea/workflows/deploy.yaml index 5b5954c..f0cf377 100644 --- a/.gitea/workflows/deploy.yaml +++ b/.gitea/workflows/deploy.yaml @@ -61,10 +61,11 @@ jobs: # Replace image placeholders in yaml files sed -i "s|\${CI_REGISTRY_IMAGE}/video-backend:latest|${SWR_IMAGE}/video-backend:latest|g" k8s/backend-deployment.yaml + sed -i "s|\${CI_REGISTRY_IMAGE}/video-backend:latest|${SWR_IMAGE}/video-backend:latest|g" k8s/celery-deployment.yaml sed -i "s|\${CI_REGISTRY_IMAGE}/video-web:latest|${SWR_IMAGE}/video-web:latest|g" k8s/web-deployment.yaml # Copy k8s manifests to server - scp -o StrictHostKeyChecking=no k8s/backend-deployment.yaml k8s/web-deployment.yaml k8s/ingress.yaml root@${{ secrets.K3S_HOST }}:/tmp/ + scp -o StrictHostKeyChecking=no k8s/backend-deployment.yaml k8s/web-deployment.yaml k8s/ingress.yaml k8s/redis-deployment.yaml k8s/celery-deployment.yaml root@${{ secrets.K3S_HOST }}:/tmp/ # Create/update secrets and apply manifests on server set -o pipefail @@ -83,7 +84,9 @@ jobs: --from-literal=ALIYUN_SMS_ACCESS_SECRET='${{ secrets.ALIYUN_SMS_ACCESS_SECRET }}' \ --dry-run=client -o yaml | kubectl apply -f - + kubectl apply -f /tmp/redis-deployment.yaml kubectl apply -f /tmp/backend-deployment.yaml + kubectl apply -f /tmp/celery-deployment.yaml kubectl apply -f /tmp/web-deployment.yaml kubectl apply -f /tmp/ingress.yaml @@ -91,9 +94,10 @@ jobs: kubectl patch svc traefik -n kube-system -p '{"spec":{"externalTrafficPolicy":"Local"}}' 2>/dev/null || true kubectl rollout restart deployment/video-backend + kubectl rollout restart deployment/celery-worker kubectl rollout restart deployment/video-web - rm -f /tmp/backend-deployment.yaml /tmp/web-deployment.yaml /tmp/ingress.yaml + rm -f /tmp/backend-deployment.yaml /tmp/web-deployment.yaml /tmp/ingress.yaml /tmp/redis-deployment.yaml /tmp/celery-deployment.yaml ENDSSH # ===== Log Center: failure reporting ===== diff --git a/backend/apps/generation/management/__init__.py 
class Command(BaseCommand):
    """Reconcile generation records left in a non-terminal state.

    Fallback for records the Celery poller missed: every record whose status
    is still ``queued`` or ``processing`` is re-checked against the upstream
    API and, once the upstream task has finished, finalised (result URL or
    error stored, then payment settled or the frozen amount released).

    A failure while handling one record is logged and reported, and the run
    continues with the next record, so a single bad record cannot starve the
    rest of the backlog.
    """

    help = 'Poll Volcano API for stuck queued/processing tasks and update their status.'

    # Statuses that are still waiting for an upstream result.
    ACTIVE_STATUSES = ('queued', 'processing')

    def handle(self, *args, **options):
        """Process every active record once and print a per-record summary."""
        # Materialise once: one query, and ``count`` matches what is iterated.
        stuck = list(GenerationRecord.objects.filter(status__in=self.ACTIVE_STATUSES))
        count = len(stuck)

        if not stuck:
            self.stdout.write('No stuck tasks found.')
            return

        self.stdout.write(f'Found {count} stuck task(s), polling...')

        resolved = 0
        for record in stuck:
            try:
                if self._resolve(record):
                    resolved += 1
            except Exception as exc:
                # Keep going: the remaining records are independent of this one.
                logger.exception('poll_stuck_tasks: failed to resolve record %s', record.id)
                self.stdout.write(f' [{record.id}] unexpected error: {exc}')

        self.stdout.write(f'Done. Resolved {resolved}/{count} tasks.')

    def _resolve(self, record):
        """Try to bring one record to a terminal state.

        Returns True when this run moved the record to a terminal state,
        False when it was skipped (deleted, already resolved elsewhere,
        upstream still running, or the upstream query failed).
        """
        record_id = record.id

        # The snapshot taken in handle() can be minutes old by now (each
        # earlier record may have uploaded a video), so re-read the row the
        # same way the Celery poller does before touching it.
        try:
            record.refresh_from_db()
        except GenerationRecord.DoesNotExist:
            self.stdout.write(f' [{record_id}] deleted, skipping')
            return False

        if record.status not in self.ACTIVE_STATUSES:
            self.stdout.write(f' [{record_id}] already {record.status}, skipping')
            return False

        ark_task_id = record.ark_task_id
        if not ark_task_id:
            self._fail_unsubmitted(record)
            return True

        # Poll the upstream API; a failed query leaves the record for the next run.
        try:
            ark_resp = query_task(ark_task_id)
            new_status = map_status(ark_resp.get('status', ''))
        except Exception as e:
            self.stdout.write(f' [{record.id}] ark={ark_task_id} API error: {e}')
            return False

        if new_status in self.ACTIVE_STATUSES:
            self.stdout.write(f' [{record.id}] ark={ark_task_id} still {new_status}')
            return False

        # Terminal state reached.
        record.status = new_status
        returned_seed = ark_resp.get('seed')
        if returned_seed is not None:
            record.seed = returned_seed

        if new_status == 'completed':
            self._persist_video(record, ark_resp)
            self._settle_or_release(record, ark_resp)
        elif new_status == 'failed':
            self._record_error(record, ark_resp)
            self._settle_or_release(record, ark_resp)
        else:
            # NOTE(review): the status is stored as before, but neither
            # settlement nor release runs for it — confirm which values
            # map_status can return besides completed/failed.
            logger.warning(
                'poll_stuck_tasks: record %s got unexpected terminal status %r',
                record.id, new_status,
            )

        record.completed_at = timezone.now()
        record.save(update_fields=[
            'status', 'result_url', 'error_message', 'raw_error',
            'seed', 'completed_at',
        ])
        self.stdout.write(f' [{record.id}] ark={ark_task_id} -> {new_status}')
        return True

    def _fail_unsubmitted(self, record):
        """Mark a record that never received an ark_task_id as failed."""
        # NOTE(review): a record whose submission request is still in flight
        # also has no ark_task_id yet; a minimum-age filter would avoid
        # failing it prematurely, but needs a creation timestamp field that
        # is not visible from this file.
        from apps.generation.views import _release_freeze

        record.status = 'failed'
        record.error_message = '任务提交失败(系统清理)'
        record.completed_at = timezone.now()
        record.save(update_fields=['status', 'error_message', 'completed_at'])
        _release_freeze(record)
        self.stdout.write(f' [{record.id}] no ark_task_id -> marked failed')

    @staticmethod
    def _persist_video(record, ark_resp):
        """Copy the generated video to TOS, falling back to the upstream URL."""
        video_url = extract_video_url(ark_resp)
        if not video_url:
            return
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # Best effort: keep the upstream URL rather than losing the result.
            logger.exception('Failed to persist video to TOS')
            record.result_url = video_url

    @staticmethod
    def _record_error(record, ark_resp):
        """Store the user-facing and raw error text from a failed response."""
        # ``or {}`` / ``or ''`` cover an explicit null in the payload, which
        # would otherwise be stored as the literal string 'None'.
        error = ark_resp.get('error') or {}
        if isinstance(error, dict):
            code = error.get('code') or ''
            raw_msg = error.get('message') or ''
        else:
            code, raw_msg = '', str(error)
        record.error_message = ERROR_MESSAGES.get(code, raw_msg)
        record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    @staticmethod
    def _settle_or_release(record, ark_resp):
        """Charge for the tokens actually used, or release the frozen amount."""
        # Kept function-local as in the original code; presumably this avoids
        # a circular import with the views module — confirm before hoisting.
        from apps.generation.views import _release_freeze, _settle_payment

        usage = ark_resp.get('usage')
        # ``or 0`` guards against total_tokens being null (None > 0 raises).
        total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
        if total_tokens > 0:
            _settle_payment(record, total_tokens)
        else:
            _release_freeze(record)
# Ceiling on how long one record keeps being polled by this task chain.
# After that the record is left untouched for the poll_stuck_tasks command.
# The value is a deliberately generous judgment call; set to None to poll
# without limit.
MAX_POLL_SECONDS = 24 * 60 * 60

# Statuses that mean the upstream task has not finished yet.
_ACTIVE_STATUSES = ('queued', 'processing')


def _next_interval(elapsed):
    """Return the delay in seconds before the next poll, per POLL_SCHEDULE."""
    for threshold, interval in POLL_SCHEDULE:
        if threshold is None or elapsed < threshold:
            return interval
    return POLL_SCHEDULE[-1][1]


def _schedule_next_poll(task, record_id, elapsed):
    """Re-enqueue the polling task after the interval that applies at ``elapsed``."""
    interval = _next_interval(elapsed)
    next_elapsed = elapsed + interval
    if MAX_POLL_SECONDS is not None and next_elapsed > MAX_POLL_SECONDS:
        logger.warning(
            'poll_video_task: record %s still unresolved after %ds, giving up',
            record_id, elapsed,
        )
        return
    task.apply_async(
        args=(record_id,),
        kwargs={'elapsed': next_elapsed},
        countdown=interval,
    )


def _settle_or_release(record, ark_resp):
    """Charge for the tokens actually used, or release the frozen amount."""
    # Kept function-local as in the original code; presumably this avoids a
    # circular import with the views module — confirm before hoisting.
    from apps.generation.views import _release_freeze, _settle_payment

    usage = ark_resp.get('usage')
    # ``or 0`` guards against total_tokens being null (None > 0 raises).
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        _settle_payment(record, total_tokens)
    else:
        _release_freeze(record)


@shared_task(bind=True, max_retries=0, ignore_result=True)
def poll_video_task(self, record_id, elapsed=0):
    """Poll the upstream API once for a generation record and finalise it if done.

    Server-side counterpart to the frontend polling. Each invocation performs
    a single poll. While the upstream task is still running (or the query
    failed), the task re-enqueues itself with a countdown taken from
    POLL_SCHEDULE instead of sleeping, so a worker slot is only occupied for
    the duration of one poll.

    Args:
        record_id: Primary key of the GenerationRecord to poll.
        elapsed: Seconds of scheduled delay accumulated by earlier polls of
            this record. Callers starting a new chain omit it.
    """
    from django.utils import timezone
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status

    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
        return

    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
        return

    # The frontend (or the fallback command) may have resolved it already.
    if record.status not in _ACTIVE_STATUSES:
        logger.info(
            'poll_video_task: record %s already in terminal state: %s',
            record_id, record.status,
        )
        return

    if elapsed == 0:
        logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)

    try:
        ark_resp = query_task(ark_task_id)
        new_status = map_status(ark_resp.get('status', ''))
    except Exception:
        logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
        _schedule_next_poll(self, record_id, elapsed)
        return

    if new_status in _ACTIVE_STATUSES:
        # Still running: mirror the upstream status, then poll again later.
        if record.status != new_status:
            record.status = new_status
            record.save(update_fields=['status'])
        _schedule_next_poll(self, record_id, elapsed)
        return

    # Terminal state reached — process the result.
    # NOTE(review): the frontend can reach this same point concurrently; the
    # status check above narrows but does not close that window. Whether a
    # double settlement is possible depends on _settle_payment, which is not
    # visible here.
    record.status = new_status

    returned_seed = ark_resp.get('seed')
    if returned_seed is not None:
        record.seed = returned_seed

    if new_status == 'completed':
        _handle_completed(record, ark_resp)
    elif new_status == 'failed':
        _handle_failed(record, ark_resp)
    else:
        # NOTE(review): stored as before, but nothing is settled or released
        # for it — confirm which values map_status can return.
        logger.warning(
            'poll_video_task: record %s got unexpected terminal status %r',
            record_id, new_status,
        )

    record.completed_at = timezone.now()
    record.save(update_fields=[
        'status', 'result_url', 'error_message', 'raw_error',
        'seed', 'completed_at',
    ])

    logger.info(
        'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
        record_id, ark_task_id, new_status, elapsed,
    )


def _handle_completed(record, ark_resp):
    """Process a completed task: persist the video to TOS and settle payment."""
    from utils.airdrama_client import extract_video_url

    video_url = extract_video_url(ark_resp)
    if video_url:
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # Best effort: keep the upstream URL rather than losing the result.
            logger.exception('poll_video_task: failed to persist video to TOS')
            record.result_url = video_url

    # Settlement: charge by the tokens actually consumed.
    _settle_or_release(record, ark_resp)


def _handle_failed(record, ark_resp):
    """Process a failed task: record the error and settle or release funds."""
    from utils.airdrama_client import ERROR_MESSAGES

    # ``or {}`` / ``or ''`` cover an explicit null in the payload, which would
    # otherwise be stored as the literal string 'None'.
    error = ark_resp.get('error') or {}
    if isinstance(error, dict):
        code = error.get('code') or ''
        raw_msg = error.get('message') or ''
    else:
        code, raw_msg = '', str(error)
    record.error_message = ERROR_MESSAGES.get(code, raw_msg)
    record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    _settle_or_release(record, ark_resp)
"""Celery application bootstrap for the AirDrama backend."""

import os
from celery import Celery

# Name a settings module only when the environment has not already chosen one;
# this runs before the app reads 'django.conf:settings' below.
if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'config.settings'

# Packages searched for task modules.
_TASK_PACKAGES = ['apps.generation']

app = Celery('airdrama')
# Configuration comes from Django settings under the CELERY namespace
# (settings.py defines the CELERY_* keys).
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(_TASK_PACKAGES)
# Celery worker Deployment. Runs the same video-backend image as the API
# Deployment, but starts a Celery worker instead of the web process.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
  labels:
    app: celery-worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      imagePullSecrets:
        - name: swr-secret
      containers:
        - name: celery-worker
          # The ${CI_REGISTRY_IMAGE} placeholder is rewritten by a sed step in
          # the deploy workflow before this manifest is applied.
          image: ${CI_REGISTRY_IMAGE}/video-backend:latest
          imagePullPolicy: Always
          # "-A config" points Celery at the backend's config package (see
          # backend/config/celery.py).
          # NOTE(review): size --concurrency against how long one task holds a
          # worker slot; the polling task in apps/generation/tasks.py decides that.
          # NOTE(review): no liveness/readiness probe is defined — confirm that
          # is intended.
          command: ["celery", "-A", "config", "worker", "--loglevel=info", "--concurrency=4"]
          env:
            - name: USE_MYSQL
              value: "true"
            - name: DJANGO_DEBUG
              value: "False"
            # NOTE(review): presumably irrelevant for a worker that serves no
            # HTTP traffic — confirm and consider dropping.
            - name: DJANGO_ALLOWED_HOSTS
              value: "*"
            - name: DJANGO_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DJANGO_SECRET_KEY
            # Redis: settings.py reads REDIS_URL as the Celery broker URL; the
            # host is the Service defined in redis-deployment.yaml.
            - name: REDIS_URL
              value: "redis://redis-service:6379/0"
            # Database (Aliyun RDS)
            - name: DB_HOST
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_HOST
            - name: DB_NAME
              value: "video_auto"
            - name: DB_USER
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_USER
            - name: DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_PASSWORD
            - name: DB_PORT
              value: "3306"
            # TOS (from Secret): the worker uploads finished videos itself.
            - name: TOS_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: TOS_ACCESS_KEY
            - name: TOS_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: TOS_SECRET_KEY
            - name: TOS_BUCKET
              value: "airdrama-media"
            - name: TOS_ENDPOINT
              value: "https://tos-cn-beijing.volces.com"
            - name: TOS_REGION
              value: "cn-beijing"
            - name: TOS_CDN_DOMAIN
              value: "https://airdrama-media.tos-cn-beijing.volces.com"
            # Seedance API (from Secret)
            - name: ARK_API_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: ARK_API_KEY
          # NOTE(review): the 512Mi limit is shared by all worker processes
          # started by --concurrency=4 — verify against measured memory use.
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
# Single-replica Redis, referenced as the Celery broker/result backend via the
# REDIS_URL env var in the backend and celery-worker Deployments.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  labels:
    app: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      # No volume is mounted, so nothing in this manifest persists Redis data
      # across pod replacement.
      # NOTE(review): confirm that losing queued broker messages on a restart
      # is acceptable.
      # NOTE(review): no password or other access control is configured here —
      # confirm the cluster network is trusted.
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          # NOTE(review): no Redis maxmemory setting is passed, so the 128Mi
          # container limit is the only cap visible here — confirm.
          resources:
            requests:
              memory: "64Mi"
              cpu: "50m"
            limits:
              memory: "128Mi"
              cpu: "100m"
---
# In-cluster Service; its name and port are what REDIS_URL
# (redis://redis-service:6379/0) points at.
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - protocol: TCP
      port: 6379
      targetPort: 6379