from airshelf.celery import app from apps.ai.models import AITask from apps.ai.services import poll_video_segment from apps.projects.models import ExportJob from apps.projects.models import VideoSegment from apps.projects.services.export import run_export_job @app.task(bind=True, max_retries=120) def poll_video_segment_task(self, video_segment_id: str) -> str: segment = VideoSegment.objects.select_related("project", "project__created_by").get(id=video_segment_id) ai_task = ( segment.project.ai_tasks.filter( task_type=AITask.Type.VIDEO_SEGMENT, request_payload__video_segment_id=str(segment.id), status__in=[AITask.Status.SUBMITTED, AITask.Status.POLLING], ) .select_related("created_by") .order_by("-created_at") .first() ) if ai_task is None: return video_segment_id user = ai_task.created_by or segment.project.created_by version = poll_video_segment(video_segment=segment, user=user) if version is None and segment.status in [VideoSegment.Status.RUNNING, VideoSegment.Status.QUEUED]: raise self.retry(countdown=30) return video_segment_id @app.task(bind=True, max_retries=2) def run_export_job_task(self, export_job_id: str) -> str: try: run_export_job(export_job_id) except Exception as exc: export_job = ExportJob.objects.filter(id=export_job_id).first() if export_job: export_job.status = ExportJob.Status.FAILED export_job.error_message = str(exc) export_job.save(update_fields=["status", "error_message", "updated_at"]) raise return export_job_id