""" 批量将已有音乐的 MP3 音频预转码为 Opus 帧 JSON 并上传 OSS。 使用方法: python manage.py convert_tracks_to_opus python manage.py convert_tracks_to_opus --dry-run # 仅统计,不转码 python manage.py convert_tracks_to_opus --limit 10 # 只处理前 10 个 python manage.py convert_tracks_to_opus --force # 重新转码已有 opus_url 的曲目 python manage.py convert_tracks_to_opus --default # 仅处理系统默认曲目 """ import uuid import logging from datetime import datetime import requests from django.conf import settings from django.core.management.base import BaseCommand from apps.music.models import Track from apps.stories.services.opus_converter import convert_mp3_to_opus_json logger = logging.getLogger(__name__) class Command(BaseCommand): help = '批量将已有音乐的 MP3 音频预转码为 Opus 帧 JSON' def add_arguments(self, parser): parser.add_argument( '--dry-run', action='store_true', help='仅统计需要转码的曲目数量,不实际执行', ) parser.add_argument( '--limit', type=int, default=0, help='最多处理的曲目数量(0=不限)', ) parser.add_argument( '--force', action='store_true', help='重新转码已有 opus_url 的曲目', ) parser.add_argument( '--default', action='store_true', help='仅处理系统默认曲目(is_default=True)', ) def handle(self, *args, **options): dry_run = options['dry_run'] limit = options['limit'] force = options['force'] default_only = options['default'] # 查找需要转码的曲目 qs = Track.objects.filter( generation_status='completed', ).exclude(audio_url='') if not force: qs = qs.filter(opus_url='') if default_only: qs = qs.filter(is_default=True) qs = qs.order_by('id') total = qs.count() self.stdout.write(f'需要转码的曲目: {total} 个') if dry_run: self.stdout.write(self.style.NOTICE('[dry-run] 仅统计,不执行转码')) return if total == 0: self.stdout.write(self.style.SUCCESS('所有曲目已转码,无需处理')) return # OSS 客户端 from utils.oss import get_oss_client oss_client = get_oss_client() oss_config = settings.ALIYUN_OSS if oss_config.get('CUSTOM_DOMAIN'): url_prefix = f"https://{oss_config['CUSTOM_DOMAIN']}" else: url_prefix = f"https://{oss_config['BUCKET_NAME']}.{oss_config['ENDPOINT']}" tracks = qs[:limit] if limit > 0 else qs success_count = 0 fail_count = 0 for i, track in enumerate(tracks.iterator(), 1): self.stdout.write(f'\n[{i}/{total}] Track#{track.id} "{track.title}"') self.stdout.write(f' MP3: {track.audio_url[:80]}...') try: # 下载 MP3 resp = requests.get(track.audio_url, timeout=60) resp.raise_for_status() mp3_bytes = resp.content self.stdout.write(f' MP3 大小: {len(mp3_bytes) / 1024:.1f} KB') # 转码 opus_json = convert_mp3_to_opus_json(mp3_bytes) self.stdout.write(f' Opus JSON 大小: {len(opus_json) / 1024:.1f} KB') # 上传 OSS opus_filename = f"{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}.json" opus_key = f"music/audio-opus/{opus_filename}" oss_client.bucket.put_object(opus_key, opus_json.encode('utf-8')) opus_url = f"{url_prefix}/{opus_key}" track.opus_url = opus_url track.save(update_fields=['opus_url']) success_count += 1 self.stdout.write(self.style.SUCCESS(f' OK: {opus_url}')) except Exception as e: fail_count += 1 self.stdout.write(self.style.ERROR(f' FAIL: {e}')) logger.error(f'Track#{track.id} opus convert failed: {e}') self.stdout.write(f'\n{"=" * 40}') self.stdout.write(self.style.SUCCESS( f'完成: 成功 {success_count}, 失败 {fail_count}, 总计 {success_count + fail_count}' ))