# 音频服务集成指南 本文档详细说明QY LTY Backend系统中音频服务的集成架构、配置方法和使用指南。 ## 1. 音频服务架构概览 系统支持多个音频服务提供商,采用适配器模式实现统一接口: ``` Audio Service Architecture: ┌─────────────────────────────────────────────┐ │ Audio Service Layer │ │ ┌─────────────────────────────────────────┐ │ │ │ Service Factory │ │ │ │ ┌─────────────────────────────────┐ │ │ │ │ │ get_audio_service() │ │ │ │ │ │ - Provider Selection │ │ │ │ │ │ - Configuration Loading │ │ │ │ │ └─────────────────────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────┐ │ │ │ Base Audio Service │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ ASR │ │ TTS │ │ │ │ │ │(Speech2Text)│ │ (Text2Speech) │ │ │ │ │ │- Interface │ │ - Interface │ │ │ │ │ │- Validation │ │ - Validation │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────┐ │ │ │ Implementations │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Aliyun │ │ Volcengine │ │ │ │ │ │ NLS │ │ (Huoshan) │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ │ ┌─────────────┐ ┌─────────────────────┐ │ │ │ │ │ Tencent │ │ Custom │ │ │ │ │ │ Cloud │ │ Provider │ │ │ │ │ └─────────────┘ └─────────────────────┘ │ │ │ └─────────────────────────────────────────┘ │ └─────────────────────────────────────────────┘ ``` ## 2. 
支持的音频服务提供商 ### 2.1 火山引擎(Huoshan)- 推荐 **特点:** - **TTS质量**:高质量中文语音合成 - **延迟**:低延迟,实时性好 - **声音种类**:丰富的音色选择 - **价格**:性价比高 **配置示例:** ```bash # 环境变量 AUDIO_SERVICE_PROVIDER=huoshan AUDIO_SERVICE_HUOSHAN_APPID=your_app_id AUDIO_SERVICE_HUOSHAN_ACCESS_TOKEN=your_access_token AUDIO_SERVICE_HUOSHAN_CLUSTER=volcano_tts AUDIO_SERVICE_HUOSHAN_VOICE_TYPE=BV001_streaming AUDIO_SERVICE_HUOSHAN_STORAGE_DIR=/tmp/audio/ AUDIO_SERVICE_HUOSHAN_BASE_URL=https://openspeech.bytedance.com ``` ### 2.2 阿里云智能语音(Aliyun NLS) **特点:** - **全功能**:同时支持ASR和TTS - **稳定性**:企业级服务稳定性 - **多语言**:支持多种语言识别 - **集成度**:与阿里云生态深度集成 **配置示例:** ```bash # 环境变量 AUDIO_SERVICE_PROVIDER=aliyun ALIYUN_NLS_ACCESS_KEY_ID=your_access_key_id ALIYUN_NLS_ACCESS_KEY_SECRET=your_access_key_secret ALIYUN_NLS_APP_ID=your_app_id ALIYUN_OSS_ACCESS_KEY_ID=your_oss_key_id ALIYUN_OSS_ACCESS_KEY_SECRET=your_oss_key_secret ALIYUN_OSS_BUCKET=your_audio_bucket ALIYUN_OSS_ENDPOINT=oss-cn-beijing.aliyuncs.com ALIYUN_OSS_HOST=https://your-bucket.oss-cn-beijing.aliyuncs.com ALIYUN_OSS_AUDIO_BASE_DIR=audio/ ``` ### 2.3 腾讯云(Tencent) **特点:** - **性能**:高性能语音处理 - **准确率**:高识别准确率 - **实时性**:支持实时语音识别 **配置示例:** ```bash # 环境变量 AUDIO_SERVICE_PROVIDER=tencent AUDIO_SERVICE_TENCENT_API_KEY=your_api_key AUDIO_SERVICE_TENCENT_API_SECRET=your_api_secret ``` ## 3. 
服务接口设计 ### 3.1 基础接口定义 ```python # aiapp/audio/BaseAudioService.py from abc import ABC, abstractmethod from typing import Optional, Dict, Any class BaseAudioService(ABC): """音频服务基类""" @abstractmethod def text_to_speech( self, text: str, voice_type: Optional[str] = None, **kwargs ) -> Dict[str, Any]: """ 文本转语音 Args: text: 要转换的文本 voice_type: 音色类型 **kwargs: 其他参数 Returns: { 'success': bool, 'audio_url': str, 'audio_data': bytes, 'duration': float, 'format': str, 'error': str } """ pass @abstractmethod def speech_to_text( self, audio_data: bytes, format: str = 'wav', sample_rate: int = 16000, **kwargs ) -> Dict[str, Any]: """ 语音转文本 Args: audio_data: 音频数据 format: 音频格式 sample_rate: 采样率 **kwargs: 其他参数 Returns: { 'success': bool, 'text': str, 'confidence': float, 'duration': float, 'error': str } """ pass ``` ### 3.2 服务工厂实现 ```python # aiapp/audio/AudioService.py from django.conf import settings from .BaseAudioService import BaseAudioService from .AliyunAudioService import AliyunAudioService from .HuoshanAudioService import HuoshanAudioService from .TencentAudioService import TencentAudioService class AudioServiceFactory: """音频服务工厂类""" _instances = {} @classmethod def get_service(cls) -> BaseAudioService: """获取音频服务实例(单例模式)""" provider = settings.AUDIO_SERVICE_PROVIDER if provider not in cls._instances: cls._instances[provider] = cls._create_service(provider) return cls._instances[provider] @classmethod def _create_service(cls, provider: str) -> BaseAudioService: """创建音频服务实例""" services = { 'aliyun': AliyunAudioService, 'huoshan': HuoshanAudioService, 'tencent': TencentAudioService } if provider not in services: raise ValueError(f"不支持的音频服务提供商: {provider}") service_class = services[provider] config = settings.AUDIO_SERVICE_CONFIG.get(provider, {}) return service_class(config) # 便捷函数 def get_audio_service() -> BaseAudioService: """获取当前配置的音频服务""" return AudioServiceFactory.get_service() ``` ## 4. 
具体实现详解 ### 4.1 火山引擎实现 ```python # aiapp/audio/HuoshanAudioService.py import json import requests import hashlib import time from typing import Dict, Any, Optional from .BaseAudioService import BaseAudioService class HuoshanAudioService(BaseAudioService): """火山引擎音频服务实现""" def __init__(self, config: Dict[str, Any]): self.appid = config.get('appid') self.access_token = config.get('access_token') self.cluster = config.get('cluster', 'volcano_tts') self.voice_type = config.get('voice_type', 'BV001_streaming') self.base_url = config.get('base_url', 'https://openspeech.bytedance.com') self.storage_dir = config.get('storage_dir', '/tmp/audio/') # 阿里云ASR配置(用于语音识别) self.aliyun_asr_config = config.get('aliyun_asr', {}) if self.aliyun_asr_config: from .AliyunAudioService import AliyunAudioService self.asr_service = AliyunAudioService(self.aliyun_asr_config) def text_to_speech( self, text: str, voice_type: Optional[str] = None, **kwargs ) -> Dict[str, Any]: """火山引擎文本转语音""" try: # 准备请求参数 voice = voice_type or self.voice_type # 构建请求URL url = f"{self.base_url}/api/v1/tts" # 请求头 headers = { 'Authorization': f'Bearer {self.access_token}', 'Content-Type': 'application/json' } # 请求体 payload = { 'app': { 'appid': self.appid, 'cluster': self.cluster }, 'user': { 'uid': 'default_user' }, 'audio': { 'voice_type': voice, 'encoding': 'mp3', 'speed_ratio': kwargs.get('speed_ratio', 1.0), 'volume_ratio': kwargs.get('volume_ratio', 1.0), 'pitch_ratio': kwargs.get('pitch_ratio', 1.0) }, 'request': { 'reqid': self._generate_request_id(), 'text': text, 'text_type': 'plain', 'operation': 'submit' } } # 发送请求 response = requests.post(url, headers=headers, json=payload, timeout=30) if response.status_code == 200: result = response.json() if result.get('code') == 0: audio_data = result.get('data', {}).get('audio') if audio_data: # 保存音频文件 filename = self._save_audio_file(audio_data, 'mp3') return { 'success': True, 'audio_url': filename, 'audio_data': audio_data, 'duration': result.get('data', 
{}).get('duration', 0), 'format': 'mp3', 'voice_type': voice } return { 'success': False, 'error': result.get('message', '未知错误') } else: return { 'success': False, 'error': f'HTTP {response.status_code}: {response.text}' } except Exception as e: return { 'success': False, 'error': str(e) } def speech_to_text( self, audio_data: bytes, format: str = 'wav', sample_rate: int = 16000, **kwargs ) -> Dict[str, Any]: """语音转文本(使用阿里云ASR)""" if hasattr(self, 'asr_service'): return self.asr_service.speech_to_text( audio_data, format, sample_rate, **kwargs ) else: return { 'success': False, 'error': '语音识别服务未配置' } def _generate_request_id(self) -> str: """生成请求ID""" import uuid return uuid.uuid4().hex def _save_audio_file(self, audio_data: bytes, format: str) -> str: """保存音频文件""" import os import base64 # 确保存储目录存在 os.makedirs(self.storage_dir, exist_ok=True) # 生成文件名 timestamp = int(time.time() * 1000) filename = f"tts_{timestamp}.{format}" filepath = os.path.join(self.storage_dir, filename) # 解码并保存 if isinstance(audio_data, str): audio_bytes = base64.b64decode(audio_data) else: audio_bytes = audio_data with open(filepath, 'wb') as f: f.write(audio_bytes) return filepath ``` ### 4.2 阿里云实现 ```python # aiapp/audio/AliyunAudioService.py import json import base64 import time from typing import Dict, Any, Optional from .BaseAudioService import BaseAudioService class AliyunAudioService(BaseAudioService): """阿里云智能语音服务实现""" def __init__(self, config: Dict[str, Any]): self.access_key_id = config.get('api_key') self.access_key_secret = config.get('api_secret') self.app_id = config.get('app_id') # OSS配置 self.oss_config = { 'key_id': config.get('oss_key_id'), 'key_secret': config.get('oss_key_secret'), 'bucket': config.get('oss_bucket'), 'endpoint': config.get('oss_endpoint'), 'host': config.get('oss_host'), 'base_dir': config.get('oss_audio_base_dir', 'audio/') } def text_to_speech( self, text: str, voice_type: Optional[str] = None, 
**kwargs ) -> Dict[str, Any]: """阿里云文本转语音""" try: # 导入阿里云SDK from aliyunsdkcore.client import AcsClient from aliyunsdknls.request.v20180518 import CreateTTSTaskRequest # 创建客户端 client = AcsClient( self.access_key_id, self.access_key_secret, 'cn-shanghai' ) # 创建请求 request = CreateTTSTaskRequest.CreateTTSTaskRequest() request.set_Text(text) request.set_Voice(voice_type or 'xiaoyun') request.set_Format('mp3') request.set_SampleRate(16000) request.set_AppId(self.app_id) # 发送请求 response = client.do_action_with_exception(request) result = json.loads(response.decode('utf-8')) if 'TaskId' in result: # 轮询获取结果 task_result = self._poll_tts_result(client, result['TaskId']) if task_result['success']: # 下载音频文件 audio_url = task_result['audio_url'] audio_data = self._download_audio(audio_url) # 上传到OSS oss_url = self._upload_to_oss(audio_data, 'mp3') return { 'success': True, 'audio_url': oss_url, 'audio_data': audio_data, 'duration': task_result.get('duration', 0), 'format': 'mp3' } else: return task_result else: return { 'success': False, 'error': result.get('Message', '创建TTS任务失败') } except Exception as e: return { 'success': False, 'error': str(e) } def speech_to_text( self, audio_data: bytes, format: str = 'wav', sample_rate: int = 16000, **kwargs ) -> Dict[str, Any]: """阿里云语音识别""" try: # 使用实时语音识别 import websocket import threading # 上传音频文件到OSS audio_url = self._upload_to_oss(audio_data, format) # 使用一句话识别 from aliyunsdkcore.client import AcsClient from aliyunsdknls.request.v20180518 import CreateRecognitionTaskRequest client = AcsClient( self.access_key_id, self.access_key_secret, 'cn-shanghai' ) request = CreateRecognitionTaskRequest.CreateRecognitionTaskRequest() request.set_AppId(self.app_id) request.set_FileUrl(audio_url) request.set_EnablePunctuationPrediction(True) request.set_EnableInverseTextNormalization(True) request.set_EnableVoiceDetection(False) response = client.do_action_with_exception(request) result = json.loads(response.decode('utf-8')) if 'TaskId' in result: # 
轮询获取识别结果 recognition_result = self._poll_asr_result(client, result['TaskId']) return recognition_result else: return { 'success': False, 'error': result.get('Message', '创建识别任务失败') } except Exception as e: return { 'success': False, 'error': str(e) } def _poll_tts_result(self, client, task_id: str) -> Dict[str, Any]: """轮询TTS结果""" # 实现轮询逻辑 raise NotImplementedError('TTS结果轮询逻辑尚未实现') def _poll_asr_result(self, client, task_id: str) -> Dict[str, Any]: """轮询ASR结果""" # 实现轮询逻辑 raise NotImplementedError('ASR结果轮询逻辑尚未实现') def _upload_to_oss(self, data: bytes, format: str) -> str: """上传到OSS""" import oss2 auth = oss2.Auth( self.oss_config['key_id'], self.oss_config['key_secret'] ) bucket = oss2.Bucket( auth, self.oss_config['endpoint'], self.oss_config['bucket'] ) # 生成文件名 timestamp = int(time.time() * 1000) filename = f"{self.oss_config['base_dir']}audio_{timestamp}.{format}" # 上传 bucket.put_object(filename, data) return f"{self.oss_config['host']}/{filename}" def _download_audio(self, url: str) -> bytes: """下载音频文件""" import requests response = requests.get(url, timeout=30) response.raise_for_status() return response.content ``` ## 5. 
使用示例 ### 5.1 在AI对话中使用 ```python # aiapp/views.py from .audio.AudioService import get_audio_service class ChatBotAPIView(APIView): def post(self, request, bot_id): input_type = request.data.get('input_type', 'text') output_type = request.data.get('output_type', 'text') # 处理语音输入 if input_type == 'audio': audio_file = request.FILES.get('audio_file') if audio_file: audio_service = get_audio_service() asr_result = audio_service.speech_to_text( audio_file.read(), format=audio_file.name.split('.')[-1] ) if asr_result['success']: message = asr_result['text'] else: return Response({ 'status': 'error', 'message': '语音识别失败', 'error': asr_result['error'] }, status=400) else: return Response({ 'status': 'error', 'message': '未提供音频文件' }, status=400) else: message = request.data.get('message') # AI处理 ai_response = self.process_ai_chat(message, bot_id) # 处理语音输出 response_data = { 'bot_id': bot_id, 'response': ai_response, 'input_type': input_type, 'output_type': output_type, 'audio_url': None } if output_type == 'audio': audio_service = get_audio_service() tts_result = audio_service.text_to_speech(ai_response) if tts_result['success']: response_data['audio_url'] = tts_result['audio_url'] response_data['audio_duration'] = tts_result.get('duration', 0) else: response_data['tts_error'] = tts_result['error'] return Response({ 'status': 'success', 'code': 200, 'data': response_data }) ``` ### 5.2 配置切换示例 ```python # 动态切换音频服务提供商 from django.conf import settings from aiapp.audio.AudioService import AudioServiceFactory, get_audio_service def switch_audio_provider(provider: str): """动态切换音频服务提供商""" if provider in ['aliyun', 'huoshan', 'tencent']: # 更新设置 settings.AUDIO_SERVICE_PROVIDER = provider # 清除缓存的实例 AudioServiceFactory._instances.clear() return True return False # 使用示例 switch_audio_provider('huoshan') audio_service = get_audio_service() ``` ## 6. 
性能优化 ### 6.1 音频文件缓存 ```python # 音频文件缓存策略 from django.core.cache import cache import hashlib from typing import Optional def get_cached_tts(text: str, voice_type: str) -> Optional[str]: """获取缓存的TTS结果""" cache_key = f"tts:{hashlib.md5(f'{text}:{voice_type}'.encode()).hexdigest()}" return cache.get(cache_key) def cache_tts_result(text: str, voice_type: str, audio_url: str, duration: int = 3600): """缓存TTS结果""" cache_key = f"tts:{hashlib.md5(f'{text}:{voice_type}'.encode()).hexdigest()}" cache.set(cache_key, audio_url, duration) ``` ### 6.2 异步处理 ```python # 异步音频处理 import asyncio from concurrent.futures import ThreadPoolExecutor from aiapp.audio.AudioService import get_audio_service class AsyncAudioService: def __init__(self): self.executor = ThreadPoolExecutor(max_workers=5) async def async_text_to_speech(self, text: str, voice_type: str = None): """异步文本转语音""" loop = asyncio.get_running_loop() audio_service = get_audio_service() result = await loop.run_in_executor( self.executor, audio_service.text_to_speech, text, voice_type ) return result ``` ## 7. 错误处理和重试 ### 7.1 重试机制 ```python # 音频服务重试装饰器 import time from functools import wraps def retry_on_failure(max_retries: int = 3, delay: float = 1.0): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: result = func(*args, **kwargs) if result.get('success', False): return result else: if attempt < max_retries - 1: time.sleep(delay * (2 ** attempt)) # 指数退避 else: return result except Exception as e: if attempt < max_retries - 1: time.sleep(delay * (2 ** attempt)) else: return { 'success': False, 'error': str(e) } return { 'success': False, 'error': '重试次数超限' } return wrapper return decorator # 应用重试机制 class HuoshanAudioService(BaseAudioService): @retry_on_failure(max_retries=3, delay=1.0) def text_to_speech(self, text: str, voice_type: str = None, **kwargs): # 原始实现 pass ``` ### 7.2 服务降级 ```python # 服务降级策略 class FallbackAudioService: def __init__(self, primary_provider: str, fallback_provider: str): self.primary = 
AudioServiceFactory._create_service(primary_provider) self.fallback = AudioServiceFactory._create_service(fallback_provider) def text_to_speech(self, text: str, **kwargs): """主服务失败时自动切换到备用服务""" # 尝试主服务 result = self.primary.text_to_speech(text, **kwargs) if result['success']: return result # 主服务失败,使用备用服务 logger.warning(f"主音频服务失败: {result['error']}, 切换到备用服务") fallback_result = self.fallback.text_to_speech(text, **kwargs) if fallback_result['success']: fallback_result['fallback_used'] = True return fallback_result ``` ## 8. 监控和日志 ### 8.1 音频服务监控 ```python # 音频服务监控 import logging from django.utils import timezone logger = logging.getLogger('aiapp') class AudioServiceMonitor: @staticmethod def log_tts_request(text: str, provider: str, success: bool, duration: float, error: str = None): """记录TTS请求日志""" logger.info({ 'event': 'tts_request', 'provider': provider, 'text_length': len(text), 'success': success, 'duration': duration, 'error': error, 'timestamp': timezone.now().isoformat() }) @staticmethod def log_asr_request(audio_size: int, provider: str, success: bool, confidence: float = None, error: str = None): """记录ASR请求日志""" logger.info({ 'event': 'asr_request', 'provider': provider, 'audio_size': audio_size, 'success': success, 'confidence': confidence, 'error': error, 'timestamp': timezone.now().isoformat() }) ``` ### 8.2 性能指标收集 ```python # 性能指标收集 from django.core.cache import cache from collections import defaultdict class AudioMetrics: @staticmethod def record_latency(provider: str, service_type: str, latency: float): """记录服务延迟""" cache_key = f"audio_metrics:{provider}:{service_type}:latency" latencies = cache.get(cache_key, []) latencies.append(latency) # 保留最近100次记录 if len(latencies) > 100: latencies = latencies[-100:] cache.set(cache_key, latencies, 3600) @staticmethod def get_avg_latency(provider: str, service_type: str) -> float: """获取平均延迟""" cache_key = f"audio_metrics:{provider}:{service_type}:latency" latencies = cache.get(cache_key, []) if latencies: return 
sum(latencies) / len(latencies) return 0.0 ``` ## 9. 最佳实践 ### 9.1 音频文件处理 ```python # 音频文件格式转换 import subprocess import tempfile def convert_audio_format(input_data: bytes, input_format: str, output_format: str) -> bytes: """使用FFmpeg转换音频格式""" with tempfile.NamedTemporaryFile(suffix=f'.{input_format}') as input_file: with tempfile.NamedTemporaryFile(suffix=f'.{output_format}') as output_file: # 写入输入文件 input_file.write(input_data) input_file.flush() # 转换格式 subprocess.run([ 'ffmpeg', '-y', '-i', input_file.name, '-ar', '16000', # 采样率 '-ac', '1', # 单声道 output_file.name ], check=True, capture_output=True) # 读取输出文件 output_file.seek(0) return output_file.read() ``` ### 9.2 文本预处理 ```python # TTS文本预处理 import re def preprocess_tts_text(text: str) -> str: """TTS文本预处理""" # 移除特殊字符 text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?;:""''()【】]', '', text) # 限制长度 if len(text) > 500: text = text[:500] # 添加适当的停顿 text = text.replace('。', '。,') text = text.replace('!', '!,') text = text.replace('?', '?,') return text.strip() ``` ### 9.3 成本优化 ```python # 成本优化策略 class CostOptimizedAudioService: def __init__(self): self.providers = { 'cheap': 'tencent', # 成本最低 'balanced': 'huoshan', # 性价比 'premium': 'aliyun' # 质量最高 } def choose_provider(self, text: str, priority: str = 'balanced') -> str: """根据优先级选择服务提供商""" text_length = len(text) if priority == 'cost': return self.providers['cheap'] elif priority == 'quality': return self.providers['premium'] else: # 根据文本长度选择 if text_length < 100: return self.providers['premium'] else: return self.providers['balanced'] ``` 这个音频服务集成文档提供了完整的音频服务架构和实现指南,帮助开发者理解和维护音频服务系统。