45 lines
1.6 KiB
Python

from airshelf.celery import app
from apps.ai.models import AITask
from apps.ai.services import poll_video_segment
from apps.projects.models import ExportJob
from apps.projects.models import VideoSegment
from apps.projects.services.export import run_export_job
@app.task(bind=True, max_retries=120)
def poll_video_segment_task(self, video_segment_id: str) -> str:
    """Poll one video segment's in-flight AI task, retrying while it is unfinished.

    Looks up the newest ``VIDEO_SEGMENT`` AI task of the segment's project that
    references this segment and is still ``SUBMITTED`` or ``POLLING``. If none
    exists the task returns immediately. Otherwise ``poll_video_segment`` is
    called, and the task re-schedules itself every 30 seconds for as long as
    no version came back and the segment is still ``RUNNING`` or ``QUEUED``.

    With ``max_retries=120`` and a 30 second countdown the retry budget covers
    roughly one hour of polling; after that Celery raises instead of retrying.

    Args:
        video_segment_id: Primary key of the ``VideoSegment`` to poll.

    Returns:
        The same ``video_segment_id`` that was passed in.

    Raises:
        VideoSegment.DoesNotExist: If no segment has the given id.
        celery.exceptions.Retry: When the segment is still in progress.
    """
    video_segment = VideoSegment.objects.select_related(
        "project",
        "project__created_by",
    ).get(id=video_segment_id)

    # Only tasks that are still in flight are worth polling.
    in_flight_statuses = [AITask.Status.SUBMITTED, AITask.Status.POLLING]
    candidate_tasks = video_segment.project.ai_tasks.filter(
        task_type=AITask.Type.VIDEO_SEGMENT,
        request_payload__video_segment_id=str(video_segment.id),
        status__in=in_flight_statuses,
    )
    # Newest first, so the most recently created in-flight task wins.
    latest_task = candidate_tasks.select_related("created_by").order_by("-created_at").first()
    if latest_task is None:
        return video_segment_id

    # Poll on behalf of the task's creator, falling back to the project owner.
    acting_user = latest_task.created_by
    if not acting_user:
        acting_user = video_segment.project.created_by

    polled_version = poll_video_segment(video_segment=video_segment, user=acting_user)

    if polled_version is None:
        # NOTE(review): this reads the status from the instance loaded above;
        # presumably poll_video_segment updates that instance in place - confirm.
        if video_segment.status in (VideoSegment.Status.RUNNING, VideoSegment.Status.QUEUED):
            raise self.retry(countdown=30)
    return video_segment_id
@app.task(bind=True, max_retries=2)
def run_export_job_task(self, export_job_id: str) -> str:
    """Run an export job and record the failure on its row if the run raises.

    Args:
        export_job_id: Primary key of the ``ExportJob`` to run.

    Returns:
        The same ``export_job_id``, once ``run_export_job`` has completed.

    Raises:
        Exception: Whatever ``run_export_job`` raised, re-raised unchanged
            after the job (when its row can still be found) is marked failed.
    """
    # NOTE(review): max_retries=2 is declared, but this task never calls
    # self.retry() and sets no autoretry_for, so nothing visible here triggers
    # a retry - confirm whether retries are intended.
    try:
        run_export_job(export_job_id)
    except Exception as error:
        # filter().first() yields None for a missing row, so a deleted job
        # does not raise a second error while handling the first.
        failed_job = ExportJob.objects.filter(id=export_job_id).first()
        if failed_job:
            failed_job.error_message = str(error)
            failed_job.status = ExportJob.Status.FAILED
            # "updated_at" is listed so it is written along with the failure
            # fields (presumably an auto-updated timestamp - confirm on the model).
            failed_job.save(update_fields=["status", "error_message", "updated_at"])
        # Re-raise so Celery still records the task as failed.
        raise
    else:
        return export_job_id