from django.shortcuts import render from django.http import JsonResponse from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from .consumers import DeviceConsumer import json from django.views.decorators.http import require_http_methods from django.views.decorators.csrf import csrf_exempt from django.core.cache import cache from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated, AllowAny from userapp.authentication import RedisTokenAuthentication from .weather import QWeatherAPI, format_weather_data from common.VolcEngineAccessToken import VolcEngineAccessToken, PrivPublishStream, PrivSubscribeStream import time from django.conf import settings import uuid import datetime from .volcengine_api import update_voice_chat from .amap_api import search_nearby from .models import DeviceType, DeviceBatch, Device, UserDevice from aiapp.models import ChatMessage, Bot from userapp.models import ParadiseUser from .serializers import ( DeviceTypeSerializer, DeviceBatchSerializer, DeviceSerializer, DeviceCreateSerializer, DeviceBatchCreateSerializer, UserDeviceSerializer, DeviceBindSerializer, DeviceRegisterSerializer ) from django.db import transaction from django.utils import timezone from drf_yasg.utils import swagger_auto_schema from drf_yasg import openapi from .swagger import ( device_type_schema, device_batch_schema, device_schema, device_create_schema, device_batch_create_schema, user_device_schema, device_bind_schema, token_generate_schema, token_response_schema, message_send_schema, success_response_schema, error_response_schema, device_type_param, device_batch_param, device_param, user_device_param, mac_address_param, expire_time_param ) # 引入标准化响应工具 from common.responses import success_response, error_response, created_response, not_found_response from common.swagger_utils import get_standardized_response_schema import logging logger = logging.getLogger(__name__) def _merge_subv_segments(segments): """合并字幕分片:自动检测累积式(每段以前段为前缀)vs 增量式(直接拼接)。 segments: 已按 seq 升序排好的 [{seq, text}, ...] """ if not segments: return '' if len(segments) == 1: return segments[0]['text'] is_cumulative = all( segments[i]['text'].startswith(segments[i - 1]['text']) for i in range(1, len(segments)) ) if is_cumulative: return segments[-1]['text'] return ''.join(s['text'] for s in segments) # Create your views here. # 设备相关视图集 class DeviceTypeViewSet(viewsets.ModelViewSet): """ 设备类型管理接口 提供设备类型的增删改查功能。 """ queryset = DeviceType.objects.all() serializer_class = DeviceTypeSerializer authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] @swagger_auto_schema( operation_summary="获取设备类型列表", operation_description="获取所有设备类型的列表", responses={200: openapi.Response('成功', device_type_schema)} ) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @swagger_auto_schema( operation_summary="创建设备类型", operation_description="创建新的设备类型", request_body=device_type_schema, responses={201: openapi.Response('创建成功', device_type_schema)} ) def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) @swagger_auto_schema( operation_summary="获取设备类型详情", operation_description="根据ID获取设备类型详情", manual_parameters=[device_type_param], responses={200: openapi.Response('成功', device_type_schema)} ) def retrieve(self, request, *args, **kwargs): return super().retrieve(request, *args, **kwargs) @swagger_auto_schema( operation_summary="更新设备类型", operation_description="更新现有设备类型", manual_parameters=[device_type_param], request_body=device_type_schema, responses={200: openapi.Response('更新成功', device_type_schema)} ) def update(self, request, *args, **kwargs): return super().update(request, *args, **kwargs) @swagger_auto_schema( operation_summary="删除设备类型", operation_description="删除现有设备类型", manual_parameters=[device_type_param], responses={204: "删除成功"} ) def destroy(self, request, *args, **kwargs): return super().destroy(request, *args, **kwargs) class DeviceBatchViewSet(viewsets.ModelViewSet): """ 设备批次管理接口 提供设备批次的增删改查功能。 """ queryset = DeviceBatch.objects.all() serializer_class = DeviceBatchSerializer authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] @swagger_auto_schema( operation_summary="获取设备批次列表", operation_description="获取所有设备批次的列表", responses={200: openapi.Response('成功', device_batch_schema)} ) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @swagger_auto_schema( operation_summary="创建设备批次", operation_description="创建新的设备批次", request_body=device_batch_schema, responses={201: openapi.Response('创建成功', device_batch_schema)} ) def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) @swagger_auto_schema( operation_summary="获取设备批次详情", operation_description="根据ID获取设备批次详情", manual_parameters=[device_batch_param], responses={200: openapi.Response('成功', device_batch_schema)} ) def retrieve(self, request, *args, **kwargs): return super().retrieve(request, *args, **kwargs) @swagger_auto_schema( operation_summary="更新设备批次", operation_description="更新现有设备批次", manual_parameters=[device_batch_param], request_body=device_batch_schema, responses={200: openapi.Response('更新成功', device_batch_schema)} ) def update(self, request, *args, **kwargs): return super().update(request, *args, **kwargs) @swagger_auto_schema( operation_summary="删除设备批次", operation_description="删除现有设备批次", manual_parameters=[device_batch_param], responses={204: "删除成功"} ) def destroy(self, request, *args, **kwargs): return super().destroy(request, *args, **kwargs) class DeviceViewSet(viewsets.ModelViewSet): """ 设备管理接口 提供设备的增删改查、批量创建等功能。 """ queryset = Device.objects.all() serializer_class = DeviceSerializer authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] def get_serializer_class(self): if self.action == 'create': return DeviceCreateSerializer elif self.action == 'batch_create': return DeviceBatchCreateSerializer return self.serializer_class @swagger_auto_schema( operation_summary="获取设备列表", operation_description="获取所有设备的列表", responses={200: openapi.Response('成功', device_schema)} ) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @swagger_auto_schema( operation_summary="创建设备", operation_description="创建新的设备", request_body=device_create_schema, responses={201: openapi.Response('创建成功', device_schema)} ) def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) @swagger_auto_schema( operation_summary="获取设备详情", operation_description="根据ID获取设备详情", manual_parameters=[device_param], responses={200: openapi.Response('成功', device_schema)} ) def retrieve(self, request, *args, **kwargs): return super().retrieve(request, *args, **kwargs) @swagger_auto_schema( operation_summary="更新设备", operation_description="更新现有设备", manual_parameters=[device_param], request_body=device_schema, responses={200: openapi.Response('更新成功', device_schema)} ) def update(self, request, *args, **kwargs): return super().update(request, *args, **kwargs) @swagger_auto_schema( operation_summary="删除设备", operation_description="删除现有设备", manual_parameters=[device_param], responses={204: "删除成功"} ) def destroy(self, request, *args, **kwargs): return super().destroy(request, *args, **kwargs) @swagger_auto_schema( operation_summary="批量创建设备", operation_description="批量创建指定数量的设备,自动生成序列号和MAC地址", request_body=device_batch_create_schema, responses={ 200: openapi.Response('创建成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=False, methods=['post']) def batch_create(self, request): """ 批量创建设备 批量创建指定数量的设备,自动生成序列号和MAC地址。 参数: - device_type: 设备类型ID - batch: 批次ID - start_serial: 起始序列号 - count: 创建数量 - mac_prefix: MAC地址前缀(可选) 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: 创建的设备ID列表 """ serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) device_type = serializer.validated_data['device_type'] batch = serializer.validated_data['batch'] start_serial = serializer.validated_data['start_serial'] count = serializer.validated_data['count'] mac_prefix = serializer.validated_data.get('mac_prefix', '00:11:22') created_devices = [] try: with transaction.atomic(): # 解析起始序列号 try: start_num = int(start_serial) except ValueError: # 如果起始序列号不是纯数字,尝试提取末尾的数字部分 import re match = re.search(r'(\d+)$', start_serial) if match: num_part = match.group(1) prefix_part = start_serial[:-len(num_part)] start_num = int(num_part) else: return error_response(message='无法解析起始序列号中的数字部分', code=status.HTTP_400_BAD_REQUEST) # 批量创建设备 for i in range(count): # 生成序列号 if isinstance(start_num, int): serial_number = f"{prefix_part if 'prefix_part' in locals() else ''}{start_num + i}" else: serial_number = f"{start_serial}-{i+1}" # 生成MAC地址(简单示例,实际可能需要更复杂的逻辑) last_part = format(i % 0xFFFFFF, '06x') mac = f"{mac_prefix}:{last_part[:2]}:{last_part[2:4]}:{last_part[4:6]}" # 创建设备 device = Device( device_type=device_type, batch=batch, serial_number=serial_number, mac_address=mac ) device.save() created_devices.append(device.id) return success_response( data=created_devices, message=f'成功创建{count}个设备' ) except Exception as e: logger.error(f"Batch device creation failed: {str(e)}") return error_response( message=f'创建失败: {str(e)}', code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @swagger_auto_schema( operation_summary="更新设备状态", operation_description="更新设备的状态、电量、固件版本等信息", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, properties={ 'status': openapi.Schema(type=openapi.TYPE_STRING, description='设备状态 (connected/disconnected)'), 'battery_level': openapi.Schema(type=openapi.TYPE_INTEGER, description='电池电量百分比(0-100)'), 'firmware_version': openapi.Schema(type=openapi.TYPE_STRING, description='固件版本号'), 'wifi_name': openapi.Schema(type=openapi.TYPE_STRING, description='WiFi名称'), 'wifi_password': openapi.Schema(type=openapi.TYPE_STRING, description='WiFi密码'), 'brightness': openapi.Schema(type=openapi.TYPE_INTEGER, description='屏幕亮度(0-100)') } ), responses={ 200: openapi.Response('更新成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 404: openapi.Response('设备不存在', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=True, methods=['post']) def update_status(self, request, pk=None): """ 更新设备状态 更新设备的状态、电量、固件版本等信息。 参数: - status: 设备状态 (connected/disconnected),可选 - battery_level: 电池电量百分比(0-100),可选 - firmware_version: 固件版本号,可选 - wifi_name: WiFi名称,可选 - wifi_password: WiFi密码,可选 - brightness: 屏幕亮度(0-100),可选 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: 更新后的设备信息 """ try: device = self.get_object() # 更新设备状态 if 'status' in request.data and request.data['status'] in dict(Device.STATUS_CHOICES): device.status = request.data['status'] # 更新电量 if 'battery_level' in request.data: try: battery_level = int(request.data['battery_level']) if 0 <= battery_level <= 100: device.battery_level = battery_level except (ValueError, TypeError): pass # 更新固件版本 if 'firmware_version' in request.data: device.firmware_version = request.data['firmware_version'] # 更新WiFi设置 if 'wifi_name' in request.data: device.wifi_name = request.data['wifi_name'] if 'wifi_password' in request.data: device.wifi_password = request.data['wifi_password'] # 更新亮度 if 'brightness' in request.data: try: brightness = int(request.data['brightness']) if 0 <= brightness <= 100: device.brightness = brightness except (ValueError, TypeError): pass device.save() # 返回更新后的设备信息 return success_response( data=DeviceSerializer(device).data, message='设备状态更新成功' ) except Exception as e: logger.error(f"Failed to update device status: {str(e)}") return error_response( message=f'更新设备状态失败: {str(e)}', code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @swagger_auto_schema( operation_summary="查询设备绑定状态", operation_description="通过MAC地址查询设备是否已被用户绑定(不需要认证)", manual_parameters=[mac_address_param], responses={ 200: openapi.Response('查询成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 404: openapi.Response('设备不存在', get_standardized_response_schema()) } ) @action(detail=False, methods=['get'], permission_classes=[AllowAny], authentication_classes=[]) def bind_status(self, request): """ 查询设备绑定状态 通过MAC地址查询设备是否已被用户绑定。 此接口不需要认证,供设备端轮询使用。 """ mac_address = request.query_params.get('mac_address') if not mac_address: return error_response(message='MAC地址不能为空', code=status.HTTP_400_BAD_REQUEST) try: device = Device.objects.get(mac_address=mac_address) user_device = UserDevice.objects.filter(device=device).first() if user_device: return success_response( data={ 'bound': True, 'user_id': user_device.user.id, 'nickname': user_device.nickname or '', 'device_code': device.device_code, 'is_primary': user_device.is_primary }, message='设备已绑定' ) else: return success_response( data={ 'bound': False, 'device_code': device.device_code, 'mac_address': mac_address }, message='设备未绑定' ) except Device.DoesNotExist: return not_found_response(message='设备不存在') @swagger_auto_schema( operation_summary="设备自注册", operation_description="设备首次开机时自动注册(不需要认证)", request_body=openapi.Schema( type=openapi.TYPE_OBJECT, required=['mac_address'], properties={ 'mac_address': openapi.Schema(type=openapi.TYPE_STRING, description='设备MAC地址'), 'device_type_code': openapi.Schema(type=openapi.TYPE_STRING, description='设备类型代码,默认T01'), 'firmware_version': openapi.Schema(type=openapi.TYPE_STRING, description='固件版本号'), } ), responses={ 201: openapi.Response('注册成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), } ) @action(detail=False, methods=['post'], permission_classes=[AllowAny], authentication_classes=[]) def register(self, request): """ 设备自注册 设备首次开机时,如果MAC地址不存在则自动注册设备。 如果MAC地址已存在,返回已存在的设备信息。 此接口不需要认证。 """ mac_address = request.data.get('mac_address') if not mac_address: return error_response(message='MAC地址不能为空', code=status.HTTP_400_BAD_REQUEST) # 如果设备已存在,直接返回设备信息 existing_device = Device.objects.filter(mac_address=mac_address).first() if existing_device: return success_response( data={ 'device_code': existing_device.device_code, 'mac_address': existing_device.mac_address, 'is_new': False }, message='设备已注册' ) device_type_code = request.data.get('device_type_code', 'T01') firmware_version = request.data.get('firmware_version', '') try: device_type = DeviceType.objects.get(code=device_type_code) except DeviceType.DoesNotExist: return error_response(message=f'设备类型 {device_type_code} 不存在', code=status.HTTP_400_BAD_REQUEST) # 获取或创建一个默认批次 batch, _ = DeviceBatch.objects.get_or_create( device_type=device_type, batch_number=f'AUTO-{device_type_code}', defaults={ 'production_date': timezone.now().date(), 'quantity': 99999, 'description': '设备自动注册批次' } ) # 生成序列号 existing_count = Device.objects.filter(batch=batch).count() serial_number = f"{existing_count + 1:05d}" try: device = Device( device_type=device_type, batch=batch, serial_number=serial_number, mac_address=mac_address, firmware_version=firmware_version ) device.save() return created_response( data={ 'device_code': device.device_code, 'mac_address': device.mac_address, 'is_new': True }, message='设备注册成功' ) except Exception as e: logger.error(f"Device registration failed: {str(e)}") return error_response(message=f'注册失败: {str(e)}', code=status.HTTP_500_INTERNAL_SERVER_ERROR) class UserDeviceViewSet(viewsets.ModelViewSet): """ 用户设备管理接口 提供用户绑定设备、查询设备等功能。 """ serializer_class = UserDeviceSerializer authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] def get_queryset(self): # 检查用户是否已认证,如未认证则返回空查询集 if not self.request.user.is_authenticated: return UserDevice.objects.none() # 只返回当前用户的设备 return UserDevice.objects.filter(user=self.request.user) def perform_create(self, serializer): serializer.save(user=self.request.user) @swagger_auto_schema( operation_summary="获取用户设备列表", operation_description="获取当前用户的所有设备列表", responses={200: openapi.Response('成功', user_device_schema)} ) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) @swagger_auto_schema( operation_summary="创建用户设备关联", operation_description="创建用户与设备的关联", request_body=user_device_schema, responses={201: openapi.Response('创建成功', user_device_schema)} ) def create(self, request, *args, **kwargs): return super().create(request, *args, **kwargs) @swagger_auto_schema( operation_summary="获取用户设备详情", operation_description="根据ID获取用户设备关联详情", manual_parameters=[user_device_param], responses={200: openapi.Response('成功', user_device_schema)} ) def retrieve(self, request, *args, **kwargs): return super().retrieve(request, *args, **kwargs) @swagger_auto_schema( operation_summary="更新用户设备", operation_description="更新用户设备关联信息", manual_parameters=[user_device_param], request_body=user_device_schema, responses={200: openapi.Response('更新成功', user_device_schema)} ) def update(self, request, *args, **kwargs): return super().update(request, *args, **kwargs) @swagger_auto_schema( operation_summary="删除用户设备", operation_description="解除用户与设备的绑定关系", manual_parameters=[user_device_param], responses={204: "解绑成功"} ) def destroy(self, request, *args, **kwargs): return super().destroy(request, *args, **kwargs) @swagger_auto_schema( operation_summary="绑定设备", operation_description="通过设备MAC地址将设备绑定到当前用户", request_body=device_bind_schema, responses={ 200: openapi.Response('绑定成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 404: openapi.Response('设备不存在', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=False, methods=['post']) def bind(self, request): """ 绑定设备 通过设备MAC地址将设备绑定到当前用户。 参数: - mac_address: 设备MAC地址 - nickname: 设备昵称(可选) - is_primary: 是否为主要设备(可选) 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: 绑定的设备信息 """ serializer = DeviceBindSerializer(data=request.data) # 不使用raise_exception=True,手动处理验证错误并返回友好的错误消息 if not serializer.is_valid(): # 提取错误信息 error_messages = [] for field, errors in serializer.errors.items(): for error in errors: error_messages.append(f"{field}: {error}") error_message = ";".join(error_messages) return error_response(message=error_message, code=status.HTTP_400_BAD_REQUEST) mac_address = serializer.validated_data['mac_address'] nickname = serializer.validated_data.get('nickname', '') is_primary = serializer.validated_data.get('is_primary', False) try: # 获取设备 device = Device.objects.get(mac_address=mac_address) # 检查是否已被当前用户绑定 existing = UserDevice.objects.filter(device=device, user=request.user).first() if existing: return success_response( data=UserDeviceSerializer(existing).data, message='设备已绑定' ) # 检查是否已被其他用户绑定(测试 MAC 跳过此检查) if mac_address != 'AA:BB:CC:DD:EE:FF' and UserDevice.objects.filter(device=device).exists(): return error_response(message='设备已被其他用户绑定', code=status.HTTP_400_BAD_REQUEST) # 激活设备 if not device.is_active: device.is_active = True device.activated_at = timezone.now() device.save() # 创建用户设备关联 user_device = UserDevice( user=request.user, device=device, nickname=nickname, is_primary=is_primary ) user_device.save() # 返回绑定结果 return success_response( data=UserDeviceSerializer(user_device).data, message='设备绑定成功' ) except Device.DoesNotExist: return not_found_response(message='设备不存在') except Exception as e: logger.error(f"Device binding failed: {str(e)}") return error_response( message=f'绑定失败: {str(e)}', code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @swagger_auto_schema( operation_summary="设置主要设备", operation_description="将指定设备设置为用户的主要设备", manual_parameters=[user_device_param], responses={ 200: openapi.Response('设置成功', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=True, methods=['post']) def set_primary(self, request, pk=None): """ 设置主要设备 将指定设备设置为用户的主要设备。 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 """ try: user_device = self.get_object() if user_device.is_primary: return success_response(message='已经是主要设备') # 取消其他主要设备 UserDevice.objects.filter(user=request.user, is_primary=True).update(is_primary=False) # 设置为主要设备 user_device.is_primary = True user_device.save() return success_response(message='设置主要设备成功') except Exception as e: logger.error(f"Failed to set primary device: {str(e)}") return error_response( message=f'设置失败: {str(e)}', code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) class MessageViewSet(viewsets.ViewSet): """ 设备消息交互接口 提供设备消息的发送和处理功能。 支持多种类型的消息,如天气查询、聊天等。 """ authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] @swagger_auto_schema( operation_summary="发送消息到设备", operation_description="向当前用户绑定的设备发送各类消息,支持天气查询、聊天等功能。", request_body=message_send_schema, responses={ 200: openapi.Response('发送成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=False, methods=['post']) def send(self, request): """ 发送消息到设备 向当前用户绑定的设备发送各类消息,支持天气查询、聊天等功能。 参数: - message_type: 消息类型(weather/chat_message/sing/dance/touch/factory_reset等) - message: 消息内容 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: 返回的数据(如天气信息等) """ try: # 使用认证信息中的用户ID user_id = str(request.user.id) message = request.data.get('message') message_type = request.data.get('type', 'text') # 默认为文本类型 if not message: return error_response(message='消息内容不能为空', code=status.HTTP_400_BAD_REQUEST) # 构建组名 group_name = f"device_{user_id}" # 根据消息类型处理不同的消息 if message_type == 'weather': # 从消息中获取城市名 try: message_data = message # message_data = json.loads(message) city = message_data.get('city') if not city: return error_response(message='缺少城市参数', code=status.HTTP_400_BAD_REQUEST) # 初始化天气API qweather = QWeatherAPI("3c2e4e483f294a739f1757e0ebbe7a09") # 搜索城市 cities = qweather.search_city(city, range="cn") if not cities or len(cities) == 0: return error_response(message='未找到匹配的城市', code=status.HTTP_400_BAD_REQUEST) # 获取第一个匹配城市的天气 selected_city = cities[0] weather_data = qweather.get_weather(selected_city['id']) if not weather_data: return error_response( message='获取天气数据失败', code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) # 组合天气和城市数据 weather_info = { 'city': { 'name': selected_city['name'], 'id': selected_city['id'], 'adm1': selected_city['adm1'], 'adm2': selected_city['adm2'] }, 'weather': weather_data } # 发送天气信息到WebSocket channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'weather', 'message': weather_info, 'user_id': user_id } ) return success_response( data=weather_info, message='天气信息已发送' ) except json.JSONDecodeError: return error_response(message='消息格式错误', code=status.HTTP_400_BAD_REQUEST) elif message_type == 'sing': # 处理唱歌消息 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'sing', 'message': message, 'user_id': user_id } ) return success_response(message='唱歌消息已发送') elif message_type == 'dance': # 处理跳舞消息 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'dance', 'message': message, 'user_id': user_id } ) return success_response(message='跳舞消息已发送') elif message_type == 'touch': # 处理触摸消息 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'touch', 'message': message, 'user_id': user_id } ) return success_response(message='触摸消息已发送') elif message_type == 'factory_reset': # 处理恢复出厂设置消息 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'factory_reset', 'message': message, 'user_id': user_id } ) return success_response(message='恢复出厂设置指令已发送') else: # 处理普通文本消息 channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'chat_message', 'message': message, 'user_id': user_id } ) return success_response(message='消息已发送') except json.JSONDecodeError: return error_response(message='无效的JSON数据', code=status.HTTP_400_BAD_REQUEST) except Exception as e: return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) def send_message_to_user(request, user_id): """ 通过 HTTP 接口发送消息到指定用户的 WebSocket """ logger.debug(f"Received {request.method} request for user {user_id}") if request.method == 'POST': try: # 获取消息内容 body = json.loads(request.body) message = body.get('message', '') logger.debug(f"Processing message for user {user_id}: {message}") # 获取 channel layer channel_layer = get_channel_layer() # 构建 group name group_name = f"device_{user_id}" logger.debug(f"Sending message to group: {group_name}") # 发送消息到指定用户的 WebSocket async_to_sync(channel_layer.group_send)( group_name, { 'type': 'forward_message', 'message': message } ) return success_response(message='消息已发送') except json.JSONDecodeError: logger.error('Invalid JSON format in request body') return error_response(message='无效的 JSON 格式', code=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error(f'Error processing message: {str(e)}') return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) return error_response( message='仅支持 POST 请求', code=status.HTTP_405_METHOD_NOT_ALLOWED, status_code=status.HTTP_405_METHOD_NOT_ALLOWED ) class VolcEngineTokenViewSet(viewsets.ViewSet): """ 火山引擎Token生成接口 提供火山引擎实时音视频服务的Token生成功能。 """ authentication_classes = [RedisTokenAuthentication] permission_classes = [IsAuthenticated] tags = ['device'] @swagger_auto_schema( operation_summary="生成火山引擎Token", operation_description="为当前用户生成火山引擎实时音视频服务的Token", request_body=token_generate_schema, responses={ 200: openapi.Response('生成成功', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=False, methods=['post']) def generate(self, request): """ 生成火山引擎Token 参数: - expire_time: Token过期时间(秒),可选,默认30天 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: - token: 生成的Token - room_id: 生成的房间ID - user_id: 用户ID - app_id: 火山引擎应用ID - expire_time: Token有效期(秒) - task_id: 任务ID """ try: expire_time = request.data.get('expire_time', settings.VOLCENGINE_TOKEN_EXPIRE_TIME) # 使用当前登录用户的ID user_id = str(request.user.id) # 生成房间ID room_id = f"room_{user_id}" # 生成任务ID task_id = str(uuid.uuid4()) # 创建Token实例 token = VolcEngineAccessToken( settings.VOLCENGINE_APP_ID, settings.VOLCENGINE_APP_KEY, room_id, user_id ) # 添加发布和订阅权限 token.add_privilege(PrivPublishStream, int(time.time()) + expire_time) token.add_privilege(PrivSubscribeStream, int(time.time()) + expire_time) # 设置Token过期时间 token.expire_time(int(time.time()) + expire_time) # 生成Token字符串 token_str = token.serialize() # 构建Redis key redis_key = f"rtc_room:{user_id}:{task_id}" # 构建存储的数据 token_data = { 'token': token_str, 'room_id': room_id, 'user_id': user_id, 'app_id': settings.VOLCENGINE_APP_ID, 'expire_time': expire_time, 'task_id': task_id } # 存储到Redis,设置过期时间 cache.set(redis_key, token_data, expire_time) return success_response( data=token_data, message='Token生成成功' ) except Exception as e: return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @swagger_auto_schema( operation_summary="根据MAC地址获取Token", operation_description="通过设备MAC地址获取对应的火山引擎Token,无需鉴权", manual_parameters=[mac_address_param, expire_time_param], responses={ 200: openapi.Response('获取成功', get_standardized_response_schema()), 400: openapi.Response('参数错误', get_standardized_response_schema()), 404: openapi.Response('设备不存在', get_standardized_response_schema()), 500: openapi.Response('服务器错误', get_standardized_response_schema()) } ) @action(detail=False, methods=['get'], authentication_classes=[], permission_classes=[]) def get_by_mac(self, request): """ 根据设备MAC地址获取火山引擎Token 通过设备MAC地址获取对应的火山引擎Token,无需鉴权。 参数: - mac_address: 设备MAC地址(查询参数) - expire_time: Token过期时间(秒),可选,默认30天 返回: - success: 操作成功标志 - code: 状态码 - message: 操作结果信息 - data: - token: 生成的Token - room_id: 生成的房间ID - user_id: 关联的用户ID - app_id: 火山引擎应用ID - expire_time: Token有效期(秒) - task_id: 任务ID - device_code: 设备码 - device_type: 设备类型 """ try: mac_address = request.query_params.get('mac_address') expire_time = int(request.query_params.get('expire_time', settings.VOLCENGINE_TOKEN_EXPIRE_TIME)) if not mac_address: return error_response(message='缺少设备MAC地址', code=status.HTTP_400_BAD_REQUEST) # 查找设备 try: device = Device.objects.get(mac_address=mac_address) except Device.DoesNotExist: return not_found_response(message='设备不存在') # 检查设备是否已激活绑定给用户 user_device = UserDevice.objects.filter(device=device).first() if not user_device: return error_response(message='设备未绑定给任何用户', code=status.HTTP_400_BAD_REQUEST) user_id = str(user_device.user.id) # 生成房间ID room_id = f"room_{user_id}" # 生成任务ID task_id = str(uuid.uuid4()) # 创建Token实例 token = VolcEngineAccessToken( settings.VOLCENGINE_APP_ID, settings.VOLCENGINE_APP_KEY, room_id, user_id ) # 添加发布和订阅权限 token.add_privilege(PrivPublishStream, int(time.time()) + expire_time) token.add_privilege(PrivSubscribeStream, int(time.time()) + expire_time) # 设置Token过期时间 token.expire_time(int(time.time()) + expire_time) # 生成Token字符串 token_str = token.serialize() # 构建Redis key redis_key = f"rtc_room:{user_id}:{task_id}" # 构建存储的数据 token_data = { 'token': token_str, 'room_id': room_id, 'user_id': user_id, 'app_id': settings.VOLCENGINE_APP_ID, 'expire_time': expire_time, 'task_id': task_id, 'device_code': device.device_code, 'device_type': device.device_type.name } # 存储到Redis,设置过期时间 cache.set(redis_key, token_data, expire_time) # 反向索引:task_id -> user_id,用于字幕回调中 bot01 字幕的 user 归属 cache.set(f"rtc_task_user:{task_id}", user_id, expire_time) return success_response( data=token_data, message='Token生成成功' ) except Exception as e: logger.error(f"Failed to get token by MAC address: {str(e)}") return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @action(detail=False, methods=['post'], authentication_classes=[], permission_classes=[]) def conversation_status(self, request): """ 处理火山引擎对话式AI状态回调 接收火山引擎实时对话式AI的状态变化消息,解析后进行相应处理。 https://www.volcengine.com/docs/6348/1415216 参数: - 请求体:二进制数据,包含状态消息 返回: - 处理后的响应数据 """ try: logger.info("-----------------------------------------") # 记录回调日志 logger.info('Conversation AI status callback: %s', request.method) # 永久保留:记录 webhook 上下文(headers + query),用于字幕归属排查 try: logger.info('Webhook context: headers=%s, query=%s', dict(request.headers), dict(request.query_params)) except Exception: pass # 处理消息 try: # 获取原始数据 raw_data = request.body size = len(raw_data) logger.info('Message size: %s', size) # 解析请求JSON try: json_data = json.loads(raw_data) logger.info('JSON data: %s', json_data) # 获取base64编码的消息 base64_message = json_data.get('message') is_binary = json_data.get('binary', False) signature = json_data.get('signature', '') # logger.info('Base64 message: %s', base64_message) # logger.info('Is binary: %s', is_binary) # logger.info('Signature: %s', signature) if not base64_message: logger.error('No message found in request') return error_response(message='No message found in request', code=status.HTTP_400_BAD_REQUEST) # 解码base64消息 import base64 try: decoded_data = base64.b64decode(base64_message) logger.info('Decoded data length: %s', len(decoded_data)) # logger.info('Decoded data: %s', decoded_data[:100]) # 仅打印前100个字节避免日志过大 # 检查magic number if len(decoded_data) >= 4: magic_bytes = decoded_data[:4] magic_hex = ''.join(f'{b:02x}' for b in magic_bytes) logger.info('Magic bytes hex: %s', magic_hex) # 字幕格式 "subv" (0x73756276) if magic_bytes == b'subv': logger.info('Detected subtitle binary format (subv)') # 尝试查找JSON部分的开始位置 # 字幕消息格式: "subv" + 4字节长度 + JSON数据 header_size = 8 if len(decoded_data) > header_size: try: # 获取JSON部分 json_part = decoded_data[header_size:] # 尝试以UTF-8解码JSON部分 json_str = json_part.decode('utf-8') # logger.info('Subtitle JSON content: %s', json_str[:100]) # 仅打印开头部分 # 解析JSON subtitle_data = json.loads(json_str) logger.info('Parsed subtitle data: %s', subtitle_data) # 提取字幕信息 data_list = subtitle_data.get('data', []) subtitle_items = [] for item in data_list: subtitle = { 'definite': item.get('definite', False), 'language': item.get('language', ''), 'mode': item.get('mode', 0), 'paragraph': item.get('paragraph', False), 'sequence': item.get('sequence', 0), 'text': item.get('text', ''), 'timestamp': item.get('timestamp', 0), 'userId': item.get('userId', '') } subtitle_items.append(subtitle) logger.info('Subtitle: %s', subtitle) # === 字幕落库(策略 B:按 paragraph 累积拼接 + AI 归属锁)=== try: # 主路径:尝试从 webhook 上下文提取 task_id(火山如带) webhook_task_id = ( request.query_params.get('task_id') or request.headers.get('X-Volc-Task-Id') or request.headers.get('X-Task-Id') or json_data.get('task_id') ) # 拿 RTC Bot id(带缓存,避免每条字幕查 DB) rtc_bot_id = cache.get('rtc_voice_agent_bot_id') if not rtc_bot_id: rtc_bot_id = Bot.objects.filter(name='RTC_Voice_Agent').values_list('id', flat=True).first() if rtc_bot_id: cache.set('rtc_voice_agent_bot_id', rtc_bot_id, 3600) if not rtc_bot_id: logger.error('RTC_Voice_Agent Bot 未配置,跳过字幕落库') else: MAX_BUFFER_SEGMENTS = 50 # paragraph 始终不来时的兜底强制 flush for item in subtitle_items: text = item.get('text') or '' # 中间识别结果不入库 if not item.get('definite'): continue user_id_in_subtitle = item.get('userId') or '' sequence = item.get('sequence', 0) is_paragraph_end = bool(item.get('paragraph')) # Mode=1 下 paragraph=True 是独立的空文本终止信号,必须放过去触发 flush # 仅当文本空 + 又不是终止信号时才跳过 if not text.strip() and not is_paragraph_end: continue # 解析 ParadiseUser 归属 paradise_user_id = None if user_id_in_subtitle == 'bot01': # AI 字幕:先看"当前 AI 回合归属锁" paradise_user_id = cache.get('rtc_current_bot_owner') if not paradise_user_id: # 没锁就用主路径/兜底解析,并加锁 if webhook_task_id: paradise_user_id = cache.get(f"rtc_task_user:{webhook_task_id}") if not paradise_user_id: paradise_user_id = cache.get('rtc_last_active_user') if paradise_user_id: # 续期归属锁,确保整段 AI 回复内一致(无论中间多长) cache.set('rtc_current_bot_owner', str(paradise_user_id), 300) sender = ChatMessage.SENDER_BOT elif user_id_in_subtitle.isdigit(): paradise_user_id = user_id_in_subtitle sender = ChatMessage.SENDER_USER # 用户字幕到达,刷新最近活跃用户(给后续 bot01 兜底) cache.set('rtc_last_active_user', user_id_in_subtitle, 60) else: logger.warning('字幕 userId %r 无法识别,跳过', user_id_in_subtitle) continue if not paradise_user_id: logger.warning('字幕无法归属用户: userId=%s task_id=%s sequence=%s', user_id_in_subtitle, webhook_task_id, sequence) continue # 累积到 buffer(仅在文本非空时;空文本是 Mode=1 的 paragraph 终止信号,不入 buffer 但要触发 flush) buffer_key = f"rtc_subv_buffer:{sender}:{paradise_user_id}" buf = cache.get(buffer_key) or [] if text.strip(): buf.append({'seq': sequence, 'text': text}) force_flush = len(buf) > MAX_BUFFER_SEGMENTS if force_flush: logger.warning('字幕 buffer 超过 %d 段仍未见 paragraph 边界,强制落库 user=%s sender=%s', MAX_BUFFER_SEGMENTS, paradise_user_id, sender) if is_paragraph_end or force_flush: # 防重:同一 (sender, paradise_user_id, last_sequence) 只落一次 dedup_key = f"rtc_subv_flushed:{sender}:{paradise_user_id}:{sequence}" if cache.get(dedup_key): cache.delete(buffer_key) continue cache.set(dedup_key, 1, 300) # 排序拼接(自适应累积/增量) buf_sorted = sorted(buf, key=lambda x: x.get('seq', 0)) full_text = _merge_subv_segments(buf_sorted).strip() if full_text: try: ChatMessage.objects.create( user_id=int(paradise_user_id), bot_id=rtc_bot_id, message=full_text[:2048], sender=sender, message_type=ChatMessage.MESSAGE_TYPE_TEXT, ) logger.info('字幕落库成功: sender=%s user=%s len=%d segs=%d', sender, paradise_user_id, len(full_text), len(buf_sorted)) except Exception as e: logger.error('字幕落库失败: %s, sender=%s, text=%r', e, sender, full_text[:100]) cache.delete(buffer_key) else: # 还在回合中,刷新 buffer TTL cache.set(buffer_key, buf, 300) except Exception as e: logger.error('字幕落库流程异常: %s', e) # === 字幕落库结束 === # 构建响应数据 response_data = { 'type': 'subtitle', 'subtitles': subtitle_items, 'signature': signature } return success_response(data=response_data, message='Subtitle received successfully') except UnicodeDecodeError as e: logger.error('Failed to decode subtitle JSON: %s', str(e)) except json.JSONDecodeError as e: logger.error('Failed to parse subtitle JSON: %s', str(e)) # 状态消息格式 "conv" (0x636F6E76) elif magic_bytes == b'conv': logger.info('Detected conversation status binary format (conv)') # 状态消息格式: "conv" + 4字节长度 + JSON数据 header_size = 8 if len(decoded_data) > header_size: # 获取长度 length_bytes = decoded_data[4:8] length = int.from_bytes(length_bytes, byteorder='big') # logger.info('Conversation status length from binary: %d bytes', length) if len(decoded_data) - header_size >= length: try: # 获取JSON部分 json_part = decoded_data[header_size:header_size + length] # 尝试以UTF-8解码JSON部分 json_str = json_part.decode('utf-8') # logger.info('Status JSON content: %s', json_str[:100]) # 仅打印开头部分 # 解析JSON status_json = json.loads(json_str) # 提取状态信息 task_id = status_json.get('TaskId') user_id = status_json.get('UserID') round_id = status_json.get('RoundID') event_time = status_json.get('EventTime') stage = status_json.get('Stage', {}) stage_code = stage.get('Code') stage_description = stage.get('Description') # 记录状态信息 logger.info('Conversation status: TaskId=%s, UserID=%s, RoundID=%s, StageCode=%s, Description=%s', task_id, user_id, round_id, stage_code, stage_description) # 维护 task->user 反向索引和最近活跃用户(用于 bot01 字幕归属) if task_id and user_id: cache.set(f"rtc_task_user:{task_id}", str(user_id), 3600) if user_id: cache.set('rtc_last_active_user', str(user_id), 60) # 根据不同的状态码进行不同的处理 response_data = { 'task_id': task_id, 'user_id': user_id, 'round_id': round_id, 'event_time': event_time, 'stage_code': stage_code, 'stage_description': stage_description, 'status': 'processed', 'signature': signature } # 如果需要,可以发送WebSocket消息通知前端 if user_id: try: # 构建组名 group_name = f"device_{user_id}" # 发送消息到WebSocket channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'conversation_status', 'message': response_data } ) logger.info('Status message sent via WebSocket to user: %s', user_id) except Exception as e: logger.error('WebSocket send failed: %s', str(e)) return success_response(data=response_data, message='Status callback processed successfully') except UnicodeDecodeError as e: logger.error('Failed to decode status JSON: %s', str(e)) except json.JSONDecodeError as e: logger.error('Failed to parse status JSON: %s', str(e)) else: logger.error('Invalid status message length: header=%d, expected=%d, actual=%d', header_size, length, len(decoded_data) - header_size) else: logger.warning('Unknown binary format: %s', magic_bytes) # 将解码后的数据作为原始数据返回 return success_response( data={ 'binary_data_size': len(decoded_data), 'magic': magic_hex, 'format': 'unknown' }, message='Received binary data with unknown format' ) else: logger.error('Decoded data too short') return error_response(message='Decoded data too short', code=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error('Base64 decoding failed: %s', str(e)) return error_response(message=f'Base64 decoding failed: {str(e)}', code=status.HTTP_400_BAD_REQUEST) except json.JSONDecodeError as e: logger.error('JSON parsing failed: %s', str(e)) return error_response(message='Invalid JSON format', code=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error('Message processing failed: %s', str(e)) return error_response( message=f'Message processing failed: {str(e)}', code=status.HTTP_400_BAD_REQUEST ) except Exception as e: logger.error('Status callback processing failed: %s', str(e)) return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR ) @action(detail=False, methods=['post'], authentication_classes=[], permission_classes=[]) def callback(self, request): """ 处理RTC回调 参数: - Message: 包含函数调用信息的JSON字符串 - Signature: 用户ID和任务ID的组合,格式为 "user_id:task_id" - Type: 回调类型 返回: - 处理后的响应数据 """ try: # 记录回调日志 logger.info('RTC callback request: %s', request.method) # 处理请求体 try: # 尝试获取原始请求体 body = request.body.decode('utf-8') logger.info('Request body: %s', body) # 尝试解析JSON try: json_data = json.loads(body) except json.JSONDecodeError as e: logger.error('JSON parsing failed: %s', str(e)) return error_response(message='Invalid JSON format', code=status.HTTP_400_BAD_REQUEST) # 从请求体中提取必要信息 message_str = json_data.get('Message', '[]') sign = json_data.get('Signature') callback_type = json_data.get('Type') # 解析Message try: msg_list = json.loads(message_str) if not isinstance(msg_list, list) or len(msg_list) == 0: raise ValueError('Invalid message format') msg = msg_list[0] except (json.JSONDecodeError, ValueError) as e: logger.error('Message parsing failed: %s', str(e)) return error_response(message='Invalid message format', code=status.HTTP_400_BAD_REQUEST) # 提取函数调用信息 call_id = msg.get('id') function = msg.get('function', {}) function_name = function.get('name') arguments_str = function.get('arguments', '{}') # 解析函数参数 try: arguments = json.loads(arguments_str) location = arguments.get('location') if function_name == 'get_nearby': target_type = arguments.get('target_type') if function_name == 'flow_light': switch_type = arguments.get('switch_type') except json.JSONDecodeError as e: logger.error('Function argument parsing failed: %s', str(e)) return error_response(message='Invalid function arguments', code=status.HTTP_400_BAD_REQUEST) # 从sign中解析user_id和task_id if not sign or ':' not in sign: logger.error('Invalid signature format: %s', sign) return error_response(message='Invalid signature format', code=status.HTTP_400_BAD_REQUEST) user_id, task_id = sign.split(':') # 构建Redis key redis_key = f"rtc_room:{user_id}:{task_id}" # 从Redis获取房间信息 room_info = cache.get(redis_key) if not room_info: logger.error('Room info not found: %s', redis_key) return not_found_response(message='Room info not found') # 处理天气请求 weather_content = "" if function_name == 'get_weather' and location: logger.info('Processing weather request: %s', location) # 初始化天气API qweather = QWeatherAPI("3c2e4e483f294a739f1757e0ebbe7a09") # 搜索城市 cities = qweather.search_city(location, range="cn") if cities and len(cities) > 0: # 获取第一个匹配城市的天气 selected_city = cities[0] weather_data = qweather.get_weather(selected_city['id']) if weather_data: # 构建详细的天气内容 now = weather_data['now'] weather_content = ( f"{selected_city['name']}({selected_city['adm1']})的天气信息:\n" f"天气状况:{now['text']}\n" f"当前温度:{now['temp']}℃\n" f"体感温度:{now['feelsLike']}℃\n" f"相对湿度:{now['humidity']}%\n" f"风向:{now['windDir']}\n" f"风力等级:{now['windScale']}级\n" f"风速:{now['windSpeed']}km/h\n" f"降水量:{now['precip']}mm\n" f"气压:{now['pressure']}hPa\n" f"能见度:{now['vis']}km\n" f"更新时间:{weather_data['updateTime']}" ) elif function_name == 'get_nearby': logger.info('Processing nearby info request: %s, %s', location, target_type) if not target_type: target_type = '美食' nearby_info = search_nearby(location, target_type) # 组装响应数据 response_data = { "AppId": room_info['app_id'], "RoomId": room_info['room_id'], "TaskId": room_info['task_id'], "Command": "function", "Message": json.dumps({ "ToolCallID": call_id, "Content": nearby_info }) } # 调用火山引擎UpdateVoiceChat API try: volcengine_response = update_voice_chat( app_id=room_info['app_id'], room_id=room_info['room_id'], task_id=room_info['task_id'], call_id=call_id, content=nearby_info ) response_data['volcengine_response'] = volcengine_response except Exception as e: logger.error('Volcengine API call failed: %s', str(e)) response_data['volcengine_error'] = str(e) return success_response(data=response_data) elif function_name == 'dance': logger.info('Processing dance info request') # 组装响应数据 response_data = { "AppId": room_info['app_id'], "RoomId": room_info['room_id'], "TaskId": room_info['task_id'], "Command": "function", "Message": json.dumps({ "ToolCallID": call_id, "Content": "洛天依开始跳舞啦~" }) } # 构建组名 group_name = f"device_{user_id}" # 发送天气信息到WebSocket channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'dance', 'message': "洛天依开始跳舞啦~", 'user_id': user_id } ) # 调用火山引擎UpdateVoiceChat API try: volcengine_response = update_voice_chat( app_id=room_info['app_id'], room_id=room_info['room_id'], task_id=room_info['task_id'], call_id=call_id, content="洛天依开始跳舞啦~" ) response_data['volcengine_response'] = volcengine_response except Exception as e: logger.error('Volcengine API call failed: %s', str(e)) response_data['volcengine_error'] = str(e) return success_response(data=response_data) elif function_name == 'flow_light': logger.info('Processing flow_light info request') if not switch_type: switch_type = 'on' # 组装响应数据 response_data = { "AppId": room_info['app_id'], "RoomId": room_info['room_id'], "TaskId": room_info['task_id'], "Command": "function", "Message": json.dumps({ "ToolCallID": call_id, "Content": f"{switch_type}" }) } # 构建组名 group_name = f"device_{user_id}" # 发送天气信息到WebSocket channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type': 'flow_light', 'message': f"{switch_type}", 'user_id': user_id } ) # 调用火山引擎UpdateVoiceChat API try: volcengine_response = update_voice_chat( app_id=room_info['app_id'], room_id=room_info['room_id'], task_id=room_info['task_id'], call_id=call_id, content=f"{switch_type}" ) response_data['volcengine_response'] = volcengine_response except Exception as e: logger.error('Volcengine API call failed: %s', str(e)) response_data['volcengine_error'] = str(e) return success_response(data=response_data) # 组装响应数据 response_data = { "AppId": room_info['app_id'], "RoomId": room_info['room_id'], "TaskId": room_info['task_id'], "Command": "function", "Message": json.dumps({ "ToolCallID": call_id, "Content": weather_content if weather_content else location }) } # 调用火山引擎UpdateVoiceChat API try: volcengine_response = update_voice_chat( app_id=room_info['app_id'], room_id=room_info['room_id'], task_id=room_info['task_id'], call_id=call_id, content=weather_content if weather_content else location ) response_data['volcengine_response'] = volcengine_response except Exception as e: logger.error('Volcengine API call failed: %s', str(e)) response_data['volcengine_error'] = str(e) return success_response(data=response_data) except Exception as e: logger.error('Request processing failed: %s', str(e)) return error_response( message=f'Error processing request: {str(e)}', code=status.HTTP_400_BAD_REQUEST ) except Exception as e: logger.error('Callback processing failed: %s', str(e)) return error_response( message=str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR )