import re import subprocess import tempfile import uuid from datetime import timedelta from decimal import Decimal from io import BytesIO from pathlib import Path from django.db import transaction from django.utils import timezone from apps.ai.models import AITask, ModelConfig from apps.ai.providers import VolcanoArkProvider from apps.assets.models import Asset, AssetFile from apps.assets.storage import TosStorage from apps.billing.services.ledger import charge_reserved_credit, release_credit, reserve_credit from apps.projects.models import ( BaseAssetGroup, ExportJob, ProjectStage, ScriptSegment, ScriptVersion, StoryboardFrame, StoryboardVersion, VideoSegment, VideoSegmentVersion, ) def get_default_model(capability: str) -> ModelConfig: return ( ModelConfig.objects.select_related("provider") .filter(capability=capability, status=ModelConfig.Status.ACTIVE, provider__status="active") .order_by("created_at") .first() ) def estimate_cost(model_config: ModelConfig) -> Decimal: return model_config.unit_price if model_config.unit_price > 0 else Decimal("1.0000") def build_script_prompt(*, project, user_prompt: str, selling_point_ids: list[str] | None = None) -> list[dict[str, str]]: product = project.product selling_points = product.selling_points.all() if selling_point_ids: selling_points = selling_points.filter(id__in=selling_point_ids) selling_text = "\n".join(f"- {item.title}: {item.detail}" for item in selling_points) system = ( "你是电商短视频脚本导演。请为 9:16 竖屏带货短视频生成 60 秒脚本," "拆成 4 个 15 秒段落。每段包含旁白、画面描述、商品露出方式和转场建议。" ) user = f""" 商品标题:{product.title} 品牌:{product.brand or "未填写"} 类目:{product.category or "未填写"} 目标人群:{product.target_audience or "未填写"} 商品描述:{product.description or "未填写"} 卖点: {selling_text or "未选择卖点,请根据商品信息自行提炼。"} 用户补充需求: {user_prompt or "生成一条结构完整、节奏清晰、适合投放的带货短视频脚本。"} """.strip() return [{"role": "system", "content": system}, {"role": "user", "content": user}] def split_script_into_segments(content: str, count: int = 4) -> list[str]: """把一段脚本稳健地拆成 `count` 个分镜文本,保证每镜都非空、且所有内容都被分配到某一镜。 原实现按行 `[:4]`,ARK 返回整段散文时常变成「第1镜有词、2/3/4镜全空」, 导致后续故事板帧 / 视频段拿到空提示词,前后内容断裂。这里改为: 优先按空行/标号块切,块数够就把全部块均匀分桶;块不够再按句子切;仍不够则补齐。 """ def _bucketize(items: list[str], joiner: str) -> list[str]: buckets: list[list[str]] = [[] for _ in range(count)] per = len(items) / count for index, item in enumerate(items): buckets[min(count - 1, int(index / per))].append(item) return [joiner.join(bucket).strip() for bucket in buckets] text = (content or "").strip() if not text: return [""] * count # 1) 优先按空行分段;只有一段时退回按行分 blocks = [block.strip() for block in re.split(r"\n\s*\n", text) if block.strip()] if len(blocks) < 2: blocks = [line.strip() for line in text.splitlines() if line.strip()] if len(blocks) >= count: return _bucketize(blocks, "\n") # 2) 段落不足:按中英文句末标点切句,再均匀分桶 sentences = [s.strip() for s in re.split(r"(?<=[。!?!?.;;\n])", text) if s.strip()] if len(sentences) >= count: return _bucketize(sentences, " ") # 3) 仍不足:用已有块/句补齐到 count,绝不留空镜 base = blocks or sentences or [text] filled = list(base) while len(filled) < count: filled.append(base[-1]) return filled[:count] @transaction.atomic def create_ai_task(*, project, user, task_type: str, model_config: ModelConfig, request_payload: dict) -> AITask: cost = estimate_cost(model_config) task = AITask.objects.create( team=project.team, created_by=user, project=project, task_type=task_type, status=AITask.Status.CREATED, model_config=model_config, idempotency_key=f"{task_type}:{project.id}:{uuid.uuid4()}", request_payload=request_payload, estimated_cost=cost, ) reserve_credit(team=project.team, user=user, task=task, amount=cost) task.status = AITask.Status.RESERVED task.save(update_fields=["status", "updated_at"]) return task def generate_project_script(*, project, user, user_prompt: str, selling_point_ids: list[str] | None = None) -> ScriptVersion: model_config = get_default_model(ModelConfig.Capability.TEXT) if model_config is None: raise ValueError("no active text model configured") messages = build_script_prompt(project=project, user_prompt=user_prompt, selling_point_ids=selling_point_ids) payload = {"model": model_config.name, "endpoint": model_config.endpoint, "messages": messages} task = create_ai_task( project=project, user=user, task_type=AITask.Type.SCRIPT_GENERATION, model_config=model_config, request_payload=payload, ) reservation = task.credit_reservation try: task.status = AITask.Status.SUBMITTED task.submitted_at = timezone.now() task.save(update_fields=["status", "submitted_at", "updated_at"]) provider = VolcanoArkProvider(base_url=model_config.provider.base_url or None) response = provider.chat_completion(model=model_config.name, endpoint=model_config.endpoint, messages=messages) content = provider.extract_text(response) with transaction.atomic(): task.status = AITask.Status.SUCCEEDED task.response_payload = response task.actual_cost = task.estimated_cost task.completed_at = timezone.now() task.save(update_fields=["status", "response_payload", "actual_cost", "completed_at", "updated_at"]) charge_reserved_credit(reservation=reservation, actual_amount=task.actual_cost) script = ScriptVersion.objects.create( project=project, task=task, title="AI 脚本", content=content, source="ai", is_adopted=False, ) for index, segment_text in enumerate(split_script_into_segments(content)): ScriptSegment.objects.create( script_version=script, sort_order=index, duration_seconds=15, narration=segment_text, visual_prompt=segment_text, ) stage, _ = ProjectStage.objects.get_or_create(project=project, stage=ProjectStage.Stage.SCRIPT) stage.status = ProjectStage.Status.NEEDS_REVIEW stage.save(update_fields=["status", "updated_at"]) return script except Exception as exc: with transaction.atomic(): task.status = AITask.Status.FAILED task.error_message = str(exc) task.completed_at = timezone.now() task.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) release_credit(reservation=reservation, reason=str(exc)) raise def _generate_video_poster(*, video_bytes: bytes, team, project, asset_id) -> "StoredObject | None": """用 ffmpeg 抽视频首帧作为封面(poster)并上传 TOS。best-effort:任何失败都返回 None,不影响视频资产落地。""" if not video_bytes: return None try: with tempfile.TemporaryDirectory(prefix="airshelf-poster-") as tmp: tmp_dir = Path(tmp) video_path = tmp_dir / "in.mp4" poster_path = tmp_dir / "poster.jpg" video_path.write_bytes(video_bytes) proc = subprocess.run( ["ffmpeg", "-y", "-ss", "0", "-i", str(video_path), "-frames:v", "1", "-q:v", "3", str(poster_path)], capture_output=True, timeout=60, ) if proc.returncode != 0 or not poster_path.exists(): return None poster_bytes = poster_path.read_bytes() if not poster_bytes: return None object_key = f"teams/{team.id}/projects/{project.id}/generated/{asset_id}-poster.jpg" return TosStorage().upload_fileobj( fileobj=BytesIO(poster_bytes), object_key=object_key, content_type="image/jpeg" ) except Exception: # noqa: BLE001 — poster 仅用于展示,失败不阻断 return None def _store_generated_media(*, team, user, project, task, media: str, name: str, category: str, asset_type: str) -> Asset: fileobj, content_type = VolcanoArkProvider.media_to_bytes(media) suffix = ".png" if "video" in content_type: suffix = ".mp4" elif "jpeg" in content_type: suffix = ".jpg" elif "webp" in content_type: suffix = ".webp" asset_id = uuid.uuid4() object_key = f"teams/{team.id}/projects/{project.id}/generated/{asset_id}{suffix}" stored = TosStorage().upload_fileobj(fileobj=fileobj, object_key=object_key, content_type=content_type) asset = Asset.objects.create( id=asset_id, team=team, created_by=user, name=name, asset_type=asset_type, source=Asset.Source.AI_GENERATED, category=category, origin_task=task, ) AssetFile.objects.create( asset=asset, object_key=stored.object_key, bucket=stored.bucket, content_type=stored.content_type, size_bytes=stored.size_bytes, is_primary=True, ) # 视频资产:额外抽首帧作为封面图,挂成同一 Asset 下的 image 文件,供任务中心/列表显示缩略图 if "video" in content_type: try: video_bytes = fileobj.getvalue() if isinstance(fileobj, BytesIO) else b"" except Exception: # noqa: BLE001 video_bytes = b"" poster = _generate_video_poster(video_bytes=video_bytes, team=team, project=project, asset_id=asset_id) if poster: AssetFile.objects.create( asset=asset, object_key=poster.object_key, bucket=poster.bucket, content_type=poster.content_type, size_bytes=poster.size_bytes, is_primary=False, ) return asset def generate_base_asset(*, project, user, kind: str, prompt: str) -> BaseAssetGroup: model_config = get_default_model(ModelConfig.Capability.IMAGE) if model_config is None: raise ValueError("no active image model configured") payload = {"model": model_config.name, "endpoint": model_config.endpoint, "prompt": prompt, "kind": kind} task = create_ai_task( project=project, user=user, task_type={ BaseAssetGroup.Kind.PRODUCT: AITask.Type.PRODUCT_IMAGE, BaseAssetGroup.Kind.PERSON: AITask.Type.PERSON_IMAGE, BaseAssetGroup.Kind.SCENE: AITask.Type.SCENE_IMAGE, }[kind], model_config=model_config, request_payload=payload, ) reservation = task.credit_reservation try: provider = VolcanoArkProvider(base_url=model_config.provider.base_url or None) response = provider.image_generation(model=model_config.name, endpoint=model_config.endpoint, prompt=prompt) media = provider.extract_first_media_url(response) with transaction.atomic(): task.status = AITask.Status.SUCCEEDED task.response_payload = response task.actual_cost = task.estimated_cost task.completed_at = timezone.now() task.save(update_fields=["status", "response_payload", "actual_cost", "completed_at", "updated_at"]) charge_reserved_credit(reservation=reservation, actual_amount=task.actual_cost) category = { BaseAssetGroup.Kind.PRODUCT: Asset.Category.PRODUCT_IMAGE, BaseAssetGroup.Kind.PERSON: Asset.Category.PERSON, BaseAssetGroup.Kind.SCENE: Asset.Category.SCENE, }[kind] asset = _store_generated_media( team=project.team, user=user, project=project, task=task, media=media, name=f"{project.name}-{kind}", category=category, asset_type=Asset.Type.IMAGE, ) group = BaseAssetGroup.objects.create(project=project, kind=kind, task=task, prompt=prompt) group.candidate_assets.add(asset) group.adopted_asset = asset group.save(update_fields=["adopted_asset", "updated_at"]) return group except Exception as exc: task.status = AITask.Status.FAILED task.error_message = str(exc) task.completed_at = timezone.now() task.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) release_credit(reservation=reservation, reason=str(exc)) raise def _scene_context(project) -> str: """从商品 + 已采用基础资产提炼一句「风格锚点」,贯穿故事板 / 视频,保证各镜内容一致。""" product = project.product parts = [f"商品:{product.title}"] if product.brand: parts.append(f"品牌:{product.brand}") if product.category: parts.append(f"类目:{product.category}") if getattr(product, "target_audience", ""): parts.append(f"人群:{product.target_audience}") adopted_kinds = set( project.base_asset_groups.filter(adopted_asset__isnull=False).values_list("kind", flat=True) ) if BaseAssetGroup.Kind.PERSON in adopted_kinds: parts.append("真人出镜,保持人物一致") if BaseAssetGroup.Kind.SCENE in adopted_kinds: parts.append("统一场景与色调") return " · ".join(parts) def build_storyboard_frame_prompt(project, version, segment) -> str: """单帧故事板提示词:风格锚点 + 本镜画面(回退旁白)+ 版本统一指令。""" visual = (segment.visual_prompt or segment.narration or "").strip() lines = [ _scene_context(project), f"第 {segment.sort_order + 1} 镜画面:{visual}" if visual else f"第 {segment.sort_order + 1} 镜", ] if version.prompt: lines.append(version.prompt.strip()) lines.append("电商竖屏分镜图,构图清晰,可直接指导视频生成") return "\n".join(line for line in lines if line) def build_video_segment_prompt(project, video_segment, scene, user_prompt: str) -> str: """单段视频提示词:把本镜旁白 + 画面 + 风格锚点织进去,让每个视频片段跟住对应脚本/故事板。""" lines = [_scene_context(project)] if scene is not None: if scene.narration: lines.append(f"旁白:{scene.narration.strip()}") visual = (scene.visual_prompt or scene.narration or "").strip() if visual: lines.append(f"画面:{visual}") if user_prompt: lines.append(user_prompt.strip()) lines.append( f"第 {video_segment.sort_order + 1} 段 · {video_segment.target_duration_seconds}s · " "9:16 竖屏电商带货短视频,镜头稳定,商品露出清晰,节奏有转化感" ) return "\n".join(line for line in lines if line) def submit_storyboard(*, project, user, prompt: str = "") -> StoryboardVersion: """异步故事板·提交:快速创建(或复用)一个未采用的版本,不在此处生图。逐帧生成交给 generate_storyboard_frame(轮询)。""" adopted_script = project.script_versions.filter(is_adopted=True).prefetch_related("segments").first() if adopted_script is None: raise ValueError("script must be adopted before generating storyboard") if get_default_model(ModelConfig.Capability.IMAGE) is None: raise ValueError("no active image model configured") # 复用尚未完成(未采用)的版本,避免重复提交产生多版本;否则新建 version = project.storyboard_versions.filter(is_adopted=False).order_by("-created_at").first() if version is None: version = StoryboardVersion.objects.create(project=project, prompt=prompt) elif prompt and version.prompt != prompt: version.prompt = prompt version.save(update_fields=["prompt", "updated_at"]) return version def _storyboard_frame_worker(task_id, version_id, segment_id, user_id) -> None: """后台线程:真正调 ARK 生成一帧故事板图并落库。每次 poll 不阻塞在此——HTTP 永远秒回。""" import threading # noqa: F401 — 仅标注此函数运行在独立线程 from django.db import connections from apps.accounts.models import User try: task = AITask.objects.select_related("model_config__provider").get(id=task_id) version = StoryboardVersion.objects.select_related("project__team").get(id=version_id) segment = ScriptSegment.objects.get(id=segment_id) user = User.objects.get(id=user_id) project = version.project model_config = task.model_config reservation = task.credit_reservation task.status = AITask.Status.SUBMITTED task.save(update_fields=["status", "updated_at"]) try: provider = VolcanoArkProvider(base_url=model_config.provider.base_url or None) frame_prompt = task.request_payload.get("prompt") or build_storyboard_frame_prompt(project, version, segment) response = provider.image_generation( model=model_config.name, endpoint=model_config.endpoint, prompt=frame_prompt, ) media = provider.extract_first_media_url(response) task.status = AITask.Status.SUCCEEDED task.response_payload = response task.actual_cost = task.estimated_cost task.completed_at = timezone.now() task.save(update_fields=["status", "response_payload", "actual_cost", "completed_at", "updated_at"]) charge_reserved_credit(reservation=reservation, actual_amount=task.actual_cost) asset = _store_generated_media( team=project.team, user=user, project=project, task=task, media=media, name=f"{project.name}-storyboard-{segment.sort_order + 1}", category=Asset.Category.SCENE, asset_type=Asset.Type.IMAGE, ) StoryboardFrame.objects.create( storyboard=version, script_segment=segment, asset=asset, sort_order=segment.sort_order, prompt=segment.visual_prompt, ) except Exception as exc: # noqa: BLE001 — 失败回滚额度,标记任务失败供 poll 上报 task.status = AITask.Status.FAILED task.error_message = str(exc) task.completed_at = timezone.now() task.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) release_credit(reservation=reservation, reason=str(exc)) finally: connections.close_all() # 释放该线程的 DB 连接 def generate_storyboard_frame(*, project, user) -> dict: """异步故事板·轮询(秒回):读取进度;若无帧在生成则后台起线程生成下一帧。永不阻塞在 ARK 调用上。 返回 {status: generating|succeeded|failed, done, total, version_id}。全部完成→采用版本。""" import threading version = project.storyboard_versions.filter(is_adopted=False).order_by("-created_at").first() adopted_script = project.script_versions.filter(is_adopted=True).prefetch_related("segments").first() if version is None or adopted_script is None: latest = project.storyboard_versions.order_by("-created_at").first() n = latest.frames.count() if latest else 0 return {"status": "succeeded", "done": n, "total": n, "version_id": str(latest.id) if latest else ""} segments = list(adopted_script.segments.all().order_by("sort_order")) total = len(segments) done_segment_ids = set(version.frames.values_list("script_segment_id", flat=True)) done = len(done_segment_ids) if done >= total: _finalize_storyboard(project, version) return {"status": "succeeded", "done": total, "total": total, "version_id": str(version.id)} # 该版本内是否已有帧在后台生成中(RESERVED/SUBMITTED 的故事板任务即为「占位锁」)。 # 仅算「近 3 分钟内」的任务:若进程/线程意外中断留下僵尸任务,超时后不再视为在生成,允许重新发起。 stale_cutoff = timezone.now() - timedelta(minutes=3) inflight = AITask.objects.filter( project=project, task_type=AITask.Type.STORYBOARD, status__in=[AITask.Status.CREATED, AITask.Status.RESERVED, AITask.Status.SUBMITTED], request_payload__storyboard_version=str(version.id), created_at__gte=stale_cutoff, ).exists() if inflight: return {"status": "generating", "done": done, "total": total, "version_id": str(version.id)} pending = [s for s in segments if s.id not in done_segment_ids] segment = pending[0] # 单帧失败次数上限,避免持续失败时无限重试 failed_for_segment = AITask.objects.filter( project=project, task_type=AITask.Type.STORYBOARD, status=AITask.Status.FAILED, request_payload__storyboard_segment=str(segment.id), ).count() if failed_for_segment >= 2: last = AITask.objects.filter(project=project, task_type=AITask.Type.STORYBOARD, status=AITask.Status.FAILED, request_payload__storyboard_segment=str(segment.id)).order_by("-created_at").first() return {"status": "failed", "done": done, "total": total, "version_id": str(version.id), "error": last.error_message if last else "storyboard frame failed"} model_config = get_default_model(ModelConfig.Capability.IMAGE) task = create_ai_task( project=project, user=user, task_type=AITask.Type.STORYBOARD, model_config=model_config, request_payload={ "model": model_config.name, "endpoint": model_config.endpoint, "prompt": build_storyboard_frame_prompt(project, version, segment), "storyboard_version": str(version.id), "storyboard_segment": str(segment.id), }, ) threading.Thread( target=_storyboard_frame_worker, args=(str(task.id), str(version.id), str(segment.id), str(user.id)), daemon=True, ).start() return {"status": "generating", "done": done, "total": total, "version_id": str(version.id)} def _finalize_storyboard(project, version) -> None: """全部帧就绪:采用该版本(反采用其余版本)。项目阶段推进由视图负责(与原同步实现一致)。""" project.storyboard_versions.exclude(id=version.id).update(is_adopted=False) if not version.is_adopted: version.is_adopted = True version.save(update_fields=["is_adopted", "updated_at"]) def _asset_preview_url(asset) -> str: """资产主文件的可公开访问 URL(已写绝对 URL 优先,否则实时签 TOS GET)。""" if asset is None: return "" primary = asset.files.filter(is_primary=True).first() or asset.files.first() if primary is None: return "" if primary.preview_url: return primary.preview_url try: return TosStorage().presigned_get_url(object_key=primary.object_key) except Exception: return "" def _video_reference_images(project, video_segment) -> list[str]: """为本视频段挑一张视觉参考图:优先本镜故事板帧,兜底已采用商品基础资产。""" version = ( project.storyboard_versions.filter(is_adopted=True).order_by("-created_at").first() or project.storyboard_versions.order_by("-created_at").first() ) if version is not None: frame = ( version.frames.filter(sort_order=video_segment.sort_order).first() or version.frames.order_by("sort_order").first() ) if frame is not None: url = _asset_preview_url(frame.asset) if url: return [url] product_group = ( project.base_asset_groups.filter(kind=BaseAssetGroup.Kind.PRODUCT, adopted_asset__isnull=False) .order_by("-created_at") .first() ) if product_group is not None: url = _asset_preview_url(product_group.adopted_asset) if url: return [url] return [] def submit_video_segment(*, video_segment: VideoSegment, user, prompt: str) -> VideoSegmentVersion | None: model_config = get_default_model(ModelConfig.Capability.VIDEO) if model_config is None: raise ValueError("no active video model configured") project = video_segment.project # 衔接:按 sort_order 把视频段绑到对应脚本镜,并织出跟住该镜的提示词。 scene = None adopted_script = project.script_versions.filter(is_adopted=True).prefetch_related("segments").first() if adopted_script is not None: scene = adopted_script.segments.filter(sort_order=video_segment.sort_order).first() if scene is not None and video_segment.script_segment_id != scene.id: video_segment.script_segment = scene video_segment.save(update_fields=["script_segment", "updated_at"]) final_prompt = build_video_segment_prompt(project, video_segment, scene, prompt) # 参考图:优先用本镜故事板帧,其次商品/人物基础资产,给视频做视觉锚点(衔接故事板→视频)。 reference_images = _video_reference_images(project, video_segment) task = create_ai_task( project=project, user=user, task_type=AITask.Type.VIDEO_SEGMENT, model_config=model_config, request_payload={ "model": model_config.name, "endpoint": model_config.endpoint, "prompt": final_prompt, "duration": video_segment.target_duration_seconds, "ratio": "9:16", "video_segment_id": str(video_segment.id), "reference_images": reference_images, }, ) try: provider = VolcanoArkProvider(base_url=model_config.provider.base_url or None) try: response = provider.create_video_task( model=model_config.name, endpoint=model_config.endpoint, prompt=final_prompt, duration=video_segment.target_duration_seconds, ratio="9:16", resolution="720p", reference_images=reference_images or None, ) except Exception: # 降级:带参考图被拒时退回纯文生视频(文本里已含本镜旁白/画面,衔接不丢) if not reference_images: raise response = provider.create_video_task( model=model_config.name, endpoint=model_config.endpoint, prompt=final_prompt, duration=video_segment.target_duration_seconds, ratio="9:16", resolution="720p", reference_images=None, ) task.provider_task_id = str(response.get("id") or response.get("task_id") or "") task.response_payload = response task.status = AITask.Status.SUBMITTED task.submitted_at = timezone.now() task.save(update_fields=["provider_task_id", "response_payload", "status", "submitted_at", "updated_at"]) video_segment.status = VideoSegment.Status.RUNNING video_segment.save(update_fields=["status", "updated_at"]) return None except Exception as exc: task.status = AITask.Status.FAILED task.error_message = str(exc) task.completed_at = timezone.now() task.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) release_credit(reservation=task.credit_reservation, reason=str(exc)) video_segment.status = VideoSegment.Status.FAILED video_segment.error_message = str(exc) video_segment.save(update_fields=["status", "error_message", "updated_at"]) raise def poll_video_segment(*, video_segment: VideoSegment, user) -> VideoSegmentVersion | None: # 幂等:已完成的段直接回采用版;已失败的段不再 poll。避免对已成功 task 再 poll → 二次建版 / 二次扣费。 if video_segment.status == VideoSegment.Status.SUCCEEDED: return video_segment.adopted_version or video_segment.versions.order_by("-created_at").first() if video_segment.status == VideoSegment.Status.FAILED: return None task = video_segment.versions.order_by("-created_at").first() ai_task = None if task: ai_task = task.task if ai_task is None: ai_task = video_segment.project.ai_tasks.filter( task_type=AITask.Type.VIDEO_SEGMENT, request_payload__video_segment_id=str(video_segment.id), status__in=[AITask.Status.SUBMITTED, AITask.Status.POLLING], ).order_by("-created_at").first() if ai_task is None: raise ValueError("no active video generation task") # task 已终态(可能被并发的 worker / 另一次 poll 处理过):直接回已有版,不再调 ARK。 if ai_task.status == AITask.Status.SUCCEEDED: return video_segment.versions.filter(task=ai_task).order_by("-created_at").first() if ai_task.status in (AITask.Status.FAILED, AITask.Status.CANCELLED): return None provider = VolcanoArkProvider(base_url=ai_task.model_config.provider.base_url or None) response = provider.poll_video_task(endpoint=ai_task.model_config.endpoint, provider_task_id=ai_task.provider_task_id) remote_status = response.get("status") if remote_status in {"queued", "running", "processing"}: ai_task.status = AITask.Status.POLLING ai_task.response_payload = response ai_task.save(update_fields=["status", "response_payload", "updated_at"]) return None if remote_status in {"failed", "expired", "cancelled"}: ai_task.status = AITask.Status.FAILED ai_task.response_payload = response ai_task.error_message = response.get("error", {}).get("message", "video generation failed") ai_task.completed_at = timezone.now() ai_task.save(update_fields=["status", "response_payload", "error_message", "completed_at", "updated_at"]) release_credit(reservation=ai_task.credit_reservation, reason=ai_task.error_message) video_segment.status = VideoSegment.Status.FAILED video_segment.error_message = ai_task.error_message video_segment.save(update_fields=["status", "error_message", "updated_at"]) return None media = provider.extract_first_media_url(response) asset = _store_generated_media( team=video_segment.project.team, user=user, project=video_segment.project, task=ai_task, media=media, name=f"{video_segment.project.name}-segment-{video_segment.sort_order + 1}", category=Asset.Category.VIDEO_CLIP, asset_type=Asset.Type.VIDEO, ) ai_task.status = AITask.Status.SUCCEEDED ai_task.response_payload = response ai_task.actual_cost = ai_task.estimated_cost ai_task.completed_at = timezone.now() ai_task.save(update_fields=["status", "response_payload", "actual_cost", "completed_at", "updated_at"]) charge_reserved_credit(reservation=ai_task.credit_reservation, actual_amount=ai_task.actual_cost) version = VideoSegmentVersion.objects.create( video_segment=video_segment, task=ai_task, asset=asset, prompt=ai_task.request_payload.get("prompt", ""), is_adopted=True, ) video_segment.adopted_version = version video_segment.status = VideoSegment.Status.SUCCEEDED video_segment.error_message = "" video_segment.save(update_fields=["adopted_version", "status", "error_message", "updated_at"]) return version def create_export_job(*, timeline, user) -> ExportJob: return ExportJob.objects.create(timeline=timeline, status=ExportJob.Status.QUEUED) _STANDALONE_CATEGORY = { "model": Asset.Category.PERSON, "cover": Asset.Category.PRODUCT_IMAGE, "image": Asset.Category.PRODUCT_IMAGE, } _STANDALONE_TASK_TYPE = { "model": AITask.Type.PERSON_IMAGE, "cover": AITask.Type.PRODUCT_IMAGE, "image": AITask.Type.PRODUCT_IMAGE, } def generate_standalone_image(*, team, user, prompt: str, mode: str = "image", count: int = 1) -> list[Asset]: """不绑定项目的独立生图(图片创作 / 模特上身图 / 平台套图)。复用项目内生图链路,AITask.project=None。""" model_config = get_default_model(ModelConfig.Capability.IMAGE) if model_config is None: raise ValueError("no active image model configured") category = _STANDALONE_CATEGORY.get(mode, Asset.Category.UNCATEGORIZED) task_type = _STANDALONE_TASK_TYPE.get(mode, AITask.Type.PRODUCT_IMAGE) count = max(1, min(int(count or 1), 4)) provider = VolcanoArkProvider(base_url=model_config.provider.base_url or None) assets: list[Asset] = [] for index in range(count): cost = estimate_cost(model_config) task = AITask.objects.create( team=team, created_by=user, project=None, task_type=task_type, status=AITask.Status.CREATED, model_config=model_config, idempotency_key=f"standalone-image:{team.id}:{uuid.uuid4()}", request_payload={"model": model_config.name, "endpoint": model_config.endpoint, "prompt": prompt, "mode": mode}, estimated_cost=cost, ) reserve_credit(team=team, user=user, task=task, amount=cost) task.status = AITask.Status.RESERVED task.save(update_fields=["status", "updated_at"]) reservation = task.credit_reservation try: response = provider.image_generation(model=model_config.name, endpoint=model_config.endpoint, prompt=prompt) media = provider.extract_first_media_url(response) with transaction.atomic(): task.status = AITask.Status.SUCCEEDED task.response_payload = response task.actual_cost = task.estimated_cost task.completed_at = timezone.now() task.save(update_fields=["status", "response_payload", "actual_cost", "completed_at", "updated_at"]) charge_reserved_credit(reservation=reservation, actual_amount=task.actual_cost) fileobj, content_type = VolcanoArkProvider.media_to_bytes(media) suffix = ".jpg" if "jpeg" in content_type else (".webp" if "webp" in content_type else ".png") asset_id = uuid.uuid4() object_key = f"teams/{team.id}/standalone/{asset_id}{suffix}" stored = TosStorage().upload_fileobj(fileobj=fileobj, object_key=object_key, content_type=content_type) asset = Asset.objects.create( id=asset_id, team=team, created_by=user, name=f"AI 生成 · {mode} · {index + 1}", asset_type=Asset.Type.IMAGE, source=Asset.Source.AI_GENERATED, category=category, origin_task=task, ) AssetFile.objects.create(asset=asset, object_key=stored.object_key, bucket=stored.bucket, content_type=stored.content_type, size_bytes=stored.size_bytes, is_primary=True) assets.append(asset) except Exception as exc: task.status = AITask.Status.FAILED task.error_message = str(exc) task.completed_at = timezone.now() task.save(update_fields=["status", "error_message", "completed_at", "updated_at"]) release_credit(reservation=reservation, reason=str(exc)) raise return assets