lty/qy_lty/aiapp/views.py
pmc bd95ba470c feat: update admin panel, API modules, and add migrations
- Update food, outfits, props, home-decor pages and components
- Add permissions page and sidebar updates
- Update API client and all API modules (auth, food, dances, etc.)
- Add card model migrations for optional fields
- Update Django views, serializers, and authentication
- Add affinity level migrations and user app updates
- Add project documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 13:06:50 +08:00

430 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, viewsets, permissions
from django.contrib.auth.models import User
from .models import ChatMessage, Bot
from userapp.models import ParadiseUser
from .serializers import ChatMessageSerializer
from rest_framework.permissions import IsAuthenticated
from userapp.authentication import RedisTokenAuthentication
from rest_framework import serializers
from common.swagger_utils import swagger_schema
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
import logging
import requests
import re
logger = logging.getLogger(__name__)
class BotSerializer(serializers.ModelSerializer):
class Meta:
model = Bot
fields = ['id', 'name', 'description']
class IsAdminOrReadOnly(permissions.BasePermission):
def has_permission(self, request, view):
if request.method in permissions.SAFE_METHODS:
return request.user and request.user.is_authenticated
return request.user and request.user.is_staff
class BotViewSet(viewsets.ModelViewSet):
"""
机器人(AI模型)管理接口
管理员可增删改查,普通用户只读
"""
queryset = Bot.objects.all()
serializer_class = BotSerializer
permission_classes = [IsAdminOrReadOnly]
authentication_classes = [RedisTokenAuthentication]
from .kimi import KIMI
from .audio.AudioService import get_audio_service
# Swagger Schema 定义
class ChatRequestSchema(serializers.Serializer):
message = serializers.CharField(required=True, help_text="用户发送的文本消息内容")
class ChatMessageDetailSchema(serializers.Serializer):
id = serializers.IntegerField(help_text="消息ID")
user = serializers.IntegerField(help_text="用户ID")
bot = serializers.IntegerField(help_text="机器人ID")
message = serializers.CharField(help_text="消息内容")
sender = serializers.CharField(help_text="发送者user或bot")
message_type = serializers.CharField(help_text="消息类型")
message_audio_url = serializers.URLField(help_text="音频链接", allow_null=True, required=False)
message_video_url = serializers.URLField(help_text="视频链接", allow_null=True, required=False)
created_at = serializers.DateTimeField(help_text="创建时间")
class ChatResponseSchema(serializers.Serializer):
user_message = ChatMessageDetailSchema(help_text="用户发送的消息详情")
bot_reply = ChatMessageDetailSchema(help_text="机器人的回复消息详情")
class MultiChatRequestSchema(serializers.Serializer):
botId = serializers.IntegerField(required=True, help_text="机器人ID")
message = serializers.CharField(required=True, help_text="用户发送的消息内容")
messageType = serializers.ChoiceField(
choices=[ChatMessage.MESSAGE_TYPE_TEXT, ChatMessage.MESSAGE_TYPE_AUDIO, ChatMessage.MESSAGE_TYPE_VIDEO],
help_text="消息类型(text, audio, video)"
)
messageAudioUrl = serializers.URLField(required=False, help_text="音频消息的URL(当messageType为audio时)")
messageAudioBase64 = serializers.CharField(required=False, help_text="Base64编码的音频数据(当messageType为audio时)")
messageVideoUrl = serializers.URLField(required=False, help_text="视频消息的URL(当messageType为video时)")
messageVideoBase64 = serializers.CharField(required=False, help_text="Base64编码的视频数据(当messageType为video时)")
returnAudioAsBase64 = serializers.BooleanField(required=False, default=False, help_text="是否返回Base64编码的音频而不是URL")
class ChatBotAPIView(APIView):
"""
AI 聊天机器人接口
提供与 AI 聊天机器人的单轮对话功能。
支持发送文本消息并获取 AI 的回复。
"""
authentication_classes = [RedisTokenAuthentication] # 使用自定义的认证类
permission_classes = [IsAuthenticated] # 仅允许已认证用户访问
tags = ['ai-chat']
@swagger_schema(
request_schema=ChatRequestSchema,
responses={
201: openapi.Response('对话成功', ChatResponseSchema),
404: openapi.Response('机器人不存在', openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'error': openapi.Schema(type=openapi.TYPE_STRING, description='错误信息')
}
))
},
operation_description="与AI机器人进行单轮对话发送文本消息并获取回复",
tags=['AI聊天'],
security=[{'Bearer': []}],
manual_parameters=[
openapi.Parameter(
'bot_id',
openapi.IN_PATH,
description="AI机器人的唯一标识",
type=openapi.TYPE_INTEGER,
required=True
)
]
)
def post(self, request, bot_id):
"""
发送消息给 AI 聊天机器人
接收用户发送的文本消息,并返回 AI 机器人的回复。
"""
self.user = request.user
try:
self.bot = Bot.objects.get(id=bot_id)
except Exception as _:
return Response(
{"error": "Bot not found"}, status=status.HTTP_404_NOT_FOUND
)
# Check is there a message for the current user and bot,if not, create a initial message
handle_first_access(self.user, self.bot)
# create a new message for the current request
message_type = ChatMessage.MESSAGE_TYPE_TEXT
user_message = request.data.get("message")
message_audio_url = ''
message_video_url = ''
user_chat_message = create_user_message(self.user, self.bot, message_type, user_message, message_audio_url, message_video_url)
# prepare the history
history = list(map(lambda message: {'role':message.sender, 'content':message.message},
ChatMessage.objects.filter(bot=self.bot,user=self.user).all()))
bot_chat_message = ask_kimi(self.user, self.bot, history)
# serialize the messages
user_message_serializer = ChatMessageSerializer(user_chat_message)
bot_message_serializer = ChatMessageSerializer(bot_chat_message)
logger.info(bot_chat_message)
return Response(
{
"user_message": user_message_serializer.data,
"bot_reply": bot_message_serializer.data,
},
status=status.HTTP_201_CREATED,
)
class MultiChatAPIView(APIView):
"""
AI 多轮对话接口
提供与 AI 的多轮对话功能。
支持上下文关联的对话AI 能够记住对话历史。
"""
authentication_classes = [RedisTokenAuthentication] # 使用自定义的认证类
permission_classes = [IsAuthenticated] # 仅允许已认证用户访问
tags = ['ai-multichat']
@swagger_schema(
request_schema=MultiChatRequestSchema,
responses={
201: openapi.Response('对话成功', ChatMessageDetailSchema),
400: openapi.Response('请求参数错误', openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'error': openapi.Schema(type=openapi.TYPE_STRING, description='错误信息')
}
)),
404: openapi.Response('机器人不存在', openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'error': openapi.Schema(type=openapi.TYPE_STRING, description='错误信息')
}
))
},
operation_description="与AI进行多轮对话支持文本、音频和视频消息AI能记住对话历史",
tags=['AI聊天'],
security=[{'Bearer': []}]
)
def post(self, request):
"""
发起多轮对话
支持与 AI 进行多轮对话AI 会记住对话上下文。
"""
self.user = request.user
bot_id = request.data.get("botId")
if bot_id is None:
logger.warning("Bot id is missing in request")
return Response(
{"error": "Bot id is required"}
)
try:
self.bot = Bot.objects.get(id=bot_id)
logger.info(f"Found bot with id {bot_id}")
except Exception as e:
logger.error(f"Bot not found: {str(e)}")
return Response(
{"error": "Bot not found"}, status=status.HTTP_404_NOT_FOUND
)
# Check is there a message for the current user and bot,if not, create a initial message
handle_first_access(self.user, self.bot)
# create a new message for the current request
message_type = request.data.get("messageType")
user_message = request.data.get("message")
message_audio_url = request.data.get("messageAudioUrl")
message_audio_base64 = request.data.get("messageAudioBase64")
message_video_url = request.data.get("messageVideoUrl")
message_video_base64 = request.data.get("messageVideoBase64")
return_audio_as_base64 = request.data.get("returnAudioAsBase64", False)
logger.info(f"Processing message of type {message_type} from user {self.user.id}")
user_chat_message = create_user_message(
self.user,
self.bot,
message_type,
user_message,
message_audio_url,
message_video_url,
message_audio_base64,
message_video_base64
)
# prepare the history
history = list(map(lambda message: {'role':message.sender, 'content':message.message},
ChatMessage.objects.filter(bot=self.bot,user=self.user).all()))
bot_chat_message = ask_kimi(self.user, self.bot, history)
# 对于语音请求,合成语音返回
if message_type == ChatMessage.MESSAGE_TYPE_AUDIO:
audio_ser = get_audio_service()
logger.info(f"Processing audio message: {bot_chat_message.message}")
if return_audio_as_base64:
# 生成音频并返回Base64编码
audio_data = audio_ser.synthesize_speech_raw(bot_chat_message.message)
import base64
response_message_audio_base64 = base64.b64encode(audio_data).decode('utf-8')
bot_chat_message.message_audio_url = None # 不使用URL
bot_chat_message.message_type = ChatMessage.MESSAGE_TYPE_AUDIO
bot_chat_message.save()
logger.info("Generated audio as Base64 data")
else:
# 生成音频URL并返回
response_message_audio_url = audio_ser.synthesize_speech(bot_chat_message.message)
logger.info(f"Generated audio URL: {response_message_audio_url}")
bot_chat_message.message_audio_url = response_message_audio_url
bot_chat_message.message_type = ChatMessage.MESSAGE_TYPE_AUDIO
bot_chat_message.save()
logger.info(f"Bot response: {bot_chat_message.message}")
# 构建响应数据
response_data = {
"text": bot_chat_message.message
}
# 添加用户的语音URL和识别的文字如果是语音消息
if message_type == ChatMessage.MESSAGE_TYPE_AUDIO:
response_data["user_message"] = user_chat_message.message
if message_audio_url:
response_data["user_audio_url"] = message_audio_url
elif message_audio_base64:
response_data["user_audio_base64"] = message_audio_base64
# 如果有音频URL添加到响应中
if bot_chat_message.message_audio_url:
response_data["audio_url"] = bot_chat_message.message_audio_url
# 如果有返回Base64编码的音频数据
if return_audio_as_base64 and message_type == ChatMessage.MESSAGE_TYPE_AUDIO:
response_data["audio_base64"] = response_message_audio_base64
# 如果有视频URL添加到响应中
if bot_chat_message.message_video_url:
response_data["video_url"] = bot_chat_message.message_video_url
return Response(
response_data,
status=status.HTTP_201_CREATED,
)
def ask_kimi(user: ParadiseUser, bot: Bot, history: list) -> ChatMessage:
response = KIMI(history).get_response()
# 移除开头和结尾的括号内容
# 移除开头的中英文括号内容
response = re.sub(r'^[\(][^)]*[\)][\s]*', '', response)
# 移除结尾的中英文括号内容
response = re.sub(r'[\s]*[\(][^)]*[\)]$', '', response)
chat_message = ChatMessage.objects.create(
user=user,
bot=bot,
message=response,
sender=ChatMessage.SENDER_BOT,
)
chat_message.save()
return chat_message
def handle_first_access(user: ParadiseUser, bot: Bot):
if not ChatMessage.objects.filter(user=user, bot=bot).exists():
ChatMessage.objects.create(
user=user,
bot=bot,
message=bot.description,
sender=ChatMessage.SENDER_SYSTEM,
message_type=ChatMessage.MESSAGE_TYPE_TEXT
).save()
def create_user_message(user: ParadiseUser, bot: Bot, message_type: str, user_message: str, message_audio_url: str, message_video_url: str, message_audio_base64: str = None, message_video_base64: str = None) -> ChatMessage:
if message_type == ChatMessage.MESSAGE_TYPE_TEXT:
user_chat_message = ChatMessage.objects.create(
user=user,
bot=bot,
message=user_message,
sender=ChatMessage.SENDER_USER,
message_type=ChatMessage.MESSAGE_TYPE_TEXT
)
user_chat_message.save()
logger.info(f"Created text message for user {user.id} and bot {bot.id}")
return user_chat_message
if message_type == ChatMessage.MESSAGE_TYPE_AUDIO:
# 从 URL 获取音频文件内容或处理 Base64 音频数据
audio_data = None
audio_service = get_audio_service()
# 检查是否提供了音频URL
if message_audio_url:
logger.info(f"Processing audio message from URL: {message_audio_url}")
response = requests.get(message_audio_url)
if response.status_code != 200:
logger.error(f"Failed to download audio file: {response.status_code}")
raise Exception("无法下载音频文件")
audio_data = response.content
elif message_audio_base64:
# 处理Base64编码的音频数据
logger.info("Processing audio message from Base64 data")
import base64
try:
audio_data = base64.b64decode(message_audio_base64)
except Exception as e:
logger.error(f"Failed to decode base64 audio data: {str(e)}")
raise Exception("无法解码Base64音频数据")
if not audio_data:
logger.error("No audio data provided (neither URL nor Base64)")
raise Exception("未提供音频数据")
message = audio_service.recognize_speech(audio_data)
logger.info(f"Recognized speech message: {message}")
user_chat_message = ChatMessage.objects.create(
user=user,
bot=bot,
message=message,
sender=ChatMessage.SENDER_USER,
message_type=ChatMessage.MESSAGE_TYPE_AUDIO
)
if message_audio_url:
user_chat_message.message_audio_url = message_audio_url
user_chat_message.save()
logger.info(f"Created audio message for user {user.id} and bot {bot.id}")
return user_chat_message
if message_type == ChatMessage.MESSAGE_TYPE_VIDEO:
# 处理视频消息
# 目前仅保存视频URL或转换Base64数据为文件URL的逻辑
video_url = message_video_url
# 处理Base64视频数据
if not video_url and message_video_base64:
logger.info("Processing video message from Base64 data")
import base64
try:
# 这里需要将Base64解码并保存为文件然后返回URL
# 以下为示例代码,实际实现需要根据项目需求调整
import os
import uuid
from django.conf import settings
# 创建保存视频的目录
video_dir = os.path.join(settings.MEDIA_ROOT, 'video')
os.makedirs(video_dir, exist_ok=True)
# 生成唯一文件名
video_filename = f"{uuid.uuid4()}.mp4"
video_path = os.path.join(video_dir, video_filename)
# 解码并保存文件
video_data = base64.b64decode(message_video_base64)
with open(video_path, 'wb') as f:
f.write(video_data)
# 生成URL
video_url = f"{settings.MEDIA_URL}video/{video_filename}"
logger.info(f"Generated video URL from Base64: {video_url}")
except Exception as e:
logger.error(f"Failed to process base64 video data: {str(e)}")
raise Exception("无法处理Base64视频数据")
if not video_url:
logger.error("No video URL or data provided")
raise Exception("未提供视频数据")
user_chat_message = ChatMessage.objects.create(
user=user,
bot=bot,
message=user_message, # 可能需要从视频中提取文本
sender=ChatMessage.SENDER_USER,
message_type=ChatMessage.MESSAGE_TYPE_VIDEO,
message_video_url=video_url
)
user_chat_message.save()
logger.info(f"Created video message for user {user.id} and bot {bot.id}")
return user_chat_message
return None