"""REST endpoints for AI chat, RTC chat history and the credential slot."""

import base64
import logging
import os
import re
import uuid
from datetime import timedelta
from typing import Optional

import requests
from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import permissions, serializers, status, viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from common.responses import created_response, error_response, success_response
from common.swagger_utils import get_standardized_response_schema, swagger_schema
from common.utils import mask_token
from userapp.authentication import RedisTokenAuthentication
from userapp.models import ParadiseUser

from .audio.AudioService import get_audio_service
from .kimi import KIMI
from .models import Bot, ChatMessage, CredentialSlot
from .serializers import ChatMessageSerializer, CredentialSlotSerializer

logger = logging.getLogger(__name__)

# Upper bound (seconds) for downloading a client-supplied audio URL, so a slow
# or unresponsive host cannot block the request worker indefinitely.
AUDIO_DOWNLOAD_TIMEOUT_SECONDS = 10

# Strip one parenthesised segment (ASCII or full-width brackets) from the
# start / end of a model reply.
_LEADING_PAREN_RE = re.compile(r'^[\((][^))]*[\))][\s]*')
_TRAILING_PAREN_RE = re.compile(r'[\s]*[\((][^))]*[\))]$')


class ChatMessageError(Exception):
    """Base class for failures while turning a request into a chat message."""


class ChatMessageInputError(ChatMessageError):
    """The client-supplied payload cannot be turned into a chat message."""


class BotSerializer(serializers.ModelSerializer):
    """Serialize the ``id``, ``name`` and ``description`` of a ``Bot``."""

    class Meta:
        model = Bot
        fields = ['id', 'name', 'description']


class IsAdminOrReadOnly(permissions.BasePermission):
    """Allow safe methods to authenticated users and other methods to staff."""

    def has_permission(self, request, view):
        user = request.user
        if not user:
            return False
        if request.method in permissions.SAFE_METHODS:
            return bool(user.is_authenticated)
        return bool(user.is_staff)


class BotViewSet(viewsets.ModelViewSet):
    """
    Bot (AI model) management API.

    Staff users may create, read, update and delete; other authenticated
    users are read-only.
    """
    queryset = Bot.objects.all()
    serializer_class = BotSerializer
    permission_classes = [IsAdminOrReadOnly]
    authentication_classes = [RedisTokenAuthentication]


# ----------------------------------------------------------------------
# Swagger schema definitions (used for API documentation in this module)
# ----------------------------------------------------------------------
class ChatRequestSchema(serializers.Serializer):
    message = serializers.CharField(required=True, help_text="用户发送的文本消息内容")


class ChatMessageDetailSchema(serializers.Serializer):
    id = serializers.IntegerField(help_text="消息ID")
    user = serializers.IntegerField(help_text="用户ID")
    bot = serializers.IntegerField(help_text="机器人ID")
    message = serializers.CharField(help_text="消息内容")
    sender = serializers.CharField(help_text="发送者,user或bot")
    message_type = serializers.CharField(help_text="消息类型")
    message_audio_url = serializers.URLField(help_text="音频链接", allow_null=True, required=False)
    message_video_url = serializers.URLField(help_text="视频链接", allow_null=True, required=False)
    created_at = serializers.DateTimeField(help_text="创建时间")


class ChatResponseSchema(serializers.Serializer):
    user_message = ChatMessageDetailSchema(help_text="用户发送的消息详情")
    bot_reply = ChatMessageDetailSchema(help_text="机器人的回复消息详情")


class MultiChatRequestSchema(serializers.Serializer):
    botId = serializers.IntegerField(required=True, help_text="机器人ID")
    message = serializers.CharField(required=True, help_text="用户发送的消息内容")
    messageType = serializers.ChoiceField(
        choices=[
            ChatMessage.MESSAGE_TYPE_TEXT,
            ChatMessage.MESSAGE_TYPE_AUDIO,
            ChatMessage.MESSAGE_TYPE_VIDEO,
        ],
        help_text="消息类型(text, audio, video)"
    )
    messageAudioUrl = serializers.URLField(required=False, help_text="音频消息的URL(当messageType为audio时)")
    messageAudioBase64 = serializers.CharField(required=False, help_text="Base64编码的音频数据(当messageType为audio时)")
    messageVideoUrl = serializers.URLField(required=False, help_text="视频消息的URL(当messageType为video时)")
    messageVideoBase64 = serializers.CharField(required=False, help_text="Base64编码的视频数据(当messageType为video时)")
    returnAudioAsBase64 = serializers.BooleanField(required=False, default=False, help_text="是否返回Base64编码的音频而不是URL")


def _error_schema():
    """Return a fresh swagger schema for the ``{"error": "..."}`` body."""
    return openapi.Schema(
        type=openapi.TYPE_OBJECT,
        properties={
            'error': openapi.Schema(type=openapi.TYPE_STRING, description='错误信息')
        }
    )


def _find_bot(bot_id):
    """Return the ``Bot`` with primary key *bot_id*, or ``None``.

    Only "no such row" and "value is not a valid id" are treated as
    not-found; any other failure (e.g. a database outage) propagates instead
    of being reported to the client as a 404.
    """
    try:
        bot = Bot.objects.get(id=bot_id)
    except (Bot.DoesNotExist, ValueError, TypeError) as exc:
        logger.error(f"Bot not found: {str(exc)}")
        return None
    logger.info(f"Found bot with id {bot_id}")
    return bot


def _is_blank(value) -> bool:
    """Return True when *value* is ``None`` or a whitespace-only string."""
    return value is None or (isinstance(value, str) and not value.strip())


def _as_bool(value) -> bool:
    """Interpret a JSON or form-encoded flag as a boolean.

    Form-encoded requests deliver ``"false"`` as a non-empty (truthy)
    string, so strings are matched against an explicit allow-list.
    """
    if isinstance(value, str):
        return value.strip().lower() in {'1', 'true', 'yes', 'on'}
    return bool(value)


def _build_history(user, bot) -> list:
    """Return the stored conversation as ``{'role', 'content'}`` dicts.

    NOTE(review): no explicit ordering is applied here, so the order is
    whatever the ``ChatMessage`` default ordering yields — confirm that it is
    chronological.
    """
    return [
        {'role': entry.sender, 'content': entry.message}
        for entry in ChatMessage.objects.filter(bot=bot, user=user).all()
    ]


class ChatBotAPIView(APIView):
    """
    AI chat bot endpoint.

    Accepts one text message for the bot given in the URL and returns the
    stored user message together with the bot's reply.
    """
    authentication_classes = [RedisTokenAuthentication]  # custom token authentication
    permission_classes = [IsAuthenticated]  # authenticated users only
    tags = ['ai-chat']

    @swagger_schema(
        request_schema=ChatRequestSchema,
        responses={
            201: openapi.Response('对话成功', ChatResponseSchema),
            400: openapi.Response('请求参数错误', _error_schema()),
            404: openapi.Response('机器人不存在', _error_schema()),
        },
        operation_description="与AI机器人进行单轮对话,发送文本消息并获取回复",
        tags=['AI聊天'],
        security=[{'Bearer': []}],
        manual_parameters=[
            openapi.Parameter(
                'bot_id',
                openapi.IN_PATH,
                description="AI机器人的唯一标识",
                type=openapi.TYPE_INTEGER,
                required=True
            )
        ]
    )
    def post(self, request, bot_id):
        """
        Send a text message to the bot and return its reply.

        Responds 404 when the bot does not exist, 400 when ``message`` is
        missing or blank, and 201 with ``user_message`` / ``bot_reply`` on
        success.
        """
        user = request.user
        bot = _find_bot(bot_id)
        if bot is None:
            return Response(
                {"error": "Bot not found"},
                status=status.HTTP_404_NOT_FOUND
            )

        user_message = request.data.get("message")
        if _is_blank(user_message):
            return Response(
                {"error": "Message is required"},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Seed the conversation with the initial system message on first use.
        handle_first_access(user, bot)

        # Store the message carried by this request.
        user_chat_message = create_user_message(
            user, bot, ChatMessage.MESSAGE_TYPE_TEXT, user_message, '', ''
        )

        bot_chat_message = ask_kimi(user, bot, _build_history(user, bot))
        logger.info(bot_chat_message)

        return Response(
            {
                "user_message": ChatMessageSerializer(user_chat_message).data,
                "bot_reply": ChatMessageSerializer(bot_chat_message).data,
            },
            status=status.HTTP_201_CREATED,
        )


class MultiChatAPIView(APIView):
    """
    AI multi-turn chat endpoint.

    Accepts text, audio or video messages; the stored conversation for the
    (user, bot) pair is sent along as history on every call.
    """
    authentication_classes = [RedisTokenAuthentication]  # custom token authentication
    permission_classes = [IsAuthenticated]  # authenticated users only
    tags = ['ai-multichat']

    _MESSAGE_TYPES = (
        ChatMessage.MESSAGE_TYPE_TEXT,
        ChatMessage.MESSAGE_TYPE_AUDIO,
        ChatMessage.MESSAGE_TYPE_VIDEO,
    )

    @swagger_schema(
        request_schema=MultiChatRequestSchema,
        responses={
            201: openapi.Response('对话成功', ChatMessageDetailSchema),
            400: openapi.Response('请求参数错误', _error_schema()),
            404: openapi.Response('机器人不存在', _error_schema()),
        },
        operation_description="与AI进行多轮对话,支持文本、音频和视频消息,AI能记住对话历史",
        tags=['AI聊天'],
        security=[{'Bearer': []}]
    )
    def post(self, request):
        """
        Post one message of a multi-turn conversation and return the reply.

        Responds 400 for a missing ``botId``, an unsupported ``messageType``,
        a blank text message or unusable audio/video input; 404 when the bot
        does not exist; 201 with the reply payload on success.
        """
        user = request.user
        payload = request.data

        bot_id = payload.get("botId")
        if bot_id is None:
            logger.warning("Bot id is missing in request")
            return Response(
                {"error": "Bot id is required"},
                status=status.HTTP_400_BAD_REQUEST
            )

        bot = _find_bot(bot_id)
        if bot is None:
            return Response(
                {"error": "Bot not found"},
                status=status.HTTP_404_NOT_FOUND
            )

        message_type = payload.get("messageType")
        if message_type not in self._MESSAGE_TYPES:
            # Without this check no user message would be stored, yet the
            # model would still be queried with stale history.
            logger.warning(f"Unsupported message type: {message_type}")
            return Response(
                {"error": "Invalid messageType"},
                status=status.HTTP_400_BAD_REQUEST
            )

        user_message = payload.get("message")
        if message_type == ChatMessage.MESSAGE_TYPE_TEXT and _is_blank(user_message):
            return Response(
                {"error": "Message is required"},
                status=status.HTTP_400_BAD_REQUEST
            )

        message_audio_url = payload.get("messageAudioUrl")
        message_audio_base64 = payload.get("messageAudioBase64")
        message_video_url = payload.get("messageVideoUrl")
        message_video_base64 = payload.get("messageVideoBase64")
        return_audio_as_base64 = _as_bool(payload.get("returnAudioAsBase64", False))

        # Seed the conversation with the initial system message on first use.
        handle_first_access(user, bot)

        logger.info(f"Processing message of type {message_type} from user {user.id}")
        try:
            user_chat_message = create_user_message(
                user,
                bot,
                message_type,
                user_message,
                message_audio_url,
                message_video_url,
                message_audio_base64,
                message_video_base64
            )
        except ChatMessageInputError as exc:
            logger.warning(f"Rejected chat message input: {exc}")
            return Response(
                {"error": str(exc)},
                status=status.HTTP_400_BAD_REQUEST
            )

        bot_chat_message = ask_kimi(user, bot, _build_history(user, bot))

        is_audio = message_type == ChatMessage.MESSAGE_TYPE_AUDIO
        reply_audio_base64 = None

        # For audio requests, synthesize speech for the reply.
        if is_audio:
            audio_service = get_audio_service()
            logger.info(f"Processing audio message: {bot_chat_message.message}")

            if return_audio_as_base64:
                raw_audio = audio_service.synthesize_speech_raw(bot_chat_message.message)
                reply_audio_base64 = base64.b64encode(raw_audio).decode('utf-8')
                bot_chat_message.message_audio_url = None  # no URL in this mode
                logger.info("Generated audio as Base64 data")
            else:
                reply_audio_url = audio_service.synthesize_speech(bot_chat_message.message)
                logger.info(f"Generated audio URL: {reply_audio_url}")
                bot_chat_message.message_audio_url = reply_audio_url

            bot_chat_message.message_type = ChatMessage.MESSAGE_TYPE_AUDIO
            bot_chat_message.save()

        logger.info(f"Bot response: {bot_chat_message.message}")

        response_data = {"text": bot_chat_message.message}

        # Echo the recognized text and the audio the user sent.
        if is_audio:
            response_data["user_message"] = user_chat_message.message
            if message_audio_url:
                response_data["user_audio_url"] = message_audio_url
            elif message_audio_base64:
                response_data["user_audio_base64"] = message_audio_base64

        if bot_chat_message.message_audio_url:
            response_data["audio_url"] = bot_chat_message.message_audio_url

        if reply_audio_base64 is not None:
            response_data["audio_base64"] = reply_audio_base64

        if bot_chat_message.message_video_url:
            response_data["video_url"] = bot_chat_message.message_video_url

        return Response(
            response_data,
            status=status.HTTP_201_CREATED,
        )


def ask_kimi(user: ParadiseUser, bot: Bot, history: list) -> ChatMessage:
    """Query KIMI with *history* and store its reply as a bot message.

    One parenthesised segment is stripped from the start and from the end of
    the reply before it is saved.
    """
    reply = KIMI(history).get_response()
    reply = _LEADING_PAREN_RE.sub('', reply)
    reply = _TRAILING_PAREN_RE.sub('', reply)

    # objects.create() already persists the row; no extra save() is needed.
    return ChatMessage.objects.create(
        user=user,
        bot=bot,
        message=reply,
        sender=ChatMessage.SENDER_BOT,
    )


def handle_first_access(user: ParadiseUser, bot: Bot):
    """Create the initial system message when the pair has no messages yet.

    The bot's ``description`` is stored as the system message.
    """
    if ChatMessage.objects.filter(user=user, bot=bot).exists():
        return
    ChatMessage.objects.create(
        user=user,
        bot=bot,
        message=bot.description,
        sender=ChatMessage.SENDER_SYSTEM,
        message_type=ChatMessage.MESSAGE_TYPE_TEXT
    )


def _load_audio_bytes(message_audio_url, message_audio_base64) -> bytes:
    """Return the audio payload from the URL, else from the Base64 string.

    Raises:
        ChatMessageInputError: download failed, Base64 is invalid, or no
            audio data was supplied.
    """
    audio_data = None

    if message_audio_url:
        logger.info(f"Processing audio message from URL: {message_audio_url}")
        # NOTE(security): this fetches a client-supplied URL from the server
        # (SSRF surface) with no size limit; consider restricting the
        # allowed hosts.
        try:
            response = requests.get(message_audio_url, timeout=AUDIO_DOWNLOAD_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            logger.error(f"Failed to download audio file: {str(exc)}")
            raise ChatMessageInputError("无法下载音频文件") from exc
        if response.status_code != 200:
            logger.error(f"Failed to download audio file: {response.status_code}")
            raise ChatMessageInputError("无法下载音频文件")
        audio_data = response.content
    elif message_audio_base64:
        logger.info("Processing audio message from Base64 data")
        try:
            audio_data = base64.b64decode(message_audio_base64)
        except (ValueError, TypeError) as exc:
            logger.error(f"Failed to decode base64 audio data: {str(exc)}")
            raise ChatMessageInputError("无法解码Base64音频数据") from exc

    if not audio_data:
        logger.error("No audio data provided (neither URL nor Base64)")
        raise ChatMessageInputError("未提供音频数据")
    return audio_data


def _store_base64_video(message_video_base64) -> str:
    """Decode a Base64 video, write it under ``MEDIA_ROOT/video``, return its URL.

    Raises:
        ChatMessageInputError: the Base64 payload cannot be decoded.
        ChatMessageError: the file cannot be written.
    """
    logger.info("Processing video message from Base64 data")
    try:
        video_data = base64.b64decode(message_video_base64)
    except (ValueError, TypeError) as exc:
        logger.error(f"Failed to process base64 video data: {str(exc)}")
        raise ChatMessageInputError("无法处理Base64视频数据") from exc

    video_dir = os.path.join(settings.MEDIA_ROOT, 'video')
    video_filename = f"{uuid.uuid4()}.mp4"
    video_path = os.path.join(video_dir, video_filename)
    try:
        os.makedirs(video_dir, exist_ok=True)
        with open(video_path, 'wb') as f:
            f.write(video_data)
    except OSError as exc:
        # A storage failure is a server-side problem, not bad client input.
        logger.error(f"Failed to process base64 video data: {str(exc)}")
        raise ChatMessageError("无法处理Base64视频数据") from exc

    video_url = f"{settings.MEDIA_URL}video/{video_filename}"
    logger.info(f"Generated video URL from Base64: {video_url}")
    return video_url


def create_user_message(user: ParadiseUser, bot: Bot, message_type: str, user_message: str, message_audio_url: str, message_video_url: str, message_audio_base64: str = None, message_video_base64: str = None) -> Optional[ChatMessage]:
    """Persist the user's message for *message_type* and return it.

    * text  – stores *user_message* as is.
    * audio – loads the audio (URL first, then Base64), runs speech
      recognition and stores the recognized text.
    * video – stores *user_message* with the video URL; a Base64 payload is
      first written to a media file.

    Returns:
        The created ``ChatMessage``, or ``None`` for an unknown type.

    Raises:
        ChatMessageInputError: the audio/video input is missing or unusable.
        ChatMessageError: a Base64 video could not be written to storage.
    """
    if message_type == ChatMessage.MESSAGE_TYPE_TEXT:
        user_chat_message = ChatMessage.objects.create(
            user=user,
            bot=bot,
            message=user_message,
            sender=ChatMessage.SENDER_USER,
            message_type=ChatMessage.MESSAGE_TYPE_TEXT
        )
        logger.info(f"Created text message for user {user.id} and bot {bot.id}")
        return user_chat_message

    if message_type == ChatMessage.MESSAGE_TYPE_AUDIO:
        audio_data = _load_audio_bytes(message_audio_url, message_audio_base64)
        recognized = get_audio_service().recognize_speech(audio_data)
        logger.info(f"Recognized speech message: {recognized}")

        # Only record the URL when the client actually sent one.
        url_fields = {'message_audio_url': message_audio_url} if message_audio_url else {}
        user_chat_message = ChatMessage.objects.create(
            user=user,
            bot=bot,
            message=recognized,
            sender=ChatMessage.SENDER_USER,
            message_type=ChatMessage.MESSAGE_TYPE_AUDIO,
            **url_fields
        )
        logger.info(f"Created audio message for user {user.id} and bot {bot.id}")
        return user_chat_message

    if message_type == ChatMessage.MESSAGE_TYPE_VIDEO:
        video_url = message_video_url
        if not video_url and message_video_base64:
            video_url = _store_base64_video(message_video_base64)

        if not video_url:
            logger.error("No video URL or data provided")
            raise ChatMessageInputError("未提供视频数据")

        user_chat_message = ChatMessage.objects.create(
            user=user,
            bot=bot,
            message=user_message,  # text may need to be extracted from the video
            sender=ChatMessage.SENDER_USER,
            message_type=ChatMessage.MESSAGE_TYPE_VIDEO,
            message_video_url=video_url
        )
        logger.info(f"Created video message for user {user.id} and bot {bot.id}")
        return user_chat_message

    return None


def _clean_text(value) -> str:
    """Return *value* stripped when it is a string, otherwise ``''``."""
    return value.strip() if isinstance(value, str) else ''


class RTCChatHistoryAPIView(APIView):
    """
    Chat history endpoint for the RTC voice agent.

    GET: fetch the current user's RTC chat history
    POST: store one RTC chat message
    DELETE: clear the current user's RTC chat history
    """
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]

    RTC_BOT_NAME = 'RTC_Voice_Agent'
    DEFAULT_PAGE_SIZE = 50
    MAX_PAGE_SIZE = 200
    # A POST repeating the same (user, bot, sender, message) within this
    # window on either side of "now" is treated as a duplicate.
    DEDUP_WINDOW = timedelta(seconds=2)

    def _get_rtc_bot(self):
        """Return the bot named ``RTC_BOT_NAME``, or ``None`` if absent."""
        try:
            return Bot.objects.get(name=self.RTC_BOT_NAME)
        except Bot.DoesNotExist:
            return None

    def _bot_missing_response(self):
        """Build the error response used when the RTC bot is not configured."""
        return error_response(
            message='RTC Bot 未配置',
            code=500,
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    def get(self, request):
        """Return the user's RTC messages in ascending ``timestamp`` order.

        Query parameters:
            page_size: number of messages, clamped to 0..MAX_PAGE_SIZE.
            since_id: when given, return messages with ``id`` greater than
                it (up to ``page_size``); otherwise return the most recent
                ``page_size`` messages.
        """
        bot = self._get_rtc_bot()
        if bot is None:
            return self._bot_missing_response()

        try:
            page_size = int(request.query_params.get('page_size', self.DEFAULT_PAGE_SIZE))
        except (TypeError, ValueError):
            return error_response(message='page_size 必须是整数')
        # Negative sizes would reach queryset slicing, which Django rejects.
        page_size = max(0, min(page_size, self.MAX_PAGE_SIZE))

        queryset = ChatMessage.objects.filter(
            user=request.user, bot=bot
        ).order_by('timestamp')

        # Incremental fetch: clients pass since_id to recover messages they
        # missed over WebSocket push. Without it the legacy behaviour
        # (latest page_size messages) applies.
        since_id_raw = request.query_params.get('since_id')
        if since_id_raw:
            try:
                since_id = int(since_id_raw)
            except ValueError:
                return error_response(message='since_id 必须是整数')
            queryset = queryset.filter(id__gt=since_id)

        total = queryset.count()
        has_more = total > page_size
        if since_id_raw:
            messages = queryset[:page_size]
        else:
            messages = queryset[max(0, total - page_size):]

        data = {
            'messages': [
                {
                    'id': msg.id,
                    'message': msg.message,
                    'sender': msg.sender,
                    'timestamp': msg.timestamp.isoformat(),
                }
                for msg in messages
            ],
            'total': total,
            'has_more': has_more,
        }
        return success_response(data=data)

    def post(self, request):
        """Store one RTC chat message, skipping near-simultaneous duplicates."""
        bot = self._get_rtc_bot()
        if bot is None:
            return self._bot_missing_response()

        # Non-string values are treated as empty instead of raising on strip().
        message_text = _clean_text(request.data.get('message', ''))
        sender = _clean_text(request.data.get('sender', ''))

        if not message_text:
            return error_response(message='消息内容不能为空')
        if sender not in ('user', 'assistant'):
            return error_response(message='sender 必须是 user 或 assistant')

        # De-duplication during the B' rollout: older mobile apps still POST
        # here while the strategy-B webhook path also writes, which would
        # store each message twice. The same window also covers strategy-B
        # replays and client retries.
        now = timezone.now()
        existing = ChatMessage.objects.filter(
            user=request.user,
            bot=bot,
            sender=sender,
            message=message_text,
            timestamp__gte=now - self.DEDUP_WINDOW,
            timestamp__lte=now + self.DEDUP_WINDOW,
        ).first()
        if existing is not None:
            logger.info('RTC 聊天 POST 去重命中: user=%s sender=%s msg_id=%s',
                        request.user.id, sender, existing.id)
            return success_response(data={
                'id': existing.id,
                'timestamp': existing.timestamp.isoformat(),
                'deduplicated': True,
            }, message='消息已存在(去重)')

        chat_msg = ChatMessage.objects.create(
            user=request.user,
            bot=bot,
            message=message_text,
            sender=sender,
            message_type=ChatMessage.MESSAGE_TYPE_TEXT,
        )
        return created_response(data={
            'id': chat_msg.id,
            'timestamp': chat_msg.timestamp.isoformat(),
        })

    def delete(self, request):
        """Delete all of the user's RTC messages and report the count."""
        bot = self._get_rtc_bot()
        if bot is None:
            return self._bot_missing_response()

        count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete()
        return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')


# ======================================================================
# Phase 2 — admin read/write endpoint for the generic credential slot
# (CRED-03 + CRED-04). Follows the single-URL, multi-method APIView style of
# RTCChatHistoryAPIView.
# ======================================================================

class CredentialSlotPutRequestSchema(serializers.Serializer):
    """drf-yasg only: PUT request body schema shown in swagger.

    It takes no part in write validation, which is performed by
    ``CredentialSlotSerializer.is_valid``.
    """
    app_id = serializers.CharField(
        required=False, allow_blank=True,
        help_text="第三方服务商分配的 APP ID(明文写入;缺省时保留原值)"
    )
    access_token = serializers.CharField(
        required=False, allow_blank=True,
        help_text="第三方服务商访问令牌(明文写入;响应阶段会脱敏返回末 4 位)"
    )


# Response ``data`` sub-schema; the access_token description states the
# masking semantics explicitly.
_credential_slot_data_schema = openapi.Schema(
    type=openapi.TYPE_OBJECT,
    properties={
        'app_id': openapi.Schema(
            type=openapi.TYPE_STRING,
            description='第三方服务商分配的 APP ID(明文)',
        ),
        'access_token': openapi.Schema(
            type=openapi.TYPE_STRING,
            description='Access Token 末 4 位脱敏掩码(如 "*********1234",前缀字符数 = 原长 - 4)',
        ),
        'updated_at': openapi.Schema(
            type=openapi.TYPE_STRING,
            format='date-time',
            description='最近一次更新时间(ISO 8601)',
        ),
    },
)


class CredentialSlotAdminView(APIView):
    """Admin read/write endpoint for the generic credential slot.

    GET: return the slot with ``access_token`` masked.
    PUT: overwrite the slot; the response is masked as well.
    """
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['通用凭据槽位(管理端)']

    def _ensure_admin(self, request):
        """Return a 403 response for non-staff users, else ``None``.

        Per RESEARCH.md the repository has no admin-token permission class;
        existing admin views (AdminEmailLoginView, AdminLogoutView) check
        ``is_staff`` inside the view, and this view follows that pattern.
        """
        if not request.user.is_staff:
            logger.warning(
                f"Non-admin user attempted CredentialSlot admin endpoint: user_id={request.user.id}"
            )
            return error_response(
                message="需要管理员权限",
                code=403,
                status_code=status.HTTP_403_FORBIDDEN,
            )
        return None

    def _build_response_data(self, instance):
        """Return the serialized slot with ``access_token`` masked.

        Per CONTEXT.md both GET and PUT responses must be masked, so an
        operator never sees the plaintext token echoed back.
        """
        data = dict(CredentialSlotSerializer(instance).data)
        # Overwrite the serialized value with the masked form of the token.
        data['access_token'] = mask_token(instance.access_token)
        return data

    @swagger_auto_schema(
        operation_description="读取通用凭据槽位(access_token 末 4 位脱敏返回,admin token 鉴权)",
        responses={
            200: openapi.Response('读取成功', get_standardized_response_schema(_credential_slot_data_schema)),
            401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
            403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
        },
        security=[{'Bearer': []}],
        tags=['通用凭据槽位(管理端)'],
    )
    def get(self, request):
        """Return the masked credential slot (staff only)."""
        forbidden = self._ensure_admin(request)
        if forbidden is not None:
            return forbidden

        instance = CredentialSlot.get_solo()
        return success_response(data=self._build_response_data(instance), message="读取成功")

    @swagger_auto_schema(
        request_body=CredentialSlotPutRequestSchema,
        operation_description="全字段覆写通用凭据槽位(admin token 鉴权;写入后响应脱敏返回)",
        responses={
            200: openapi.Response('更新成功', get_standardized_response_schema(_credential_slot_data_schema)),
            400: openapi.Response('参数无效', get_standardized_response_schema()),
            401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
            403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
        },
        security=[{'Bearer': []}],
        tags=['通用凭据槽位(管理端)'],
    )
    def put(self, request):
        """Validate and save the credential slot, then return it masked."""
        forbidden = self._ensure_admin(request)
        if forbidden is not None:
            return forbidden

        # NOTE(review): get_solo() presumably creates the row when it does
        # not exist yet — confirm against the model.
        instance = CredentialSlot.get_solo()
        serializer = CredentialSlotSerializer(instance, data=request.data)
        if not serializer.is_valid():
            return error_response(
                message="参数无效",
                code=400,
                status_code=status.HTTP_400_BAD_REQUEST,
                data=serializer.errors,
            )
        serializer.save()

        # The response is built from the same instance that was just saved.
        return success_response(data=self._build_response_data(instance), message="凭据已更新")


# ======================================================================
# Phase 3 — client read endpoint for the generic credential slot (CRED-05).
# Mirrors the GET part of CredentialSlotAdminView. Key differences: the
# access_token is returned in plaintext (no mask_token) and there is no
# is_staff check.
# ======================================================================

# Client response ``data`` sub-schema; the access_token description states
# explicitly that the value is plaintext.
_credential_slot_client_data_schema = openapi.Schema(
    type=openapi.TYPE_OBJECT,
    properties={
        'app_id': openapi.Schema(
            type=openapi.TYPE_STRING,
            description='第三方服务商分配的 APP ID(明文)',
        ),
        'access_token': openapi.Schema(
            type=openapi.TYPE_STRING,
            description='明文 Access Token,供手机/设备端实际调用第三方服务(管理端同接口会脱敏返回末 4 位)',
        ),
        'updated_at': openapi.Schema(
            type=openapi.TYPE_STRING,
            format='date-time',
            description='最近一次更新时间(ISO 8601)',
        ),
    },
)


class CredentialSlotClientView(APIView):
    """Client read endpoint for the generic credential slot.

    GET: return the serialized slot unmasked, so phone/device clients can
    call the third-party service.

    Differences from CredentialSlotAdminView (per CONTEXT.md D-Client-View):
    - no ``is_staff`` check: any authenticated token is accepted
    - no masking: ``serializer.data`` is returned as is
    - GET only: clients cannot write
    """
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['通用凭据槽位(客户端)']

    @swagger_auto_schema(
        operation_description="读取通用凭据槽位(明文 access_token,供手机/设备端实际调用第三方服务)",
        responses={
            200: openapi.Response(
                '读取成功',
                get_standardized_response_schema(_credential_slot_client_data_schema),
            ),
            401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
        },
        security=[{'Bearer': []}],
        tags=['通用凭据槽位(客户端)'],
    )
    def get(self, request):
        """Return the credential slot without masking."""
        instance = CredentialSlot.get_solo()
        serializer = CredentialSlotSerializer(instance)
        return success_response(data=serializer.data, message="读取成功")