30 KiB
30 KiB
音频服务集成指南
本文档详细说明QY LTY Backend系统中音频服务的集成架构、配置方法和使用指南。
1. 音频服务架构概览
系统支持多个音频服务提供商,采用适配器模式实现统一接口:
Audio Service Architecture:
┌─────────────────────────────────────────────┐
│ Audio Service Layer │
│ ┌─────────────────────────────────────────┐ │
│ │ Service Factory │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ get_audio_service() │ │ │
│ │ │ - Provider Selection │ │ │
│ │ │ - Configuration Loading │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────┐ │
│ │ Base Audio Service │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ ASR │ │ TTS │ │ │
│ │ │(Speech2Text)│ │ (Text2Speech) │ │ │
│ │ │- Interface │ │ - Interface │ │ │
│ │ │- Validation │ │ - Validation │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────┐ │
│ │ Implementations │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Aliyun │ │ Volcengine │ │ │
│ │ │ NLS │ │ (Huoshan) │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Tencent │ │ Custom │ │ │
│ │ │ Cloud │ │ Provider │ │ │
│ │ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
2. 支持的音频服务提供商
2.1 火山引擎(Huoshan)- 推荐
特点:
- TTS质量:高质量中文语音合成
- 延迟:低延迟,实时性好
- 声音种类:丰富的音色选择
- 价格:性价比高
- 语音识别:本文档的实现中火山引擎只负责TTS,ASR通过配置中的 aliyun_asr 委托给阿里云(见 4.1);未配置时语音识别不可用
配置示例:
# 环境变量
AUDIO_SERVICE_PROVIDER=huoshan
AUDIO_SERVICE_HUOSHAN_APPID=your_app_id
AUDIO_SERVICE_HUOSHAN_ACCESS_TOKEN=your_access_token
AUDIO_SERVICE_HUOSHAN_CLUSTER=volcano_tts
AUDIO_SERVICE_HUOSHAN_VOICE_TYPE=BV001_streaming
AUDIO_SERVICE_HUOSHAN_STORAGE_DIR=/tmp/audio/
AUDIO_SERVICE_HUOSHAN_BASE_URL=https://openspeech.bytedance.com
2.2 阿里云智能语音(Aliyun NLS)
特点:
- 全功能:同时支持ASR和TTS
- 稳定性:企业级服务稳定性
- 多语言:支持多种语言识别
- 集成度:与阿里云生态深度集成
配置示例:
# 环境变量
AUDIO_SERVICE_PROVIDER=aliyun
ALIYUN_NLS_ACCESS_KEY_ID=your_access_key_id
ALIYUN_NLS_ACCESS_KEY_SECRET=your_access_key_secret
ALIYUN_NLS_APP_ID=your_app_id
ALIYUN_OSS_ACCESS_KEY_ID=your_oss_key_id
ALIYUN_OSS_ACCESS_KEY_SECRET=your_oss_key_secret
ALIYUN_OSS_BUCKET=your_audio_bucket
ALIYUN_OSS_ENDPOINT=oss-cn-beijing.aliyuncs.com
ALIYUN_OSS_HOST=https://your-bucket.oss-cn-beijing.aliyuncs.com
ALIYUN_OSS_AUDIO_BASE_DIR=audio/
2.3 腾讯云(Tencent)
特点:
- 性能:高性能语音处理
- 准确率:高识别准确率
- 实时性:支持实时语音识别
配置示例:
# 环境变量
AUDIO_SERVICE_PROVIDER=tencent
AUDIO_SERVICE_TENCENT_API_KEY=your_api_key
AUDIO_SERVICE_TENCENT_API_SECRET=your_api_secret
3. 服务接口设计
3.1 基础接口定义
# aiapp/audio/BaseAudioService.py
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
class BaseAudioService(ABC):
    """Abstract contract shared by every audio service provider."""

    @abstractmethod
    def text_to_speech(
        self, text: str, voice_type: Optional[str] = None, **kwargs
    ) -> Dict[str, Any]:
        """Convert text into speech.

        Args:
            text: The text to convert.
            voice_type: Voice (timbre) identifier.
            **kwargs: Any other provider-specific options.

        Returns:
            A dict with the following shape::

                {
                    'success': bool,
                    'audio_url': str,
                    'audio_data': bytes,
                    'duration': float,
                    'format': str,
                    'error': str
                }
        """

    @abstractmethod
    def speech_to_text(
        self,
        audio_data: bytes,
        format: str = 'wav',
        sample_rate: int = 16000,
        **kwargs
    ) -> Dict[str, Any]:
        """Convert speech into text.

        Args:
            audio_data: The audio payload.
            format: Audio format.
            sample_rate: Sampling rate.
            **kwargs: Any other provider-specific options.

        Returns:
            A dict with the following shape::

                {
                    'success': bool,
                    'text': str,
                    'confidence': float,
                    'duration': float,
                    'error': str
                }
        """
3.2 服务工厂实现
# aiapp/audio/AudioService.py
from django.conf import settings
from .AliyunAudioService import AliyunAudioService
from .HuoshanAudioService import HuoshanAudioService
from .TencentAudioService import TencentAudioService
class AudioServiceFactory:
    """Factory that builds audio services and caches one per provider."""

    # Cached service instances, keyed by provider name.
    _instances = {}

    @classmethod
    def get_service(cls) -> BaseAudioService:
        """Return the service for ``settings.AUDIO_SERVICE_PROVIDER``.

        The instance is created on first request and reused afterwards.
        """
        provider = settings.AUDIO_SERVICE_PROVIDER
        if provider in cls._instances:
            return cls._instances[provider]

        service = cls._create_service(provider)
        cls._instances[provider] = service
        return service

    @classmethod
    def _create_service(cls, provider: str) -> BaseAudioService:
        """Instantiate the service class registered for *provider*.

        Raises:
            ValueError: If *provider* is not one of the registered names.
        """
        registry = {
            'aliyun': AliyunAudioService,
            'huoshan': HuoshanAudioService,
            'tencent': TencentAudioService
        }
        service_class = registry.get(provider)
        if service_class is None:
            raise ValueError(f"不支持的音频服务提供商: {provider}")

        # A provider without a config section gets an empty dict.
        provider_config = settings.AUDIO_SERVICE_CONFIG.get(provider, {})
        return service_class(provider_config)
# 便捷函数
def get_audio_service() -> BaseAudioService:
    """Return the audio service selected by the current configuration."""
    service = AudioServiceFactory.get_service()
    return service
4. 具体实现详解
4.1 火山引擎实现
# aiapp/audio/HuoshanAudioService.py
import json
import requests
import hashlib
import time
from typing import Dict, Any, Optional
from .BaseAudioService import BaseAudioService
class HuoshanAudioService(BaseAudioService):
    """Audio service backed by the Huoshan (Volcengine) TTS endpoint.

    Text-to-speech is sent to ``{base_url}/api/v1/tts``. Speech recognition
    is delegated to an ``AliyunAudioService`` built from the optional
    ``aliyun_asr`` section of the configuration; without that section
    ``speech_to_text`` reports that recognition is not configured.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read the provider settings from *config*.

        Recognised keys: ``appid``, ``access_token``, ``cluster``,
        ``voice_type``, ``base_url``, ``storage_dir`` and ``aliyun_asr``.
        """
        self.appid = config.get('appid')
        self.access_token = config.get('access_token')
        self.cluster = config.get('cluster', 'volcano_tts')
        self.voice_type = config.get('voice_type', 'BV001_streaming')
        self.base_url = config.get('base_url', 'https://openspeech.bytedance.com')
        self.storage_dir = config.get('storage_dir', '/tmp/audio/')

        # Always define the attribute so it can be compared against None
        # instead of being probed with hasattr().
        self.asr_service = None

        # Aliyun configuration used for the delegated speech recognition.
        self.aliyun_asr_config = config.get('aliyun_asr', {})
        if self.aliyun_asr_config:
            from .AliyunAudioService import AliyunAudioService
            self.asr_service = AliyunAudioService(self.aliyun_asr_config)

    def text_to_speech(
        self,
        text: str,
        voice_type: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Synthesize *text* through the Huoshan TTS HTTP endpoint.

        Args:
            text: Text to synthesize.
            voice_type: Voice to use; falls back to the configured default.
            **kwargs: Optional ``speed_ratio``, ``volume_ratio`` and
                ``pitch_ratio`` (each defaults to 1.0).

        Returns:
            On success a dict with ``success``, ``audio_url`` (path of the
            saved file), ``audio_data`` (decoded bytes), ``duration``,
            ``format`` and ``voice_type``. On failure ``success`` is False
            and ``error`` carries the message. Exceptions are reported via
            ``error`` rather than raised.
        """
        import base64

        try:
            voice = voice_type or self.voice_type
            url = f"{self.base_url}/api/v1/tts"

            # NOTE(review): the Authorization scheme, the 'operation' value
            # and the success code checked below are kept exactly as they
            # were; confirm them against the provider's current API reference.
            headers = {
                'Authorization': f'Bearer {self.access_token}',
                'Content-Type': 'application/json'
            }
            payload = {
                'app': {
                    'appid': self.appid,
                    'cluster': self.cluster
                },
                'user': {
                    'uid': 'default_user'
                },
                'audio': {
                    'voice_type': voice,
                    'encoding': 'mp3',
                    'speed_ratio': kwargs.get('speed_ratio', 1.0),
                    'volume_ratio': kwargs.get('volume_ratio', 1.0),
                    'pitch_ratio': kwargs.get('pitch_ratio', 1.0)
                },
                'request': {
                    'reqid': self._generate_request_id(),
                    'text': text,
                    'text_type': 'plain',
                    'operation': 'submit'
                }
            }

            response = requests.post(url, headers=headers, json=payload, timeout=30)
            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f'HTTP {response.status_code}: {response.text}'
                }

            result = response.json()
            data = result.get('data') or {}
            encoded_audio = data.get('audio')
            if result.get('code') != 0 or not encoded_audio:
                return {
                    'success': False,
                    'error': result.get('message', '未知错误')
                }

            # The base contract declares audio_data as bytes, so decode the
            # base64 text once here and hand bytes to both the file writer
            # and the caller.
            if isinstance(encoded_audio, str):
                audio_bytes = base64.b64decode(encoded_audio)
            else:
                audio_bytes = encoded_audio

            filepath = self._save_audio_file(audio_bytes, 'mp3')
            return {
                'success': True,
                'audio_url': filepath,
                'audio_data': audio_bytes,
                'duration': data.get('duration', 0),
                'format': 'mp3',
                'voice_type': voice
            }
        except Exception as e:
            # Deliberate boundary: callers expect an error dict, never a raise.
            return {
                'success': False,
                'error': str(e)
            }

    def speech_to_text(
        self,
        audio_data: bytes,
        format: str = 'wav',
        sample_rate: int = 16000,
        **kwargs
    ) -> Dict[str, Any]:
        """Recognize speech by delegating to the configured Aliyun service.

        Returns the delegate's result, or a failure dict when no
        ``aliyun_asr`` configuration was supplied.
        """
        # getattr keeps instances created without __init__ working.
        asr_service = getattr(self, 'asr_service', None)
        if asr_service is None:
            return {
                'success': False,
                'error': '语音识别服务未配置'
            }
        return asr_service.speech_to_text(
            audio_data, format, sample_rate, **kwargs
        )

    def _generate_request_id(self) -> str:
        """Return a unique 32-character hex request id.

        A random UUID is used because hashing a millisecond timestamp
        produces identical ids for requests issued in the same millisecond.
        """
        import uuid
        return uuid.uuid4().hex

    def _save_audio_file(self, audio_data: bytes, format: str) -> str:
        """Write audio into ``storage_dir`` and return the file path.

        *audio_data* may be raw bytes or a base64-encoded string; a string
        is decoded before writing.
        """
        import os
        import base64
        import uuid

        # Make sure the storage directory exists.
        os.makedirs(self.storage_dir, exist_ok=True)

        # The random suffix keeps two files saved in the same millisecond
        # from overwriting each other.
        timestamp = int(time.time() * 1000)
        filename = f"tts_{timestamp}_{uuid.uuid4().hex[:8]}.{format}"
        filepath = os.path.join(self.storage_dir, filename)

        if isinstance(audio_data, str):
            audio_bytes = base64.b64decode(audio_data)
        else:
            audio_bytes = audio_data

        with open(filepath, 'wb') as f:
            f.write(audio_bytes)
        return filepath
4.2 阿里云实现
# aiapp/audio/AliyunAudioService.py
import json
import base64
import time
from typing import Dict, Any, Optional
from .BaseAudioService import BaseAudioService
class AliyunAudioService(BaseAudioService):
    """Audio service backed by Aliyun intelligent speech (NLS) and OSS.

    Both TTS and ASR create a task through the Aliyun SDK and then poll for
    its result; audio is exchanged through the configured OSS bucket.
    """

    # Region passed to every AcsClient this service creates.
    REGION_ID = 'cn-shanghai'

    def __init__(self, config: Dict[str, Any]):
        """Read credentials and OSS settings from *config*.

        Recognised keys: ``api_key``, ``api_secret``, ``app_id``,
        ``oss_key_id``, ``oss_key_secret``, ``oss_bucket``,
        ``oss_endpoint``, ``oss_host`` and ``oss_audio_base_dir``.
        """
        self.access_key_id = config.get('api_key')
        self.access_key_secret = config.get('api_secret')
        self.app_id = config.get('app_id')

        # OSS settings used for uploading audio.
        self.oss_config = {
            'key_id': config.get('oss_key_id'),
            'key_secret': config.get('oss_key_secret'),
            'bucket': config.get('oss_bucket'),
            'endpoint': config.get('oss_endpoint'),
            'host': config.get('oss_host'),
            'base_dir': config.get('oss_audio_base_dir', 'audio/')
        }

    def _create_client(self):
        """Build an AcsClient from the configured credentials."""
        # Imported lazily so a missing SDK is reported by the caller's
        # error handling instead of failing at module import.
        from aliyunsdkcore.client import AcsClient
        return AcsClient(
            self.access_key_id,
            self.access_key_secret,
            self.REGION_ID
        )

    def text_to_speech(
        self,
        text: str,
        voice_type: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Synthesize *text* and publish the audio to OSS.

        Args:
            text: Text to synthesize.
            voice_type: Voice name; defaults to ``'xiaoyun'``.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            On success a dict with ``success``, ``audio_url`` (OSS URL),
            ``audio_data`` (bytes), ``duration`` and ``format``. On failure
            ``success`` is False and ``error`` carries the message.
            Exceptions are reported via ``error`` rather than raised.
        """
        try:
            from aliyunsdknls.request.v20180518 import CreateTTSTaskRequest

            client = self._create_client()

            request = CreateTTSTaskRequest.CreateTTSTaskRequest()
            request.set_Text(text)
            request.set_Voice(voice_type or 'xiaoyun')
            request.set_Format('mp3')
            request.set_SampleRate(16000)
            request.set_AppId(self.app_id)

            response = client.do_action_with_exception(request)
            result = json.loads(response.decode('utf-8'))

            if 'TaskId' not in result:
                return {
                    'success': False,
                    'error': result.get('Message', '创建TTS任务失败')
                }

            # Poll until the task produces a result.
            task_result = self._poll_tts_result(client, result['TaskId'])
            if not task_result['success']:
                return task_result

            # Fetch the synthesized audio, then re-host it on OSS.
            audio_data = self._download_audio(task_result['audio_url'])
            oss_url = self._upload_to_oss(audio_data, 'mp3')
            return {
                'success': True,
                'audio_url': oss_url,
                'audio_data': audio_data,
                'duration': task_result.get('duration', 0),
                'format': 'mp3'
            }
        except Exception as e:
            # Deliberate boundary: callers expect an error dict, never a raise.
            return {
                'success': False,
                'error': str(e)
            }

    def speech_to_text(
        self,
        audio_data: bytes,
        format: str = 'wav',
        sample_rate: int = 16000,
        **kwargs
    ) -> Dict[str, Any]:
        """Recognize speech by submitting an uploaded file as a task.

        The audio is uploaded to OSS and its URL is handed to a recognition
        task whose result is then polled.

        Args:
            audio_data: Raw audio bytes.
            format: Audio format, used as the uploaded object's extension.
            sample_rate: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The polled recognition result, or a dict with ``success=False``
            and ``error``. Exceptions are reported via ``error``.
        """
        try:
            from aliyunsdknls.request.v20180518 import CreateRecognitionTaskRequest

            # The recognition task reads the audio from a URL.
            audio_url = self._upload_to_oss(audio_data, format)

            client = self._create_client()

            request = CreateRecognitionTaskRequest.CreateRecognitionTaskRequest()
            request.set_AppId(self.app_id)
            request.set_FileUrl(audio_url)
            request.set_EnablePunctuationPrediction(True)
            request.set_EnableInverseTextNormalization(True)
            request.set_EnableVoiceDetection(False)

            response = client.do_action_with_exception(request)
            result = json.loads(response.decode('utf-8'))

            if 'TaskId' not in result:
                return {
                    'success': False,
                    'error': result.get('Message', '创建识别任务失败')
                }

            # Poll until the recognition result is available.
            return self._poll_asr_result(client, result['TaskId'])
        except Exception as e:
            # Deliberate boundary: callers expect an error dict, never a raise.
            return {
                'success': False,
                'error': str(e)
            }

    def _poll_tts_result(self, client, task_id: str) -> Dict[str, Any]:
        """Poll a TTS task for its result (placeholder, not implemented).

        An implementation must return a dict with ``success`` and, on
        success, ``audio_url`` (optionally ``duration``); those are the keys
        ``text_to_speech`` reads. Raising here makes the gap explicit instead
        of returning None and failing later with an unrelated TypeError.
        """
        raise NotImplementedError('TTS结果轮询逻辑尚未实现')

    def _poll_asr_result(self, client, task_id: str) -> Dict[str, Any]:
        """Poll an ASR task for its result (placeholder, not implemented).

        An implementation must return the result dict that
        ``speech_to_text`` passes back to its caller. Raising here keeps
        ``speech_to_text`` from silently returning None.
        """
        raise NotImplementedError('ASR结果轮询逻辑尚未实现')

    def _upload_to_oss(self, data: bytes, format: str) -> str:
        """Upload *data* to the configured bucket and return its URL."""
        import uuid
        import oss2

        auth = oss2.Auth(
            self.oss_config['key_id'],
            self.oss_config['key_secret']
        )
        bucket = oss2.Bucket(
            auth,
            self.oss_config['endpoint'],
            self.oss_config['bucket']
        )

        # Normalize the directory prefix so the object key is well formed
        # whether or not base_dir was configured with slashes.
        base_dir = (self.oss_config['base_dir'] or '').strip('/')
        prefix = f"{base_dir}/" if base_dir else ''

        # The random suffix keeps uploads in the same millisecond distinct.
        timestamp = int(time.time() * 1000)
        filename = f"{prefix}audio_{timestamp}_{uuid.uuid4().hex[:8]}.{format}"

        bucket.put_object(filename, data)

        host = (self.oss_config['host'] or '').rstrip('/')
        return f"{host}/{filename}"

    def _download_audio(self, url: str) -> bytes:
        """Download and return the audio at *url*.

        Raises:
            requests.RequestException: On a timeout, connection failure or
                non-success HTTP status, so an error page is never treated
                as audio.
        """
        import requests
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.content
5. 使用示例
5.1 在AI对话中使用
# aiapp/views.py
from .audio.AudioService import get_audio_service
class ChatBotAPIView(APIView):
    """Chat endpoint that accepts text or audio and can reply with audio."""

    def post(self, request, bot_id):
        """Handle one chat turn for *bot_id*.

        Reads ``input_type`` and ``output_type`` (both default to
        ``'text'``) from the request data. Audio input is transcribed first;
        when audio output is requested the reply is also synthesized.

        Returns:
            A 400 response when the audio file is missing, recognition
            fails, or no message content is available; otherwise a success
            response whose ``data`` holds the reply and, if synthesized,
            the audio URL. A TTS failure does not fail the request: it is
            reported in ``data['tts_error']``.
        """
        input_type = request.data.get('input_type', 'text')
        output_type = request.data.get('output_type', 'text')

        # Resolve the user's message, transcribing audio input if needed.
        if input_type == 'audio':
            audio_file = request.FILES.get('audio_file')
            if not audio_file:
                return Response({
                    'status': 'error',
                    'message': '未提供音频文件'
                }, status=400)

            # A name without a dot has no extension; fall back to 'wav'
            # rather than passing the whole file name as the format.
            _, dot, extension = (audio_file.name or '').rpartition('.')
            audio_format = extension.lower() if dot and extension else 'wav'

            audio_service = get_audio_service()
            asr_result = audio_service.speech_to_text(
                audio_file.read(),
                format=audio_format
            )
            if not asr_result['success']:
                return Response({
                    'status': 'error',
                    'message': '语音识别失败',
                    'error': asr_result.get('error')
                }, status=400)
            message = asr_result.get('text')
        else:
            message = request.data.get('message')

        # Reject missing or blank content instead of sending it to the AI.
        if message is None or (isinstance(message, str) and not message.strip()):
            return Response({
                'status': 'error',
                'message': '未提供消息内容'
            }, status=400)

        # AI processing.
        ai_response = self.process_ai_chat(message, bot_id)

        response_data = {
            'bot_id': bot_id,
            'response': ai_response,
            'input_type': input_type,
            'output_type': output_type,
            'audio_url': None
        }

        # Optional speech output.
        if output_type == 'audio':
            audio_service = get_audio_service()
            tts_result = audio_service.text_to_speech(ai_response)
            if tts_result['success']:
                response_data['audio_url'] = tts_result['audio_url']
                response_data['audio_duration'] = tts_result.get('duration', 0)
            else:
                response_data['tts_error'] = tts_result.get('error')

        return Response({
            'status': 'success',
            'code': 200,
            'data': response_data
        })
5.2 配置切换示例
# 动态切换音频服务提供商
from django.conf import settings
from aiapp.audio.AudioService import AudioServiceFactory
def switch_audio_provider(provider: str):
    """Switch the active audio provider at runtime.

    Returns True when *provider* is one of the known names and the switch
    was applied, False otherwise.
    """
    if provider not in ('aliyun', 'huoshan', 'tencent'):
        return False

    # Point the settings at the new provider ...
    settings.AUDIO_SERVICE_PROVIDER = provider
    # ... and drop the cached service instances.
    AudioServiceFactory._instances.clear()
    return True


# Usage example
switch_audio_provider('huoshan')
audio_service = get_audio_service()
6. 性能优化
6.1 音频文件缓存
# 音频文件缓存策略
from django.core.cache import cache
import hashlib
def get_cached_tts(text: str, voice_type: str) -> Optional[str]:
    """Look up a cached TTS result for this text and voice.

    Returns whatever the cache holds under the derived key, or None.
    """
    digest = hashlib.md5(f'{text}:{voice_type}'.encode()).hexdigest()
    return cache.get(f"tts:{digest}")
def cache_tts_result(text: str, voice_type: str, audio_url: str, duration: int = 3600):
    """Cache *audio_url* for this text and voice.

    *duration* is passed to ``cache.set`` as its third argument.
    """
    digest = hashlib.md5(f'{text}:{voice_type}'.encode()).hexdigest()
    cache.set(f"tts:{digest}", audio_url, duration)
6.2 异步处理
# 异步音频处理
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncAudioService:
    """Run blocking audio service calls from asyncio code via a thread pool."""

    def __init__(self):
        # Pool on which the blocking provider calls are executed.
        self.executor = ThreadPoolExecutor(max_workers=5)

    async def async_text_to_speech(self, text: str, voice_type: str = None, **kwargs):
        """Run ``text_to_speech`` without blocking the event loop.

        Args:
            text: Text to synthesize.
            voice_type: Optional voice; None leaves the choice to the
                service.
            **kwargs: Extra options forwarded to the service's
                ``text_to_speech``.

        Returns:
            The result returned by the service's ``text_to_speech``.
        """
        from functools import partial

        # Inside a coroutine a loop is always running; get_running_loop()
        # is the call recommended for this case.
        loop = asyncio.get_running_loop()
        audio_service = get_audio_service()

        # run_in_executor accepts positional arguments only, so the keyword
        # options are bound with partial.
        call = partial(audio_service.text_to_speech, text, voice_type, **kwargs)
        return await loop.run_in_executor(self.executor, call)
7. 错误处理和重试
7.1 重试机制
# 音频服务重试装饰器
import time
from functools import wraps
def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """Build a decorator that retries a function returning a result dict.

    The wrapped function is called up to *max_retries* times. A result whose
    ``success`` entry is truthy is returned immediately. After a failed
    result or an exception the wrapper sleeps ``delay * 2 ** attempt``
    seconds and tries again. On the last attempt the failed result is
    returned as-is, and an exception becomes
    ``{'success': False, 'error': str(exc)}``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while attempt < max_retries:
                is_last = attempt == max_retries - 1
                try:
                    outcome = func(*args, **kwargs)
                    succeeded = outcome.get('success', False)
                except Exception as exc:
                    if is_last:
                        return {
                            'success': False,
                            'error': str(exc)
                        }
                else:
                    if succeeded or is_last:
                        return outcome
                # Exponential backoff before the next attempt.
                time.sleep(delay * (2 ** attempt))
                attempt += 1

            # Only reached when max_retries allows no attempt at all.
            return {
                'success': False,
                'error': '重试次数超限'
            }
        return wrapper
    return decorator
# 应用重试机制
class HuoshanAudioService(BaseAudioService):
    """Example of applying the retry decorator to a service method."""

    @retry_on_failure(max_retries=3, delay=1.0)
    def text_to_speech(self, text: str, voice_type: str = None, **kwargs):
        """Placeholder standing in for the original implementation."""
        return None
7.2 服务降级
# 服务降级策略
class FallbackAudioService:
    """Wrap a primary audio service with a fallback used when it fails."""

    def __init__(self, primary_provider: str, fallback_provider: str):
        # Both services are built up front through the factory.
        self.primary, self.fallback = (
            AudioServiceFactory._create_service(primary_provider),
            AudioServiceFactory._create_service(fallback_provider),
        )

    def text_to_speech(self, text: str, **kwargs):
        """Synthesize with the primary service, falling back on failure.

        Returns the primary result when it succeeded. Otherwise the failure
        is logged and the fallback result is returned; a successful fallback
        result is tagged with ``fallback_used=True``.
        """
        primary_result = self.primary.text_to_speech(text, **kwargs)
        if not primary_result['success']:
            error = primary_result['error']
            logger.warning(f"主音频服务失败: {error}, 切换到备用服务")

            backup_result = self.fallback.text_to_speech(text, **kwargs)
            if backup_result['success']:
                backup_result['fallback_used'] = True
            return backup_result

        return primary_result
8. 监控和日志
8.1 音频服务监控
# 音频服务监控
import logging
from django.utils import timezone
logger = logging.getLogger('aiapp')
class AudioServiceMonitor:
    """Helpers that log audio service requests as dict records."""

    @staticmethod
    def log_tts_request(text: str, provider: str, success: bool, duration: float, error: str = None):
        """Log one TTS request at INFO level.

        Only the length of *text* is recorded, not the text itself.
        """
        record = {
            'event': 'tts_request',
            'provider': provider,
            'text_length': len(text),
            'success': success,
            'duration': duration,
            'error': error,
            'timestamp': timezone.now().isoformat()
        }
        logger.info(record)

    @staticmethod
    def log_asr_request(audio_size: int, provider: str, success: bool, confidence: float = None, error: str = None):
        """Log one ASR request at INFO level."""
        record = {
            'event': 'asr_request',
            'provider': provider,
            'audio_size': audio_size,
            'success': success,
            'confidence': confidence,
            'error': error,
            'timestamp': timezone.now().isoformat()
        }
        logger.info(record)
8.2 性能指标收集
# 性能指标收集
from django.core.cache import cache
from collections import defaultdict
class AudioMetrics:
    """Latency samples per provider and service type, kept in the cache."""

    @staticmethod
    def record_latency(provider: str, service_type: str, latency: float):
        """Append one latency sample, keeping at most the latest 100."""
        key = f"audio_metrics:{provider}:{service_type}:latency"
        samples = cache.get(key, [])
        samples.append(latency)

        # Drop the oldest samples once more than 100 are stored.
        recent = samples[-100:] if len(samples) > 100 else samples
        cache.set(key, recent, 3600)

    @staticmethod
    def get_avg_latency(provider: str, service_type: str) -> float:
        """Return the mean of the stored samples, or 0.0 when there are none."""
        key = f"audio_metrics:{provider}:{service_type}:latency"
        samples = cache.get(key, [])
        return sum(samples) / len(samples) if samples else 0.0
9. 最佳实践
9.1 音频文件处理
# 音频文件格式转换
import subprocess
import tempfile
def convert_audio_format(
    input_data: bytes,
    input_format: str,
    output_format: str,
    sample_rate: int = 16000,
    channels: int = 1,
) -> bytes:
    """Convert audio between formats with FFmpeg.

    Args:
        input_data: Raw bytes of the source audio.
        input_format: Extension given to the temporary input file.
        output_format: Extension given to the temporary output file.
        sample_rate: Value passed to ffmpeg's ``-ar`` option.
        channels: Value passed to ffmpeg's ``-ac`` option.

    Returns:
        The bytes of the converted file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    from pathlib import Path

    # Work in a private directory: ffmpeg opens both files by name, and the
    # output must not be a file this process still holds open.
    with tempfile.TemporaryDirectory() as workdir:
        source = Path(workdir) / f'input.{input_format}'
        target = Path(workdir) / f'output.{output_format}'
        source.write_bytes(input_data)

        subprocess.run([
            'ffmpeg',
            '-nostdin',  # never wait for interactive input
            '-y',        # overwrite the output instead of refusing
            '-i', str(source),
            '-ar', str(sample_rate),
            '-ac', str(channels),
            str(target),
        ], check=True, capture_output=True)

        return target.read_bytes()
9.2 文本预处理
# TTS文本预处理
import re
def preprocess_tts_text(text: str, max_length: int = 500) -> str:
    """Clean up text before sending it to TTS.

    Steps, in order:
      1. Remove every character that is not a word character, whitespace,
         or one of the listed punctuation marks and brackets.
      2. Insert a comma after each sentence-ending mark as a pause hint.
      3. Cap the result at *max_length* characters and strip surrounding
         whitespace.

    Args:
        text: The raw text.
        max_length: Maximum length of the returned text.

    Returns:
        The cleaned text, never longer than *max_length*.
    """
    # A triple-quoted raw literal keeps both quote characters inside the
    # character class. With single quotes, the '' in the middle ends the
    # literal early and the apostrophe drops out of the allowed set.
    disallowed = r'''[^\w\s\u4e00-\u9fff,。!?;:"'()【】]'''
    text = re.sub(disallowed, '', text)

    # Add a short pause after each sentence-ending mark.
    for mark in ('。', '!', '?'):
        text = text.replace(mark, f'{mark},')

    # Cap the length last so the inserted pauses cannot push the text past
    # the limit.
    return text[:max_length].strip()
9.3 成本优化
# 成本优化策略
class CostOptimizedAudioService:
    """Choose an audio provider from a cost/quality priority."""

    def __init__(self):
        # Tier name -> provider name.
        self.providers = dict(
            cheap='tencent',     # lowest cost
            balanced='huoshan',  # best value for money
            premium='aliyun',    # highest quality
        )

    def choose_provider(self, text: str, priority: str = 'balanced') -> str:
        """Return the provider name to use for *text*.

        ``'cost'`` selects the cheap tier and ``'quality'`` the premium
        tier. Any other priority decides by length: texts shorter than 100
        characters use the premium tier, longer ones the balanced tier.
        """
        size = len(text)

        if priority == 'cost':
            return self.providers['cheap']
        if priority == 'quality':
            return self.providers['premium']

        # No explicit preference: decide by text length.
        tier = 'premium' if size < 100 else 'balanced'
        return self.providers[tier]
这个音频服务集成文档提供了完整的音频服务架构和实现指南,帮助开发者理解和维护音频服务系统。