feat(02-01): 新增 CredentialSlotAdminView(GET 脱敏 / PUT 全字段覆写)
- 1:1 复刻 RTCChatHistoryAPIView 单 URL 多方法 APIView 风格(不走 RetrieveUpdateAPIView) - authentication_classes=[RedisTokenAuthentication] - permission_classes=[IsAuthenticated] + view 内 _ensure_admin 二次校验 is_staff (per RESEARCH.md:仓库零处 IsAdminTokenAuthenticated 类,沿用 AdminEmailLoginView 模式) - _build_response_data helper 强制脱敏:data['access_token'] = mask_token(instance.access_token) - GET / PUT 都走 _build_response_data,避免 PUT 直接 return success_response(data=serializer.data) 导致明文回显(CONTEXT.md / Pitfall 3 锁定) - @swagger_auto_schema method-level 装饰:access_token 字段 description 显式标注脱敏掩码 - 顶部 import 追加:CredentialSlot / CredentialSlotSerializer / mask_token / get_standardized_response_schema
This commit is contained in:
parent
6820fe7fd4
commit
192d0a15ec
@ -2,14 +2,15 @@ from rest_framework.views import APIView
|
||||
from rest_framework.response import Response
|
||||
from rest_framework import status, viewsets, permissions
|
||||
from django.contrib.auth.models import User
|
||||
from .models import ChatMessage, Bot
|
||||
from .models import ChatMessage, Bot, CredentialSlot
|
||||
from userapp.models import ParadiseUser
|
||||
from .serializers import ChatMessageSerializer
|
||||
from .serializers import ChatMessageSerializer, CredentialSlotSerializer
|
||||
from rest_framework.permissions import IsAuthenticated
|
||||
from userapp.authentication import RedisTokenAuthentication
|
||||
from rest_framework import serializers
|
||||
from common.swagger_utils import swagger_schema
|
||||
from common.swagger_utils import swagger_schema, get_standardized_response_schema
|
||||
from common.responses import success_response, created_response, error_response
|
||||
from common.utils import mask_token
|
||||
from drf_yasg import openapi
|
||||
from drf_yasg.utils import swagger_auto_schema
|
||||
import logging
|
||||
@ -552,4 +553,135 @@ class RTCChatHistoryAPIView(APIView):
|
||||
return error_response(message='RTC Bot 未配置', code=500, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
|
||||
count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete()
|
||||
return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')
|
||||
return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')
|
||||
|
||||
|
||||
# ======================================================================
|
||||
# Phase 2 — 通用凭据槽位管理端读写接口(CRED-03 + CRED-04)
|
||||
# 1:1 复刻 RTCChatHistoryAPIView 的单 URL 多方法 APIView 风格
|
||||
# ======================================================================
|
||||
|
||||
class CredentialSlotPutRequestSchema(serializers.Serializer):
|
||||
"""drf-yasg 专用 — PUT 请求体 schema(不参与实际写入校验,仅给 swagger 看)。
|
||||
|
||||
实际写入校验由 CredentialSlotSerializer.is_valid 执行。
|
||||
"""
|
||||
app_id = serializers.CharField(
|
||||
required=False, allow_blank=True,
|
||||
help_text="第三方服务商分配的 APP ID(明文写入;缺省时保留原值)"
|
||||
)
|
||||
access_token = serializers.CharField(
|
||||
required=False, allow_blank=True,
|
||||
help_text="第三方服务商访问令牌(明文写入;响应阶段会脱敏返回末 4 位)"
|
||||
)
|
||||
|
||||
|
||||
# 响应 data 子 schema:access_token 字段 description 显式标注脱敏掩码语义
|
||||
_credential_slot_data_schema = openapi.Schema(
|
||||
type=openapi.TYPE_OBJECT,
|
||||
properties={
|
||||
'app_id': openapi.Schema(
|
||||
type=openapi.TYPE_STRING,
|
||||
description='第三方服务商分配的 APP ID(明文)',
|
||||
),
|
||||
'access_token': openapi.Schema(
|
||||
type=openapi.TYPE_STRING,
|
||||
description='Access Token 末 4 位脱敏掩码(如 "*********1234",前缀字符数 = 原长 - 4)',
|
||||
),
|
||||
'updated_at': openapi.Schema(
|
||||
type=openapi.TYPE_STRING,
|
||||
format='date-time',
|
||||
description='最近一次更新时间(ISO 8601)',
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class CredentialSlotAdminView(APIView):
|
||||
"""通用凭据槽位管理端读写接口(admin token 鉴权)。
|
||||
|
||||
GET: 返回 access_token 末 4 位脱敏后的凭据槽位
|
||||
PUT: 全字段覆写凭据槽位(空记录场景自动 get_or_create),响应同样脱敏
|
||||
"""
|
||||
authentication_classes = [RedisTokenAuthentication]
|
||||
permission_classes = [IsAuthenticated]
|
||||
tags = ['通用凭据槽位(管理端)']
|
||||
|
||||
def _ensure_admin(self, request):
|
||||
"""admin-only 二次校验:拒绝非 staff 用户(含普通 user token 持有者)。
|
||||
|
||||
per RESEARCH.md:仓库零处 IsAdminTokenAuthenticated permission 类;
|
||||
现有 AdminEmailLoginView (userapp/views.py:748-754) / AdminLogoutView 一律走
|
||||
视图内 is_staff 检查。统一沿用此模式,不发明新 permission 类。
|
||||
"""
|
||||
if not request.user.is_staff:
|
||||
logger.warning(
|
||||
f"Non-admin user attempted CredentialSlot admin endpoint: user_id={request.user.id}"
|
||||
)
|
||||
return error_response(
|
||||
message="需要管理员权限",
|
||||
code=403,
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
)
|
||||
return None
|
||||
|
||||
def _build_response_data(self, instance):
|
||||
"""构造脱敏后的响应 data 字典。
|
||||
|
||||
per CONTEXT.md:GET 与 PUT 响应都必须脱敏 access_token,避免运营在
|
||||
admin UI 看到自己刚提交的明文回显(CONTEXT.md 决策"PUT 响应也走脱敏")。
|
||||
"""
|
||||
serializer = CredentialSlotSerializer(instance)
|
||||
data = dict(serializer.data)
|
||||
# 关键脱敏点:用 instance.access_token(明文)走 mask_token,覆盖 serializer.data 里的明文
|
||||
data['access_token'] = mask_token(instance.access_token)
|
||||
return data
|
||||
|
||||
@swagger_auto_schema(
|
||||
operation_description="读取通用凭据槽位(access_token 末 4 位脱敏返回,admin token 鉴权)",
|
||||
responses={
|
||||
200: openapi.Response('读取成功', get_standardized_response_schema(_credential_slot_data_schema)),
|
||||
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
|
||||
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
|
||||
},
|
||||
security=[{'Bearer': []}],
|
||||
tags=['通用凭据槽位(管理端)'],
|
||||
)
|
||||
def get(self, request):
|
||||
forbidden = self._ensure_admin(request)
|
||||
if forbidden:
|
||||
return forbidden
|
||||
instance = CredentialSlot.get_solo()
|
||||
data = self._build_response_data(instance)
|
||||
return success_response(data=data, message="读取成功")
|
||||
|
||||
@swagger_auto_schema(
|
||||
request_body=CredentialSlotPutRequestSchema,
|
||||
operation_description="全字段覆写通用凭据槽位(admin token 鉴权;写入后响应脱敏返回)",
|
||||
responses={
|
||||
200: openapi.Response('更新成功', get_standardized_response_schema(_credential_slot_data_schema)),
|
||||
400: openapi.Response('参数无效', get_standardized_response_schema()),
|
||||
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
|
||||
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
|
||||
},
|
||||
security=[{'Bearer': []}],
|
||||
tags=['通用凭据槽位(管理端)'],
|
||||
)
|
||||
def put(self, request):
|
||||
forbidden = self._ensure_admin(request)
|
||||
if forbidden:
|
||||
return forbidden
|
||||
instance = CredentialSlot.get_solo() # 空记录场景自动 get_or_create
|
||||
serializer = CredentialSlotSerializer(instance, data=request.data)
|
||||
if not serializer.is_valid():
|
||||
return error_response(
|
||||
message="参数无效",
|
||||
code=400,
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
data=serializer.errors,
|
||||
)
|
||||
serializer.save() # auto_now 自动刷 updated_at
|
||||
# 重新读取 instance.access_token:serializer.save() 后 instance 已被同步刷新;
|
||||
# _build_response_data 内部会再次 dict(serializer.data) 拿最新 OrderedDict。
|
||||
data = self._build_response_data(instance)
|
||||
return success_response(data=data, message="凭据已更新")
|
||||
Loading…
x
Reference in New Issue
Block a user