feat: v0.14.0 后端异步轮询(Celery + Redis)
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled

- Celery 异步任务:任务提交后后端持续轮询火山 API 直到拿到终态,用户关浏览器也不会丢视频
- 渐进式轮询:前2分钟每5秒、2-10分钟每15秒、10分钟后每30秒
- 优雅降级:无 Redis 时静默跳过,不影响现有前端轮询
- K8s:新增 Redis Deployment + Service、Celery Worker Deployment
- CI/CD:deploy.yaml 自动部署 Redis/Celery,每次推代码自动重启 celery worker
- 兜底:poll_stuck_tasks management command 清理僵尸任务

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
seaislee1209 2026-03-25 13:27:16 +08:00
parent 911f3c158b
commit 60713ea009
13 changed files with 437 additions and 2 deletions

View File

@ -61,10 +61,11 @@ jobs:
# Replace image placeholders in yaml files
sed -i "s|\${CI_REGISTRY_IMAGE}/video-backend:latest|${SWR_IMAGE}/video-backend:latest|g" k8s/backend-deployment.yaml
sed -i "s|\${CI_REGISTRY_IMAGE}/video-backend:latest|${SWR_IMAGE}/video-backend:latest|g" k8s/celery-deployment.yaml
sed -i "s|\${CI_REGISTRY_IMAGE}/video-web:latest|${SWR_IMAGE}/video-web:latest|g" k8s/web-deployment.yaml
# Copy k8s manifests to server
scp -o StrictHostKeyChecking=no k8s/backend-deployment.yaml k8s/web-deployment.yaml k8s/ingress.yaml root@${{ secrets.K3S_HOST }}:/tmp/
scp -o StrictHostKeyChecking=no k8s/backend-deployment.yaml k8s/web-deployment.yaml k8s/ingress.yaml k8s/redis-deployment.yaml k8s/celery-deployment.yaml root@${{ secrets.K3S_HOST }}:/tmp/
# Create/update secrets and apply manifests on server
set -o pipefail
@ -83,7 +84,9 @@ jobs:
--from-literal=ALIYUN_SMS_ACCESS_SECRET='${{ secrets.ALIYUN_SMS_ACCESS_SECRET }}' \
--dry-run=client -o yaml | kubectl apply -f -
kubectl apply -f /tmp/redis-deployment.yaml
kubectl apply -f /tmp/backend-deployment.yaml
kubectl apply -f /tmp/celery-deployment.yaml
kubectl apply -f /tmp/web-deployment.yaml
kubectl apply -f /tmp/ingress.yaml
@ -91,9 +94,10 @@ jobs:
kubectl patch svc traefik -n kube-system -p '{"spec":{"externalTrafficPolicy":"Local"}}' 2>/dev/null || true
kubectl rollout restart deployment/video-backend
kubectl rollout restart deployment/celery-worker
kubectl rollout restart deployment/video-web
rm -f /tmp/backend-deployment.yaml /tmp/web-deployment.yaml /tmp/ingress.yaml
rm -f /tmp/backend-deployment.yaml /tmp/web-deployment.yaml /tmp/ingress.yaml /tmp/redis-deployment.yaml /tmp/celery-deployment.yaml
ENDSSH
# ===== Log Center: failure reporting =====

View File

@ -0,0 +1,108 @@
"""Management command to poll stuck tasks and update their status.
This is a fallback for when Celery workers miss tasks or aren't running.
Run via cron or K8s CronJob: python manage.py poll_stuck_tasks
"""
import logging
from django.core.management.base import BaseCommand
from django.utils import timezone
from apps.generation.models import GenerationRecord
from utils.airdrama_client import query_task, map_status, extract_video_url, ERROR_MESSAGES
logger = logging.getLogger(__name__)
class Command(BaseCommand):
    """Fallback poller for generation tasks stuck in a non-terminal state.

    Intended to run periodically (cron / K8s CronJob) as a safety net for
    when the Celery worker missed a task or was not running.  For each
    record still 'queued' or 'processing' it queries the Volcano API once
    and, on a terminal answer, persists the result and settles/releases
    the frozen payment.
    """

    help = 'Poll Volcano API for stuck queued/processing tasks and update their status.'

    def handle(self, *args, **options):
        stuck = GenerationRecord.objects.filter(status__in=['queued', 'processing'])
        count = stuck.count()
        if count == 0:
            self.stdout.write('No stuck tasks found.')
            return

        self.stdout.write(f'Found {count} stuck task(s), polling...')
        resolved = 0
        for record in stuck:
            ark_task_id = record.ark_task_id

            # No ark_task_id means the API submission never succeeded — fail it
            # and release the frozen amount.
            # NOTE(review): a record polled between creation and the save of
            # ark_task_id in the submit view would also match; consider adding
            # a minimum-age filter if that race is observed.
            if not ark_task_id:
                record.status = 'failed'
                record.error_message = '任务提交失败(系统清理)'
                record.completed_at = timezone.now()
                record.save(update_fields=['status', 'error_message', 'completed_at'])
                from apps.generation.views import _release_freeze
                _release_freeze(record)
                resolved += 1
                self.stdout.write(f' [{record.id}] no ark_task_id -> marked failed')
                continue

            # One-shot poll of the Volcano API; transient errors are skipped
            # here and retried on the next scheduled run.
            try:
                ark_resp = query_task(ark_task_id)
                new_status = map_status(ark_resp.get('status', ''))
            except Exception as e:
                self.stdout.write(f' [{record.id}] ark={ark_task_id} API error: {e}')
                continue

            if new_status in ('queued', 'processing'):
                self.stdout.write(f' [{record.id}] ark={ark_task_id} still {new_status}')
                continue

            # Terminal state — persist result/error, then settle or release.
            record.status = new_status
            returned_seed = ark_resp.get('seed')
            if returned_seed is not None:
                record.seed = returned_seed

            if new_status == 'completed':
                self._apply_completed(record, ark_resp)
            elif new_status == 'failed':
                self._apply_failed(record, ark_resp)

            self._settle_or_release(record, ark_resp)

            record.completed_at = timezone.now()
            record.save(update_fields=[
                'status', 'result_url', 'error_message', 'raw_error',
                'seed', 'completed_at',
            ])
            resolved += 1
            self.stdout.write(f' [{record.id}] ark={ark_task_id} -> {new_status}')

        self.stdout.write(f'Done. Resolved {resolved}/{count} tasks.')

    def _apply_completed(self, record, ark_resp):
        """Persist the generated video, preferring a TOS copy over the raw URL."""
        video_url = extract_video_url(ark_resp)
        if video_url:
            try:
                from utils.tos_client import upload_from_url
                record.result_url = upload_from_url(video_url, folder='results')
            except Exception:
                # TOS persistence is best-effort: fall back to the provider URL.
                logger.exception('Failed to persist video to TOS')
                record.result_url = video_url

    def _apply_failed(self, record, ark_resp):
        """Map the provider error onto user-facing and raw error fields."""
        error = ark_resp.get('error', {})
        code = error.get('code', '') if isinstance(error, dict) else ''
        raw_msg = error.get('message', '') if isinstance(error, dict) else str(error)
        record.error_message = ERROR_MESSAGES.get(code, raw_msg)
        record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    def _settle_or_release(self, record, ark_resp):
        """Charge by reported token usage, or release the freeze when none.

        `or 0` guards against a JSON-null total_tokens, which would make the
        `> 0` comparison raise TypeError.
        """
        usage = ark_resp.get('usage', {})
        total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
        if total_tokens > 0:
            from apps.generation.views import _settle_payment
            _settle_payment(record, total_tokens)
        else:
            from apps.generation.views import _release_freeze
            _release_freeze(record)

View File

@ -0,0 +1,155 @@
"""Celery tasks for async video generation polling."""
import logging
import time
from celery import shared_task
logger = logging.getLogger(__name__)
# Progressive polling intervals, as (elapsed_threshold_seconds, interval_seconds):
# first 2 minutes every 5s (24 polls), 2-10 minutes every 15s (32 polls),
# after 10 minutes every 30s (open-ended).
POLL_SCHEDULE = [
    (120, 5),    # 0-120s: every 5s
    (600, 15),   # 120-600s: every 15s
    (None, 30),  # 600s+: every 30s
]


def _poll_interval(elapsed):
    """Return the sleep interval (seconds) for the given elapsed time."""
    for threshold, interval in POLL_SCHEDULE:
        if threshold is None or elapsed < threshold:
            return interval
    return POLL_SCHEDULE[-1][1]  # unreachable with a None-terminated schedule


@shared_task(bind=True, max_retries=0, ignore_result=True)
def poll_video_task(self, record_id):
    """Poll the Volcano API for one generation record until a terminal state.

    Server-side counterpart to the frontend polling: it keeps running even
    after the user closes the browser, until Volcano reports completed or
    failed — or until the record is resolved or deleted elsewhere.
    """
    from django.utils import timezone
    from apps.generation.models import GenerationRecord
    from utils.airdrama_client import query_task, map_status

    try:
        record = GenerationRecord.objects.get(pk=record_id)
    except GenerationRecord.DoesNotExist:
        logger.warning('poll_video_task: record %s not found', record_id)
        return

    ark_task_id = record.ark_task_id
    if not ark_task_id:
        logger.warning('poll_video_task: record %s has no ark_task_id', record_id)
        return
    if record.status not in ('queued', 'processing'):
        logger.info('poll_video_task: record %s already in terminal state: %s',
                    record_id, record.status)
        return

    elapsed = 0
    logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)

    while True:
        interval = _poll_interval(elapsed)
        time.sleep(interval)
        elapsed += interval

        # Re-fetch: the frontend poller may have resolved the record meanwhile.
        try:
            record.refresh_from_db()
        except GenerationRecord.DoesNotExist:
            logger.info('poll_video_task: record %s deleted during polling', record_id)
            return
        if record.status not in ('queued', 'processing'):
            logger.info('poll_video_task: record %s resolved by frontend: %s',
                        record_id, record.status)
            return

        # Query the provider; transient API errors just wait for the next tick.
        try:
            ark_resp = query_task(ark_task_id)
            new_status = map_status(ark_resp.get('status', ''))
        except Exception:
            logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
            continue

        if new_status in ('queued', 'processing'):
            # Still running — mirror the provider status and keep polling.
            if record.status != new_status:
                record.status = new_status
                record.save(update_fields=['status'])
            continue

        # Terminal state reached — persist result, seed and settlement.
        record.status = new_status
        returned_seed = ark_resp.get('seed')
        if returned_seed is not None:
            record.seed = returned_seed

        if new_status == 'completed':
            _handle_completed(record, ark_resp)
        elif new_status == 'failed':
            _handle_failed(record, ark_resp)

        record.completed_at = timezone.now()
        record.save(update_fields=[
            'status', 'result_url', 'error_message', 'raw_error',
            'seed', 'completed_at',
        ])
        logger.info(
            'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
            record_id, ark_task_id, new_status, elapsed,
        )
        return
def _handle_completed(record, ark_resp):
    """Process a completed task: persist the video to TOS and settle payment."""
    from utils.airdrama_client import extract_video_url

    video_url = extract_video_url(ark_resp)
    if video_url:
        try:
            from utils.tos_client import upload_from_url
            record.result_url = upload_from_url(video_url, folder='results')
        except Exception:
            # TOS persistence is best-effort: fall back to the provider URL.
            logger.exception('poll_video_task: failed to persist video to TOS')
            record.result_url = video_url

    # Settle by actual token usage; release the freeze when none was reported.
    # `or 0` guards against a JSON-null total_tokens (None > 0 would raise).
    usage = ark_resp.get('usage', {})
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)
def _handle_failed(record, ark_resp):
    """Process a failed task: record the error, then settle or release funds."""
    from utils.airdrama_client import ERROR_MESSAGES

    error = ark_resp.get('error', {})
    code = error.get('code', '') if isinstance(error, dict) else ''
    raw_msg = error.get('message', '') if isinstance(error, dict) else str(error)
    # Prefer a mapped, user-facing message; keep the raw provider error too.
    record.error_message = ERROR_MESSAGES.get(code, raw_msg)
    record.raw_error = f'{code}: {raw_msg}' if code else raw_msg

    # Failed tasks that still report token usage are settled (mirrors the
    # completed path); otherwise the frozen amount is released.
    # `or 0` guards against a JSON-null total_tokens (None > 0 would raise).
    usage = ark_resp.get('usage', {})
    total_tokens = (usage.get('total_tokens') or 0) if isinstance(usage, dict) else 0
    if total_tokens > 0:
        from apps.generation.views import _settle_payment
        _settle_payment(record, total_tokens)
    else:
        from apps.generation.views import _release_freeze
        _release_freeze(record)

View File

@ -363,6 +363,12 @@ def video_generate_view(request):
record.ark_task_id = ark_task_id
record.status = 'processing'
record.save(update_fields=['ark_task_id', 'status'])
# 触发后端异步轮询(连不上 Redis 时静默降级,前端轮询兜底)
try:
from apps.generation.tasks import poll_video_task
poll_video_task.delay(record.id)
except Exception:
logger.debug('Celery not available, falling back to frontend polling')
except Exception as e:
logger.exception('AirDrama API create task failed')
record.status = 'failed'

View File

@ -3,3 +3,10 @@ try:
pymysql.install_as_MySQLdb()
except ImportError:
pass # Docker uses mysqlclient natively
# Celery app — import so that @shared_task uses this app
try:
from .celery import app as celery_app
__all__ = ('celery_app',)
except ImportError:
pass # celery not installed (local dev without redis)

10
backend/config/celery.py Normal file
View File

@ -0,0 +1,10 @@
"""Celery configuration for AirDrama backend."""
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('airdrama')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(['apps.generation'])

View File

@ -170,6 +170,16 @@ CORS_ALLOW_CREDENTIALS = True
CSRF_TRUSTED_ORIGINS = [o for o in CORS_ALLOWED_ORIGINS if o.startswith('https://')]
# ──────────────────────────────────────────────
# Celery (async task queue)
# ──────────────────────────────────────────────
CELERY_BROKER_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True

View File

@ -9,3 +9,5 @@ requests>=2.31,<3.0
ip-region>=1.0
volcengine>=1.0.218
Pillow>=10.0
celery>=5.3,<6.0
redis>=5.0,<6.0

View File

@ -54,6 +54,9 @@ spec:
key: DB_PASSWORD
- name: DB_PORT
value: "3306"
# Redis (Celery broker)
- name: REDIS_URL
value: "redis://redis-service:6379/0"
# CORS
- name: CORS_ALLOWED_ORIGINS
value: "https://airflow-studio.airlabs.art"

View File

@ -0,0 +1,90 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
  labels:
    app: celery-worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      imagePullSecrets:
        - name: swr-secret
      containers:
        - name: celery-worker
          # Same image as the backend; only the entrypoint differs.
          image: ${CI_REGISTRY_IMAGE}/video-backend:latest
          imagePullPolicy: Always
          command: ["celery", "-A", "config", "worker", "--loglevel=info", "--concurrency=4"]
          env:
            - name: USE_MYSQL
              value: "true"
            - name: DJANGO_DEBUG
              value: "False"
            - name: DJANGO_ALLOWED_HOSTS
              value: "*"
            - name: DJANGO_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DJANGO_SECRET_KEY
            # Redis (Celery broker)
            - name: REDIS_URL
              value: "redis://redis-service:6379/0"
            # Database (Aliyun RDS)
            - name: DB_HOST
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_HOST
            - name: DB_NAME
              value: "video_auto"
            - name: DB_USER
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_USER
            - name: DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: DB_PASSWORD
            - name: DB_PORT
              value: "3306"
            # TOS object storage (from Secret)
            - name: TOS_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: TOS_ACCESS_KEY
            - name: TOS_SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: TOS_SECRET_KEY
            - name: TOS_BUCKET
              value: "airdrama-media"
            - name: TOS_ENDPOINT
              value: "https://tos-cn-beijing.volces.com"
            - name: TOS_REGION
              value: "cn-beijing"
            - name: TOS_CDN_DOMAIN
              value: "https://airdrama-media.tos-cn-beijing.volces.com"
            # Seedance API (from Secret)
            - name: ARK_API_KEY
              valueFrom:
                secretKeyRef:
                  name: video-backend-secrets
                  key: ARK_API_KEY
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"

40
k8s/redis-deployment.yaml Normal file
View File

@ -0,0 +1,40 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  labels:
    app: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          # NOTE(review): no persistence, auth, or NetworkPolicy is configured —
          # presumably acceptable for a broker whose loss only degrades to the
          # frontend-polling fallback; confirm that intent.
          resources:
            requests:
              memory: "64Mi"
              cpu: "50m"
            limits:
              memory: "128Mi"
              cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - protocol: TCP
      port: 6379
      targetPort: 6379