feat(02-01): 新增 CredentialSlotAdminView(GET 脱敏 / PUT 全字段覆写)

- 1:1 复刻 RTCChatHistoryAPIView 单 URL 多方法 APIView 风格(不走 RetrieveUpdateAPIView)
- authentication_classes=[RedisTokenAuthentication]
- permission_classes=[IsAuthenticated] + view 内 _ensure_admin 二次校验 is_staff
  (per RESEARCH.md:仓库零处 IsAdminTokenAuthenticated 类,沿用 AdminEmailLoginView 模式)
- _build_response_data helper 强制脱敏:data['access_token'] = mask_token(instance.access_token)
- GET / PUT 都走 _build_response_data,避免 PUT 直接 return success_response(data=serializer.data)
  导致明文回显(CONTEXT.md / Pitfall 3 锁定)
- @swagger_auto_schema method-level 装饰:access_token 字段 description 显式标注脱敏掩码
- 顶部 import 追加:CredentialSlot / CredentialSlotSerializer / mask_token /
  get_standardized_response_schema
This commit is contained in:
pmc 2026-05-07 22:53:38 +08:00
parent 6820fe7fd4
commit 192d0a15ec

View File

@ -2,14 +2,15 @@ from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework import status, viewsets, permissions from rest_framework import status, viewsets, permissions
from django.contrib.auth.models import User from django.contrib.auth.models import User
from .models import ChatMessage, Bot from .models import ChatMessage, Bot, CredentialSlot
from userapp.models import ParadiseUser from userapp.models import ParadiseUser
from .serializers import ChatMessageSerializer from .serializers import ChatMessageSerializer, CredentialSlotSerializer
from rest_framework.permissions import IsAuthenticated from rest_framework.permissions import IsAuthenticated
from userapp.authentication import RedisTokenAuthentication from userapp.authentication import RedisTokenAuthentication
from rest_framework import serializers from rest_framework import serializers
from common.swagger_utils import swagger_schema from common.swagger_utils import swagger_schema, get_standardized_response_schema
from common.responses import success_response, created_response, error_response from common.responses import success_response, created_response, error_response
from common.utils import mask_token
from drf_yasg import openapi from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema from drf_yasg.utils import swagger_auto_schema
import logging import logging
@ -552,4 +553,135 @@ class RTCChatHistoryAPIView(APIView):
return error_response(message='RTC Bot 未配置', code=500, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) return error_response(message='RTC Bot 未配置', code=500, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete() count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete()
return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录') return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')
# ======================================================================
# Phase 2 — 通用凭据槽位管理端读写接口CRED-03 + CRED-04
# 1:1 复刻 RTCChatHistoryAPIView 的单 URL 多方法 APIView 风格
# ======================================================================
class CredentialSlotPutRequestSchema(serializers.Serializer):
"""drf-yasg 专用 — PUT 请求体 schema不参与实际写入校验仅给 swagger 看)。
实际写入校验由 CredentialSlotSerializer.is_valid 执行
"""
app_id = serializers.CharField(
required=False, allow_blank=True,
help_text="第三方服务商分配的 APP ID明文写入缺省时保留原值"
)
access_token = serializers.CharField(
required=False, allow_blank=True,
help_text="第三方服务商访问令牌(明文写入;响应阶段会脱敏返回末 4 位)"
)
# 响应 data 子 schemaaccess_token 字段 description 显式标注脱敏掩码语义
_credential_slot_data_schema = openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'app_id': openapi.Schema(
type=openapi.TYPE_STRING,
description='第三方服务商分配的 APP ID明文',
),
'access_token': openapi.Schema(
type=openapi.TYPE_STRING,
description='Access Token 末 4 位脱敏掩码(如 "*********1234",前缀字符数 = 原长 - 4',
),
'updated_at': openapi.Schema(
type=openapi.TYPE_STRING,
format='date-time',
description='最近一次更新时间ISO 8601',
),
},
)
class CredentialSlotAdminView(APIView):
"""通用凭据槽位管理端读写接口admin token 鉴权)。
GET: 返回 access_token 4 位脱敏后的凭据槽位
PUT: 全字段覆写凭据槽位空记录场景自动 get_or_create响应同样脱敏
"""
authentication_classes = [RedisTokenAuthentication]
permission_classes = [IsAuthenticated]
tags = ['通用凭据槽位(管理端)']
def _ensure_admin(self, request):
"""admin-only 二次校验:拒绝非 staff 用户(含普通 user token 持有者)。
per RESEARCH.md仓库零处 IsAdminTokenAuthenticated permission
现有 AdminEmailLoginView (userapp/views.py:748-754) / AdminLogoutView 一律走
视图内 is_staff 检查统一沿用此模式不发明新 permission
"""
if not request.user.is_staff:
logger.warning(
f"Non-admin user attempted CredentialSlot admin endpoint: user_id={request.user.id}"
)
return error_response(
message="需要管理员权限",
code=403,
status_code=status.HTTP_403_FORBIDDEN,
)
return None
def _build_response_data(self, instance):
"""构造脱敏后的响应 data 字典。
per CONTEXT.mdGET PUT 响应都必须脱敏 access_token避免运营在
admin UI 看到自己刚提交的明文回显CONTEXT.md 决策"PUT 响应也走脱敏"
"""
serializer = CredentialSlotSerializer(instance)
data = dict(serializer.data)
# 关键脱敏点:用 instance.access_token明文走 mask_token覆盖 serializer.data 里的明文
data['access_token'] = mask_token(instance.access_token)
return data
@swagger_auto_schema(
operation_description="读取通用凭据槽位access_token 末 4 位脱敏返回admin token 鉴权)",
responses={
200: openapi.Response('读取成功', get_standardized_response_schema(_credential_slot_data_schema)),
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
},
security=[{'Bearer': []}],
tags=['通用凭据槽位(管理端)'],
)
def get(self, request):
forbidden = self._ensure_admin(request)
if forbidden:
return forbidden
instance = CredentialSlot.get_solo()
data = self._build_response_data(instance)
return success_response(data=data, message="读取成功")
@swagger_auto_schema(
request_body=CredentialSlotPutRequestSchema,
operation_description="全字段覆写通用凭据槽位admin token 鉴权;写入后响应脱敏返回)",
responses={
200: openapi.Response('更新成功', get_standardized_response_schema(_credential_slot_data_schema)),
400: openapi.Response('参数无效', get_standardized_response_schema()),
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
},
security=[{'Bearer': []}],
tags=['通用凭据槽位(管理端)'],
)
def put(self, request):
forbidden = self._ensure_admin(request)
if forbidden:
return forbidden
instance = CredentialSlot.get_solo() # 空记录场景自动 get_or_create
serializer = CredentialSlotSerializer(instance, data=request.data)
if not serializer.is_valid():
return error_response(
message="参数无效",
code=400,
status_code=status.HTTP_400_BAD_REQUEST,
data=serializer.errors,
)
serializer.save() # auto_now 自动刷 updated_at
# 重新读取 instance.access_tokenserializer.save() 后 instance 已被同步刷新;
# _build_response_data 内部会再次 dict(serializer.data) 拿最新 OrderedDict。
data = self._build_response_data(instance)
return success_response(data=data, message="凭据已更新")