"""TTS speech synthesis service with OSS upload."""
import asyncio
import io
import json
import logging
import uuid
from datetime import datetime
from typing import Iterator

from django.conf import settings

logger = logging.getLogger(__name__)

# The TTS provider can be switched here; edge-tts (free) is the current choice.
TTS_VOICE = 'zh-CN-XiaoxiaoNeural'


def generate_tts_stream(story) -> Iterator[str]:
    """Generate TTS audio for a story, upload it to OSS, and stream progress.

    Progress is reported as Server-Sent Events. The stream always starts with
    a ``stage`` event and ends with either a ``done`` event (carrying
    ``audio_url``) or an ``error`` event (carrying ``message``).

    Args:
        story: Story model instance. The function reads ``content`` and
            ``audio_url`` and, on success, writes ``audio_url`` and calls
            ``save(update_fields=['audio_url'])``.

    Yields:
        str: One SSE-formatted event per progress step.
    """
    yield _sse_event('stage', {
        'stage': 'connecting',
        'progress': 0,
        'message': '正在准备语音合成...',
    })

    # Existing audio is returned as-is. This check comes before the edge-tts
    # probe so cached stories keep working even when the library is missing.
    if story.audio_url:
        yield _sse_event('done', {
            'stage': 'done',
            'progress': 100,
            'message': '音频已存在',
            'audio_url': story.audio_url,
        })
        return

    # Availability probe only; the module itself is used inside _synthesize.
    try:
        import edge_tts  # noqa: F401
    except ImportError:
        yield _sse_event('error', {'message': 'edge-tts 库未安装,请运行 pip install edge-tts'})
        return

    # Nothing to synthesize: fail early with a clear message instead of
    # sending blank text to the TTS backend.
    content = story.content
    if not content or not content.strip():
        yield _sse_event('error', {'message': '故事内容为空,无法合成语音'})
        return

    yield _sse_event('stage', {
        'stage': 'generating',
        'progress': 10,
        'message': '正在合成语音...',
    })

    try:
        # edge-tts is async, so drive it to completion from this sync generator.
        audio_data = asyncio.run(_synthesize(content))
    except Exception as e:
        logger.exception('TTS synthesis failed: %s', e)
        yield _sse_event('error', {'message': f'语音合成失败: {e}'})
        return

    # Never upload (and then permanently cache the URL of) a zero-byte file.
    if not audio_data:
        logger.error('TTS synthesis returned no audio data')
        yield _sse_event('error', {'message': '语音合成失败: 未收到音频数据'})
        return

    yield _sse_event('stage', {
        'stage': 'saving',
        'progress': 70,
        'message': '正在保存音频文件...',
    })

    try:
        audio_url = _upload_audio(audio_data)
        # Persist the URL so later requests take the cached-audio path above.
        story.audio_url = audio_url
        story.save(update_fields=['audio_url'])
    except Exception as e:
        logger.exception('OSS upload failed: %s', e)
        yield _sse_event('error', {'message': f'音频上传失败: {e}'})
        return

    yield _sse_event('done', {
        'stage': 'done',
        'progress': 100,
        'message': '语音合成完成!',
        'audio_url': audio_url,
    })


def _upload_audio(audio_data: bytes) -> str:
    """Upload MP3 bytes to OSS under a dated, random key and return the URL."""
    # Imported lazily, as in the original code, so this module can be imported
    # without the OSS helper being loaded.
    from utils.oss import get_oss_client

    oss_client = get_oss_client()
    filename = f"{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}.mp3"
    key = f"stories/audio/{filename}"
    oss_client.bucket.put_object(key, audio_data)
    return _build_audio_url(key)


def _build_audio_url(key: str) -> str:
    """Build the HTTPS URL for an object key from ``settings.ALIYUN_OSS``."""
    oss_config = settings.ALIYUN_OSS
    custom_domain = oss_config.get('CUSTOM_DOMAIN')
    if custom_domain:
        host = _bare_host(custom_domain)
    else:
        host = f"{oss_config['BUCKET_NAME']}.{_bare_host(oss_config['ENDPOINT'])}"
    return f"https://{host}/{key}"


def _bare_host(value: str) -> str:
    """Drop a leading ``scheme://`` and trailing ``/`` from a host setting.

    A value configured as ``https://host`` would otherwise produce a URL like
    ``https://bucket.https://host/...``. Scheme-less values pass through
    unchanged.
    """
    return value.split('://', 1)[-1].rstrip('/')


async def _synthesize(text: str) -> bytes:
    """Synthesize ``text`` with edge-tts and return the audio bytes."""
    import edge_tts

    communicate = edge_tts.Communicate(text, TTS_VOICE)
    # The stream yields several chunk types; only 'audio' chunks carry data.
    audio_chunks = [
        chunk['data']
        async for chunk in communicate.stream()
        if chunk['type'] == 'audio'
    ]
    return b''.join(audio_chunks)


def _sse_event(event: str, data) -> str:
    """Format one SSE event: an ``event:`` line, a JSON ``data:`` line, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"