feat(02-01): 新增 CredentialSlotAdminView(GET 脱敏 / PUT 全字段覆写)
- 1:1 复刻 RTCChatHistoryAPIView 单 URL 多方法 APIView 风格(不走 RetrieveUpdateAPIView) - authentication_classes=[RedisTokenAuthentication] - permission_classes=[IsAuthenticated] + view 内 _ensure_admin 二次校验 is_staff (per RESEARCH.md:仓库零处 IsAdminTokenAuthenticated 类,沿用 AdminEmailLoginView 模式) - _build_response_data helper 强制脱敏:data['access_token'] = mask_token(instance.access_token) - GET / PUT 都走 _build_response_data,避免 PUT 直接 return success_response(data=serializer.data) 导致明文回显(CONTEXT.md / Pitfall 3 锁定) - @swagger_auto_schema method-level 装饰:access_token 字段 description 显式标注脱敏掩码 - 顶部 import 追加:CredentialSlot / CredentialSlotSerializer / mask_token / get_standardized_response_schema
This commit is contained in:
parent
6820fe7fd4
commit
192d0a15ec
@ -2,14 +2,15 @@ from rest_framework.views import APIView
|
|||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
from rest_framework import status, viewsets, permissions
|
from rest_framework import status, viewsets, permissions
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from .models import ChatMessage, Bot
|
from .models import ChatMessage, Bot, CredentialSlot
|
||||||
from userapp.models import ParadiseUser
|
from userapp.models import ParadiseUser
|
||||||
from .serializers import ChatMessageSerializer
|
from .serializers import ChatMessageSerializer, CredentialSlotSerializer
|
||||||
from rest_framework.permissions import IsAuthenticated
|
from rest_framework.permissions import IsAuthenticated
|
||||||
from userapp.authentication import RedisTokenAuthentication
|
from userapp.authentication import RedisTokenAuthentication
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
from common.swagger_utils import swagger_schema
|
from common.swagger_utils import swagger_schema, get_standardized_response_schema
|
||||||
from common.responses import success_response, created_response, error_response
|
from common.responses import success_response, created_response, error_response
|
||||||
|
from common.utils import mask_token
|
||||||
from drf_yasg import openapi
|
from drf_yasg import openapi
|
||||||
from drf_yasg.utils import swagger_auto_schema
|
from drf_yasg.utils import swagger_auto_schema
|
||||||
import logging
|
import logging
|
||||||
@ -553,3 +554,134 @@ class RTCChatHistoryAPIView(APIView):
|
|||||||
|
|
||||||
count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete()
|
count, _ = ChatMessage.objects.filter(user=request.user, bot=bot).delete()
|
||||||
return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')
|
return success_response(data={'deleted': count}, message=f'已删除 {count} 条记录')
|
||||||
|
|
||||||
|
|
||||||
|
# ======================================================================
|
||||||
|
# Phase 2 — 通用凭据槽位管理端读写接口(CRED-03 + CRED-04)
|
||||||
|
# 1:1 复刻 RTCChatHistoryAPIView 的单 URL 多方法 APIView 风格
|
||||||
|
# ======================================================================
|
||||||
|
|
||||||
|
class CredentialSlotPutRequestSchema(serializers.Serializer):
|
||||||
|
"""drf-yasg 专用 — PUT 请求体 schema(不参与实际写入校验,仅给 swagger 看)。
|
||||||
|
|
||||||
|
实际写入校验由 CredentialSlotSerializer.is_valid 执行。
|
||||||
|
"""
|
||||||
|
app_id = serializers.CharField(
|
||||||
|
required=False, allow_blank=True,
|
||||||
|
help_text="第三方服务商分配的 APP ID(明文写入;缺省时保留原值)"
|
||||||
|
)
|
||||||
|
access_token = serializers.CharField(
|
||||||
|
required=False, allow_blank=True,
|
||||||
|
help_text="第三方服务商访问令牌(明文写入;响应阶段会脱敏返回末 4 位)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# 响应 data 子 schema:access_token 字段 description 显式标注脱敏掩码语义
|
||||||
|
_credential_slot_data_schema = openapi.Schema(
|
||||||
|
type=openapi.TYPE_OBJECT,
|
||||||
|
properties={
|
||||||
|
'app_id': openapi.Schema(
|
||||||
|
type=openapi.TYPE_STRING,
|
||||||
|
description='第三方服务商分配的 APP ID(明文)',
|
||||||
|
),
|
||||||
|
'access_token': openapi.Schema(
|
||||||
|
type=openapi.TYPE_STRING,
|
||||||
|
description='Access Token 末 4 位脱敏掩码(如 "*********1234",前缀字符数 = 原长 - 4)',
|
||||||
|
),
|
||||||
|
'updated_at': openapi.Schema(
|
||||||
|
type=openapi.TYPE_STRING,
|
||||||
|
format='date-time',
|
||||||
|
description='最近一次更新时间(ISO 8601)',
|
||||||
|
),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CredentialSlotAdminView(APIView):
|
||||||
|
"""通用凭据槽位管理端读写接口(admin token 鉴权)。
|
||||||
|
|
||||||
|
GET: 返回 access_token 末 4 位脱敏后的凭据槽位
|
||||||
|
PUT: 全字段覆写凭据槽位(空记录场景自动 get_or_create),响应同样脱敏
|
||||||
|
"""
|
||||||
|
authentication_classes = [RedisTokenAuthentication]
|
||||||
|
permission_classes = [IsAuthenticated]
|
||||||
|
tags = ['通用凭据槽位(管理端)']
|
||||||
|
|
||||||
|
def _ensure_admin(self, request):
|
||||||
|
"""admin-only 二次校验:拒绝非 staff 用户(含普通 user token 持有者)。
|
||||||
|
|
||||||
|
per RESEARCH.md:仓库零处 IsAdminTokenAuthenticated permission 类;
|
||||||
|
现有 AdminEmailLoginView (userapp/views.py:748-754) / AdminLogoutView 一律走
|
||||||
|
视图内 is_staff 检查。统一沿用此模式,不发明新 permission 类。
|
||||||
|
"""
|
||||||
|
if not request.user.is_staff:
|
||||||
|
logger.warning(
|
||||||
|
f"Non-admin user attempted CredentialSlot admin endpoint: user_id={request.user.id}"
|
||||||
|
)
|
||||||
|
return error_response(
|
||||||
|
message="需要管理员权限",
|
||||||
|
code=403,
|
||||||
|
status_code=status.HTTP_403_FORBIDDEN,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _build_response_data(self, instance):
|
||||||
|
"""构造脱敏后的响应 data 字典。
|
||||||
|
|
||||||
|
per CONTEXT.md:GET 与 PUT 响应都必须脱敏 access_token,避免运营在
|
||||||
|
admin UI 看到自己刚提交的明文回显(CONTEXT.md 决策"PUT 响应也走脱敏")。
|
||||||
|
"""
|
||||||
|
serializer = CredentialSlotSerializer(instance)
|
||||||
|
data = dict(serializer.data)
|
||||||
|
# 关键脱敏点:用 instance.access_token(明文)走 mask_token,覆盖 serializer.data 里的明文
|
||||||
|
data['access_token'] = mask_token(instance.access_token)
|
||||||
|
return data
|
||||||
|
|
||||||
|
@swagger_auto_schema(
|
||||||
|
operation_description="读取通用凭据槽位(access_token 末 4 位脱敏返回,admin token 鉴权)",
|
||||||
|
responses={
|
||||||
|
200: openapi.Response('读取成功', get_standardized_response_schema(_credential_slot_data_schema)),
|
||||||
|
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
|
||||||
|
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
|
||||||
|
},
|
||||||
|
security=[{'Bearer': []}],
|
||||||
|
tags=['通用凭据槽位(管理端)'],
|
||||||
|
)
|
||||||
|
def get(self, request):
|
||||||
|
forbidden = self._ensure_admin(request)
|
||||||
|
if forbidden:
|
||||||
|
return forbidden
|
||||||
|
instance = CredentialSlot.get_solo()
|
||||||
|
data = self._build_response_data(instance)
|
||||||
|
return success_response(data=data, message="读取成功")
|
||||||
|
|
||||||
|
@swagger_auto_schema(
|
||||||
|
request_body=CredentialSlotPutRequestSchema,
|
||||||
|
operation_description="全字段覆写通用凭据槽位(admin token 鉴权;写入后响应脱敏返回)",
|
||||||
|
responses={
|
||||||
|
200: openapi.Response('更新成功', get_standardized_response_schema(_credential_slot_data_schema)),
|
||||||
|
400: openapi.Response('参数无效', get_standardized_response_schema()),
|
||||||
|
401: openapi.Response('未提供有效 token', get_standardized_response_schema()),
|
||||||
|
403: openapi.Response('需要管理员权限', get_standardized_response_schema()),
|
||||||
|
},
|
||||||
|
security=[{'Bearer': []}],
|
||||||
|
tags=['通用凭据槽位(管理端)'],
|
||||||
|
)
|
||||||
|
def put(self, request):
|
||||||
|
forbidden = self._ensure_admin(request)
|
||||||
|
if forbidden:
|
||||||
|
return forbidden
|
||||||
|
instance = CredentialSlot.get_solo() # 空记录场景自动 get_or_create
|
||||||
|
serializer = CredentialSlotSerializer(instance, data=request.data)
|
||||||
|
if not serializer.is_valid():
|
||||||
|
return error_response(
|
||||||
|
message="参数无效",
|
||||||
|
code=400,
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
data=serializer.errors,
|
||||||
|
)
|
||||||
|
serializer.save() # auto_now 自动刷 updated_at
|
||||||
|
# 重新读取 instance.access_token:serializer.save() 后 instance 已被同步刷新;
|
||||||
|
# _build_response_data 内部会再次 dict(serializer.data) 拿最新 OrderedDict。
|
||||||
|
data = self._build_response_data(instance)
|
||||||
|
return success_response(data=data, message="凭据已更新")
|
||||||
Loading…
x
Reference in New Issue
Block a user