fix celery

zyc 2026-04-04 10:15:23 +08:00
parent a4c36e4fee
commit 43228d255e
5 changed files with 15950 additions and 62 deletions

backend/apps/generation/tasks.py
View File

@@ -1,27 +1,25 @@
 """Celery tasks for async video generation polling."""

 import logging
-import time

 from celery import shared_task

 logger = logging.getLogger(__name__)

-# Fixed polling interval: every 5 s throughout; 12000 RPM is enough (400 concurrent tasks use only 40%)
+# Polling interval (seconds): re-enqueue after each query; does not pin a worker process
 POLL_INTERVAL = 5


-@shared_task(bind=True, max_retries=0, ignore_result=True)
+@shared_task(bind=True, max_retries=None, ignore_result=True)
 def poll_video_task(self, record_id):
-    """Poll Volcano API for a video generation task until it reaches a terminal state.
+    """Poll Volcano API for a video generation task.

-    This is the server-side counterpart to the frontend polling.
-    It runs independently of the browser; even if the user closes the page,
-    this task keeps polling until Volcano returns completed or failed.
+    Each run performs a single query, then re-enqueues itself via self.retry,
+    so workers are never pinned by sleep and tasks survive worker restarts.
     """
     from django.utils import timezone
     from apps.generation.models import GenerationRecord
-    from utils.airdrama_client import query_task, map_status, extract_video_url, ERROR_MESSAGES
+    from utils.airdrama_client import query_task, map_status

     try:
         record = GenerationRecord.objects.get(pk=record_id)
@@ -38,62 +36,42 @@ def poll_video_task(self, record_id):
         logger.info('poll_video_task: record %s already in terminal state: %s', record_id, record.status)
         return

-    elapsed = 0
-    logger.info('poll_video_task: start polling record=%s ark=%s', record_id, ark_task_id)
-
-    while True:
-        time.sleep(POLL_INTERVAL)
-        elapsed += POLL_INTERVAL
-
-        # Re-fetch record to check if frontend already updated it
-        try:
-            record.refresh_from_db()
-        except GenerationRecord.DoesNotExist:
-            logger.info('poll_video_task: record %s deleted during polling', record_id)
-            return
-
-        if record.status not in ('queued', 'processing'):
-            logger.info('poll_video_task: record %s resolved by frontend: %s', record_id, record.status)
-            return
-
-        # Poll Volcano API
-        try:
-            ark_resp = query_task(ark_task_id)
-            new_status = map_status(ark_resp.get('status', ''))
-        except Exception:
-            logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
-            continue  # retry on next interval
-
-        if new_status in ('queued', 'processing'):
-            # Still running, update status and touch updated_at
-            record.status = new_status
-            record.save(update_fields=['status', 'updated_at'])
-            continue
-
-        # Terminal state reached — process result
-        record.status = new_status
-
-        # Save seed
-        returned_seed = ark_resp.get('seed')
-        if returned_seed is not None:
-            record.seed = returned_seed
-
-        if new_status == 'completed':
-            _handle_completed(record, ark_resp)
-        elif new_status == 'failed':
-            _handle_failed(record, ark_resp)
-
-        record.completed_at = timezone.now()
-        record.save(update_fields=[
-            'status', 'result_url', 'error_message', 'raw_error',
-            'seed', 'completed_at',
-        ])
-        logger.info(
-            'poll_video_task: record=%s ark=%s final_status=%s elapsed=%ds',
-            record_id, ark_task_id, new_status, elapsed,
-        )
-        return
+    # Poll Volcano API
+    try:
+        ark_resp = query_task(ark_task_id)
+        new_status = map_status(ark_resp.get('status', ''))
+    except Exception:
+        logger.exception('poll_video_task: API query failed for %s, will retry', ark_task_id)
+        raise self.retry(countdown=POLL_INTERVAL)
+
+    if new_status in ('queued', 'processing'):
+        # Still running — update status, then re-enqueue
+        record.status = new_status
+        record.save(update_fields=['status', 'updated_at'])
+        raise self.retry(countdown=POLL_INTERVAL)
+
+    # Terminal state reached — process result
+    record.status = new_status
+
+    returned_seed = ark_resp.get('seed')
+    if returned_seed is not None:
+        record.seed = returned_seed
+
+    if new_status == 'completed':
+        _handle_completed(record, ark_resp)
+    elif new_status == 'failed':
+        _handle_failed(record, ark_resp)
+
+    record.completed_at = timezone.now()
+    record.save(update_fields=[
+        'status', 'result_url', 'error_message', 'raw_error',
+        'seed', 'completed_at',
+    ])
+    logger.info(
+        'poll_video_task: record=%s ark=%s final_status=%s',
+        record_id, ark_task_id, new_status,
+    )


 def _handle_completed(record, ark_resp):
@@ -122,16 +100,16 @@ def _handle_completed(record, ark_resp):

 @shared_task(ignore_result=True)
 def recover_stuck_tasks():
-    """Periodically scan for tasks stuck in processing/queued for over 10 minutes and re-dispatch polling."""
+    """Periodically scan for tasks stuck in processing/queued for over 3 minutes and re-dispatch polling."""
     from datetime import timedelta
     from django.utils import timezone
     from apps.generation.models import GenerationRecord

-    cutoff = timezone.now() - timedelta(minutes=10)
+    cutoff = timezone.now() - timedelta(minutes=3)
     stuck_records = GenerationRecord.objects.filter(
         status__in=('queued', 'processing'),
         ark_task_id__isnull=False,
-        updated_at__lt=cutoff,  # updated_at untouched for 10+ minutes: no worker is polling
+        updated_at__lt=cutoff,
     ).exclude(ark_task_id='')

     count = 0

backend/config/settings.py
View File

@@ -182,7 +182,7 @@ CELERY_TIMEZONE = 'Asia/Shanghai'

 CELERY_BEAT_SCHEDULE = {
     'recover-stuck-tasks': {
         'task': 'apps.generation.tasks.recover_stuck_tasks',
-        'schedule': 600,  # every 10 minutes
+        'schedule': 180,  # every 3 minutes
     },
 }

View File

@@ -0,0 +1,134 @@
# Celery Polling Mechanism Fix Report

> Date: 2026-04-04
> Version: v0.16.0
> Scope: backend/apps/generation/tasks.py, backend/config/settings.py

---
## 1. Symptoms

On the afternoon of 2026/4/1, a large number of users reported video generation tasks stuck in "generating" for a long time, with the frontend showing elapsed times of 60–65 minutes.
Volcano Engine confirmed that the videos actually took only about 10 minutes to generate; the results were ready but were not synced back to the platform in time.
**Screenshot data** (tasks completed on the afternoon of 4/1):

| Submitted at | Displayed duration |
|---------|---------|
| 2026/4/1 16:57:28 | 63 min 33 s |
| 2026/4/1 16:58:41 | 62 min 37 s |
| 2026/4/1 16:59:16 | 62 min 7 s |
| 2026/4/1 17:00:36 | 64 min 24 s |
| 2026/4/1 17:04:53 | 64 min 2 s |
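These durations are consistent with the root cause below: roughly 10 minutes of actual generation plus the 50–60 minute OOM recovery loop described in section 2.3.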
## 2. Root Cause Analysis

### 2.1 The status sync chain

```
User submits a task
  → backend calls create_task (Volcano API)
  → receives ark_task_id
  → dispatches Celery task poll_video_task
  → Celery worker queries the Volcano API every 5 seconds
  → Volcano reports completion → write DB + upload to TOS + settle billing
  → frontend polls the DB → displays the result
```

The frontend only reads status from the DB and **never calls the Volcano API directly**; the entire chain depends on the Celery worker polling.
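To make the hand-off concrete, here is a minimal sketch of the dispatch step, assuming `create_task` lives in `utils.airdrama_client` alongside `query_task`; the helper name, the response key, and any record fields beyond `status`/`ark_task_id` are hypothetical, not the actual view code.

```python
# Hypothetical dispatch helper: illustrates the chain above, not the real view.
from apps.generation.models import GenerationRecord
from apps.generation.tasks import poll_video_task
from utils.airdrama_client import create_task  # assumed to sit next to query_task


def submit_generation(params):
    """Create the DB record, submit to Volcano, then hand polling to Celery."""
    record = GenerationRecord.objects.create(status='queued')
    ark_resp = create_task(params)        # Volcano API call
    record.ark_task_id = ark_resp['id']   # response key 'id' is an assumption
    record.save(update_fields=['ark_task_id', 'updated_at'])
    # From here on the Celery worker owns status sync; the browser can close.
    poll_video_task.delay(record.id)
    return record
```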
### 2.2 Flaws in the old implementation

`poll_video_task` ran a long-lived `while True` + `time.sleep(5)` loop:
```python
# old code
while True:
    time.sleep(POLL_INTERVAL)   # 5 seconds
    ark_resp = query_task(...)  # one query per round
    if terminal:
        break
```
**Three fatal problems:**

| Problem | Impact |
|------|------|
| Each task pins an entire worker process | With `concurrency=4`, at most 4 tasks can poll at once; the 5th waits in the queue |
| A worker restart drops the loop outright | An in-memory `while True` cannot be persisted; OOM/restart = task lost |
| `time.sleep` wastes the process | Workers spend 99% of their time asleep; useful work is under 1% |
### 2.3 The OOM restart loop

```
4 tasks polling at the same time
  → some tasks finish, triggering TOS uploads (download video + upload to object storage)
  → memory spikes past the 512Mi limit
  → K8s OOM-kills the worker → worker restarts (15 restarts in total)
  → the while True loops in all 4 processes are lost
  → wait for recover_stuck_tasks (every 10 minutes) to re-dispatch
  → after re-dispatch the workers fill up again → OOM again → repeat
  → actual recovery time ≈ 50–60 minutes
```
## 3. The Fix

### 3.1 Core change: self.retry instead of while True
```python
# new code
@shared_task(bind=True, max_retries=None, ignore_result=True)
def poll_video_task(self, record_id):
    record = GenerationRecord.objects.get(pk=record_id)
    ark_resp = query_task(record.ark_task_id)
    new_status = map_status(ark_resp.get('status', ''))
    if new_status in ('queued', 'processing'):
        record.status = new_status
        record.save(update_fields=['status', 'updated_at'])
        raise self.retry(countdown=5)  # re-enqueue after 5 seconds
    # terminal state reached → process the result
    ...
```
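Two details make this work. `Task.retry()` raises a `Retry` exception (hence the `raise`) and re-publishes the task to the broker with the given `countdown`, so between polls the task exists only as a queued message. And `max_retries` changes from `0` to `None`: each polling round now counts as a retry, a long generation needs an unbounded number of them, and with the old `max_retries=0` the very first `self.retry` call would fail with `MaxRetriesExceededError`.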
**How the two approaches compare:**

| | Old (while True) | New (self.retry) |
|---|---|---|
| Where the task lives | in worker process memory | in the Redis queue |
| Worker occupancy | held until completion (minutes) | milliseconds per query |
| On worker restart | task is lost | tasks in Redis recover automatically |
| Concurrency ceiling | at most 4 (= concurrency) | hundreds (bounded by API RPM) |
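The "hundreds" ceiling is consistent with the quota cited in the old code comment: at one query per task every 5 s, 400 concurrent tasks issue 400 × 12 = 4800 requests per minute, only 40% of the 12000 RPM quota.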
### 3.2 Shorter recover_stuck_tasks interval

| | Old value | New value |
|---|---|---|
| Beat schedule interval | 600 s (10 min) | 180 s (3 min) |
| Stuck threshold | 10 min | 3 min |
| Worst-case recovery time (threshold + one Beat interval) | ~20 min | ~6 min |
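For reference, a sketch of the full sweep in `tasks.py`: the queryset is taken verbatim from the diff above, while the re-dispatch loop is an assumption about the part of the function body the diff does not show.

```python
@shared_task(ignore_result=True)
def recover_stuck_tasks():
    """Re-dispatch polling for tasks stuck in queued/processing past the threshold."""
    from datetime import timedelta
    from django.utils import timezone
    from apps.generation.models import GenerationRecord

    cutoff = timezone.now() - timedelta(minutes=3)
    stuck_records = GenerationRecord.objects.filter(
        status__in=('queued', 'processing'),
        ark_task_id__isnull=False,
        updated_at__lt=cutoff,  # updated_at untouched: no worker is polling this record
    ).exclude(ark_task_id='')

    count = 0
    for record in stuck_records:
        poll_video_task.delay(record.pk)  # assumed re-dispatch, mirroring the original dispatch
        count += 1
    logger.info('recover_stuck_tasks: re-dispatched %d stuck record(s)', count)
```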
### 3.3 Changed files

| File | Change |
|------|------|
| `backend/apps/generation/tasks.py` | `poll_video_task`: while True → self.retry; `recover_stuck_tasks`: threshold 10 → 3 min |
| `backend/config/settings.py` | Beat schedule: 600 → 180 s |
## 4. Expected Impact

| Metric | Before | After |
|------|--------|--------|
| Max tasks polled concurrently | 4 | hundreds |
| Task recovery after a worker restart | lost; wait up to 10 min for the fallback | automatic, no fallback needed |
| Worst-case sync delay | 60+ minutes | ~15 s (poll interval + network latency) |
| Memory profile | pinned continuously (not released while sleeping) | bursty (released after each query) |
| OOM risk | high (4 resident processes + TOS upload spikes) | low (idle processes use very little memory) |
## 5. Deployment Notes

1. **No database migration required**: only Python code changes.
2. **Old while True tasks die off naturally after deployment**: no manual intervention needed.
3. **Redis may still hold old-format tasks**: fully compatible, since the old and new `poll_video_task` signatures are identical (the `record_id` argument is unchanged).
4. **Deploy in lockstep**: ship the code first, then restart the Celery workers (`kubectl rollout restart deployment celery-worker`).

video_auto copy.sql (new file, 7888 additions)

File diff suppressed because one or more lines are too long

video_auto.sql (new file, 7888 additions)

File diff suppressed because one or more lines are too long