lty/qy_lty/userapp/views.py
pmc afa88c142b feat: 实现设备动态绑定方案(步骤1-5)
- 步骤1: MacAddressLoginView 增强,返回 code=4010(未绑定)/4011(不存在),支持自动激活
- 步骤2: 新增 bind_status 接口,设备端轮询查询绑定状态(无需认证)
- 步骤3: 新增 register 设备自注册接口,首次开机自动注册(无需认证)
- 步骤4: UserDeviceSerializer 增加 mac_address 字段
- 步骤5: WebSocket 新增 device_info 消息类型,支持设备状态上报和广播

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 15:53:33 +08:00

823 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dj_rest_auth.registration.views import RegisterView
from rest_framework import viewsets
from .models import ParadiseUser, AffinityRule, AffinityLevel
from device_interaction.models import Device, UserDevice
from .serializers import ParadiseUserSerializer, CustomRegisterSerializer, UserInfoSerializer, ProfileUpdateSerializer
from rest_framework import viewsets, status
from django.contrib.auth.models import Group, Permission
from rest_framework.response import Response
from rest_framework.decorators import action, permission_classes, authentication_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.decorators import action
from django.conf import settings
from django.contrib.auth import authenticate
import random
from .utils import send_sms, generate_token, get_user_id_from_token
from django.core.cache import cache
from .authentication import RedisTokenAuthentication
from rest_framework.views import APIView
from django.conf import settings
from django.utils import timezone
from django.http import HttpResponseRedirect
from django.utils import translation
# 添加drf-yasg相关导入
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework import serializers
# 引入标准化响应工具
from common.responses import success_response, error_response, created_response, api_response
from common.swagger_utils import get_standardized_response_schema, StandardizedResponseSchema
import logging
logger = logging.getLogger(__name__)
# 定义Swagger的请求和响应Schema
class RegisterRequestSchema(serializers.Serializer):
    """Swagger request-body schema used by the user registration endpoint."""

    username = serializers.CharField(help_text="用户名", required=True)
    password1 = serializers.CharField(help_text="密码", required=True)
    password2 = serializers.CharField(help_text="确认密码", required=True)
    # Optional contact fields.
    email = serializers.EmailField(help_text="邮箱(可选)", required=False)
    phone_number = serializers.CharField(help_text="手机号码(可选)", required=False)
class RegisterResponseSchema(serializers.Serializer):
    """Schema describing a status / code / message response envelope."""

    status = serializers.CharField(
        help_text="操作状态",
    )
    code = serializers.IntegerField(
        help_text="状态码",
    )
    message = serializers.CharField(
        help_text="操作结果信息",
    )
class PhoneLoginRequestSchema(serializers.Serializer):
    """Swagger request-body schema for phone number + verification code login."""

    phone_number = serializers.CharField(help_text="手机号码", required=True)
    code = serializers.CharField(help_text="验证码", required=True)
class LoginResponseSchema(serializers.Serializer):
    """Schema describing a login response that carries an authentication token."""

    token = serializers.CharField(
        help_text="认证令牌",
    )
class SendVerifyCodeRequestSchema(serializers.Serializer):
    """Swagger request-body schema for requesting an SMS verification code."""

    phone_number = serializers.CharField(help_text="手机号码", required=True)
class SendVerifyCodeResponseSchema(serializers.Serializer):
    """Schema describing a response that only carries a result message."""

    message = serializers.CharField(
        help_text="操作结果信息",
    )
class UsernameLoginRequestSchema(serializers.Serializer):
    """Swagger request-body schema for username + password login."""

    username = serializers.CharField(help_text="用户名", required=True)
    password = serializers.CharField(help_text="密码", required=True)
class EmailLoginRequestSchema(serializers.Serializer):
    """Swagger request-body schema for email + password login."""

    email = serializers.EmailField(help_text="邮箱", required=True)
    password = serializers.CharField(help_text="密码", required=True)
class ErrorResponseSchema(serializers.Serializer):
    """Schema describing an error response with a single ``error`` string."""

    error = serializers.CharField(
        help_text="错误信息",
    )

    class Meta:
        # Explicit schema reference name for this serializer.
        ref_name = "UserErrorResponse"
class MacAddressLoginRequestSchema(serializers.Serializer):
    """Swagger request-body schema for device login by MAC address."""

    mac_address = serializers.CharField(help_text="设备MAC地址", required=True)
class MacAddressLoginView(APIView):
    """
    Device MAC-address login endpoint.

    Looks a device up by its MAC address and, when the device is bound to a
    user, returns a token generated for that user.
    ---
    Request parameters:
    - mac_address: device MAC address
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=MacAddressLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            404: openapi.Response('设备不存在或未绑定', get_standardized_response_schema())
        },
        operation_description="使用设备MAC地址进行登录返回认证令牌"
    )
    def post(self, request):
        """
        Log a device in by MAC address.

        Outcomes:
        - missing ``mac_address``: error response.
        - device unknown: response with ``code=4011`` and ``registered=False``.
        - device known but not bound: response with ``code=4010`` and
          ``bound=False``.
        - device bound: the device is activated if it was inactive, and a
          token for the bound user is returned.
        - any other failure: generic error response (details are logged only).
        """
        mac_address = request.data.get('mac_address')
        if not mac_address:
            logger.warning("Attempt to login without MAC address")
            return error_response(message="MAC address is required")

        logger.info(f"Attempting MAC address login for device: {mac_address}")
        try:
            device = Device.objects.get(mac_address=mac_address)

            # Check whether the device has been bound to a user.
            user_device = UserDevice.objects.filter(device=device).first()
            if not user_device:
                logger.warning(f"Device not bound to any user: {mac_address}")
                # Dedicated code 4010 lets the device recognise the "not bound" state.
                return api_response(
                    success=False,
                    message="Device is not bound to any user",
                    code=4010,
                    data={
                        'device_code': device.device_code,
                        'mac_address': mac_address,
                        'bound': False
                    }
                )

            # Auto-activate an inactive device on login.
            if not device.is_active:
                device.is_active = True
                device.activated_at = timezone.now()
                device.save()
                logger.info(f"Device auto-activated on login: {mac_address}")

            # Generate a token for the user the device is bound to.
            token = generate_token(user_device.user.id)
            logger.info(f"Successfully logged in device with MAC: {mac_address}, bound to user: {user_device.user.id}")
            return success_response(
                data={
                    'token': token,
                    'user_id': user_device.user.id,
                    'device_code': device.device_code,
                    'mac_address': mac_address,
                    'bound': True
                },
                message="登录成功"
            )
        except Device.DoesNotExist:
            logger.warning(f"Device not found: {mac_address}")
            return api_response(
                success=False,
                message="Device not found",
                code=4011,
                data={
                    'mac_address': mac_address,
                    'registered': False
                }
            )
        except Exception:
            # This endpoint is unauthenticated: keep internal error details
            # (SQL errors, stack info, ...) in the server log and return only
            # a generic message to the caller.
            logger.exception(f"MAC address login failed for device: {mac_address}")
            return error_response(message="Login failed")
class CustomRegisterView(RegisterView):
    """
    User registration endpoint.

    Creates a new user account from a username and password.
    ---
    Request parameters:
    - username: user name
    - password1: password
    - password2: password confirmation
    - email: email address (optional)
    - phone_number: phone number (optional)
    """
    serializer_class = CustomRegisterSerializer
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=RegisterRequestSchema,
        responses={
            201: openapi.Response('注册成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="通过用户名和密码创建新用户账号"
    )
    def create(self, request, *args, **kwargs):
        """
        Create a new user.

        Validates the request with the view's serializer. On validation
        failure an error response carrying the field errors is returned;
        otherwise the user is saved and a "created" response is returned.
        """
        logger.info(f"Attempting to register new user with username: {request.data.get('username')}")
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            logger.warning(f"Registration validation failed: {serializer.errors}")
            # Return the field errors so the client can tell what to fix,
            # consistent with the profile-update endpoint in this module.
            return error_response(
                message="Registration validation failed",
                data=serializer.errors,
                code=400
            )

        headers = self.get_success_headers(serializer.data)
        user = serializer.save(self.request)
        logger.info(f"Successfully registered new user with ID: {user.id}")
        return created_response(
            message="User registered successfully",
            code=201,
            headers=headers
        )
# 新增专门的手机登录视图类
class PhoneLoginView(APIView):
    """
    Phone verification-code login endpoint.

    Logs a user in with a phone number and a verification code.
    ---
    Request parameters:
    - phone_number: phone number
    - code: verification code
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=PhoneLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用手机号和验证码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with a phone number and verification code.

        The submitted code is compared with the code cached under the phone
        number. On a match the cached code is deleted, the user is fetched or
        created (new users get an unusable password), and a token is returned.
        Otherwise an error response is returned.
        """
        phone_number = request.data.get('phone_number')
        code = request.data.get('code')
        if not phone_number or not code:
            logger.warning("Attempt to verify code login without phone number or code")
            return error_response(message="Phone number and code are required")

        logger.info(f"Attempting verification code login for phone number: {phone_number}")

        # The code is cached as a string; a JSON client may submit it as a
        # number, so normalise before comparing.
        submitted_code = str(code).strip()
        cached_code = cache.get(phone_number)
        if not cached_code or str(cached_code) != submitted_code:
            logger.warning(f"Failed verification code login attempt for phone number: {phone_number}")
            return error_response(message="Invalid or expired code")

        # A code is single-use: drop it once it has been verified.
        cache.delete(phone_number)
        user, created = ParadiseUser.objects.get_or_create(
            phone_number=phone_number,
            defaults={'username': phone_number}
        )
        if created:
            logger.info(f"Created new user with phone number: {phone_number}")
            # Accounts created this way have no password; make password login impossible.
            user.set_unusable_password()
            user.save()

        token = generate_token(user.id)
        logger.info(f"Successfully logged in user with phone number: {phone_number}")
        return success_response(data={'token': token}, message="登录成功")
# 新增发送验证码视图类
class SendVerifyCodeView(APIView):
    """
    Send-verification-code endpoint.

    Sends a verification code to the given phone number.
    ---
    Request parameters:
    - phone_number: phone number
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=SendVerifyCodeRequestSchema,
        responses={
            200: openapi.Response('发送成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="向指定手机号发送6位数验证码有效期10分钟"
    )
    def post(self, request):
        """
        Send a 6-digit verification code by SMS.

        On a successful send the code is cached under the phone number for
        600 seconds. Returns a success response when the SMS response
        contains both ``Code`` and ``OK``, otherwise an error response.
        """
        # Local import keeps this block self-contained.
        import secrets

        phone_number = request.data.get('phone_number')
        if not phone_number:
            logger.warning("Attempt to send verification code without phone number")
            return error_response(message="Phone number is required")

        logger.info(f"Generating verification code for phone number: {phone_number}")
        # The code is an authentication secret: draw it from a CSPRNG rather
        # than the predictable `random` module. Range is 100000..999999.
        code = str(secrets.randbelow(900000) + 100000)

        response = send_sms(phone_number, code)
        # Decode once; an empty/None response is treated as a failure.
        response_text = response.decode('utf-8') if response else ''
        if 'Code' in response_text and 'OK' in response_text:
            cache.set(phone_number, code, timeout=600)  # keep the code for 10 minutes
            logger.info(f"Successfully sent verification code to {phone_number}")
            return success_response(message="Verification code sent")

        logger.error(f"Failed to send verification code to {phone_number}")
        return error_response(message="Failed to send verification code")
# 新增邮箱登录视图类
class EmailLoginView(APIView):
    """
    Email login endpoint.

    Logs a user in with an email address and password.
    ---
    Request parameters:
    - email: email address
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=EmailLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用邮箱和密码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with email and password.

        Returns a token when a user with the given email exists and the
        password matches; otherwise returns an error response.
        """
        email = request.data.get('email')
        password = request.data.get('password')
        if not email or not password:
            logger.warning("Attempt to login without email or password")
            return error_response(message="Email and password are required")

        # The password is always verified against the stored hash. Earlier
        # code cached the plaintext password for an hour and compared against
        # it, which kept an old password valid (and rejected the new one)
        # after a password change, and stored plaintext passwords in the cache.
        try:
            user = ParadiseUser.objects.get(email=email)
        except ParadiseUser.DoesNotExist:
            user = None

        if user is not None and user.check_password(password):
            token = generate_token(user.id)
            logger.info(f"User logged in with email: {email}")
            return success_response(data={'token': token}, message="登录成功")

        logger.warning(f"Failed login attempt with email: {email}")
        return error_response(message="Invalid email or password")
# 新增用户名登录视图类
class UsernameLoginView(APIView):
    """
    Username login endpoint.

    Logs a user in with a username and password.
    ---
    Request parameters:
    - username: user name
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=UsernameLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用用户名和密码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with username and password.

        Returns a token when ``authenticate`` accepts the credentials;
        otherwise returns an error response.
        """
        username = request.data.get('username')
        password = request.data.get('password')
        if not username or not password:
            return error_response(message="Username and password are required")

        # Credentials are always checked through Django's authentication
        # backends. Earlier code cached the plaintext password for an hour,
        # which kept stale passwords valid, rejected a freshly changed
        # password, and raised an unhandled DoesNotExist if the user had been
        # removed while the cache entry was still alive.
        user = authenticate(request, username=username, password=password)
        if user is None:
            return error_response(message="Invalid username or password")

        token = generate_token(user.id)
        return success_response(data={'token': token}, message="登录成功")
class UserInfoResponseSchema(serializers.Serializer):
    """Schema describing a status / code envelope whose data is a UserInfoSerializer."""

    status = serializers.CharField(
        help_text="操作状态",
    )
    code = serializers.IntegerField(
        help_text="状态码",
    )
    # Nested user payload.
    data = UserInfoSerializer(
        help_text="用户信息",
    )
class ProfileUpdateRequestSchema(serializers.Serializer):
    """Swagger request-body schema for the profile update action; all fields optional."""

    gender = serializers.ChoiceField(
        choices=['M', 'F', 'O'],
        help_text="性别",
        required=False,
    )
    resident_city = serializers.CharField(help_text="常驻城市", required=False)
    birthday = serializers.DateField(help_text="生日", required=False)
    zodiac_sign = serializers.CharField(help_text="星座", required=False)
    # The sixteen MBTI type codes, in the order they are offered.
    mbti = serializers.ChoiceField(
        choices=[
            'INTJ', 'INTP', 'ENTJ', 'ENTP', 'INFJ', 'INFP', 'ENFJ', 'ENFP',
            'ISTJ', 'ISFJ', 'ESTJ', 'ESFJ', 'ISTP', 'ISFP', 'ESTP', 'ESFP',
        ],
        help_text="MBTI性格类型",
        required=False,
    )
    interests = serializers.CharField(help_text="兴趣爱好", required=False)
    social_identity = serializers.CharField(help_text="社会身份", required=False)
class BindPhoneRequestSchema(serializers.Serializer):
    """Swagger request-body schema for the bind-phone action."""

    phone_number = serializers.CharField(help_text="手机号码", required=True)
    code = serializers.CharField(help_text="验证码", required=True)
class GroupSerializer(serializers.ModelSerializer):
    """Serializes a Group with its member count and permission codenames."""

    user_count = serializers.SerializerMethodField()
    permissions = serializers.SerializerMethodField()

    class Meta:
        model = Group
        fields = [
            'id',
            'name',
            'user_count',
            'permissions',
        ]

    def get_user_count(self, obj):
        """Return the number of users related to the group via ``paradiseuser_set``."""
        members = obj.paradiseuser_set
        return members.count()

    def get_permissions(self, obj):
        """Return the codenames of the group's permissions as a list."""
        codenames = obj.permissions.values_list('codename', flat=True)
        return [codename for codename in codenames]
class GroupViewSet(viewsets.ModelViewSet):
    """
    Role (user group) management endpoints.

    Intended for administrators only: non-staff users see an empty queryset,
    and write actions require staff status.
    """
    queryset = Group.objects.all()
    serializer_class = GroupSerializer
    permission_classes = [IsAuthenticated]
    authentication_classes = [RedisTokenAuthentication]

    def get_permissions(self):
        """
        Return the permission instances for the current action.

        Write actions require a staff user. Without this, ``create`` was open
        to every authenticated user, because creation does not go through
        ``get_queryset``. All other actions keep requiring authentication only.
        """
        if self.action in ('create', 'update', 'partial_update', 'destroy'):
            # Local import keeps this block self-contained.
            from rest_framework.permissions import IsAdminUser
            return [IsAuthenticated(), IsAdminUser()]
        return [IsAuthenticated()]

    def get_queryset(self):
        """Return all groups for staff users and an empty queryset otherwise."""
        if not self.request.user.is_staff:
            return Group.objects.none()
        return Group.objects.all()
class AffinityRuleSerializer(serializers.ModelSerializer):
    """Model serializer exposing the listed AffinityRule fields."""

    class Meta:
        model = AffinityRule
        fields = [
            'id',
            'name',
            'description',
            'points',
            'daily_limit',
            'is_active',
            'created_at',
            'updated_at',
        ]
class AffinityLevelSerializer(serializers.ModelSerializer):
    """Model serializer exposing the listed AffinityLevel fields."""

    class Meta:
        model = AffinityLevel
        fields = [
            'id',
            'level',
            'name',
            'description',
            'required_points',
            'rewards',
            'created_at',
            'updated_at',
        ]
class AffinityRuleViewSet(viewsets.ModelViewSet):
    """
    Affinity rule management endpoints.

    Writable by administrators only; readable by authenticated users.
    """
    queryset = AffinityRule.objects.all()
    serializer_class = AffinityRuleSerializer
    permission_classes = [IsAuthenticated]
    authentication_classes = [RedisTokenAuthentication]

    def get_permissions(self):
        """
        Return the permission instances for the current action.

        ``list`` and ``retrieve`` need an authenticated user; every other
        action additionally needs a staff user. The decision is expressed as
        permission classes, not by raising from this method, so the framework
        produces its normal unauthenticated / forbidden responses.
        """
        if self.action in ['list', 'retrieve']:
            return [IsAuthenticated()]
        # Local import keeps this block self-contained.
        from rest_framework.permissions import IsAdminUser
        return [IsAuthenticated(), IsAdminUser()]
class AffinityLevelViewSet(viewsets.ModelViewSet):
    """
    Affinity level management endpoints.

    Writable by administrators only; readable by authenticated users.
    """
    queryset = AffinityLevel.objects.all()
    serializer_class = AffinityLevelSerializer
    permission_classes = [IsAuthenticated]
    authentication_classes = [RedisTokenAuthentication]

    def get_permissions(self):
        """
        Return the permission instances for the current action.

        ``list`` and ``retrieve`` need an authenticated user; every other
        action additionally needs a staff user. The decision is expressed as
        permission classes, not by raising from this method, so the framework
        produces its normal unauthenticated / forbidden responses.
        """
        if self.action in ['list', 'retrieve']:
            return [IsAuthenticated()]
        # Local import keeps this block self-contained.
        from rest_framework.permissions import IsAdminUser
        return [IsAuthenticated(), IsAdminUser()]
class ParadiseUserViewSet(viewsets.ModelViewSet):
    """
    User management endpoints.

    Standard model-viewset actions over ParadiseUser plus three actions that
    operate on the current user: ``info``, ``update_profile`` and
    ``bind_phone``.
    """
    queryset = ParadiseUser.objects.all()
    serializer_class = ParadiseUserSerializer
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['用户管理']

    def get_permissions(self):
        """Return the permissions for the current action.

        ``create`` is open to everyone; every other action requires an
        authenticated user.
        """
        # NOTE(review): combined with the unrestricted queryset above, any
        # authenticated user can reach the standard list/retrieve/update/
        # destroy actions for every user — confirm this is intended.
        if self.action == 'create':
            return [AllowAny()]
        return [IsAuthenticated()]

    @swagger_auto_schema(
        responses={
            200: openapi.Response('获取成功', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema())
        },
        operation_description="返回当前登录用户的详细信息,不包含敏感字段如密码",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['get'])
    def info(self, request):
        """
        Return the current user's information.

        Serializes ``request.user`` with UserInfoSerializer.
        """
        user = request.user
        serializer = UserInfoSerializer(user)
        logger.info(f"User {user.id} retrieved their profile info")
        return success_response(data=serializer.data)

    @swagger_auto_schema(
        methods=['put', 'patch'],
        request_body=ProfileUpdateRequestSchema,
        responses={
            200: openapi.Response('更新成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema())
        },
        operation_description="更新当前登录用户的个人资料信息",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['put', 'patch'])
    def update_profile(self, request):
        """
        Update the current user's profile.

        Applies a partial update through ProfileUpdateSerializer and returns
        the serialized data, or the validation errors with code 400.
        """
        user = request.user
        serializer = ProfileUpdateSerializer(user, data=request.data, partial=True)
        if not serializer.is_valid():
            logger.warning(f"User {user.id} profile update failed: {serializer.errors}")
            return error_response(
                message="个人资料更新失败",
                data=serializer.errors,
                code=400
            )

        serializer.save()
        logger.info(f"User {user.id} updated their profile")
        return success_response(
            data=serializer.data,
            message="个人资料已更新"
        )

    @swagger_auto_schema(
        methods=['post'],
        request_body=BindPhoneRequestSchema,
        responses={
            200: openapi.Response('绑定成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema()),
            409: openapi.Response('手机号已被使用', get_standardized_response_schema())
        },
        operation_description="为当前用户绑定或更新手机号,需要通过验证码验证",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['post'])
    def bind_phone(self, request):
        """
        Bind or change the current user's phone number.

        Checks, in order: both fields present; the number is 5-20 digits; the
        number is not used by another user (409); the submitted code matches
        the code cached under the number. On success the number is saved and
        the cached code is deleted.
        """
        user = request.user
        phone_number = request.data.get('phone_number')
        code = request.data.get('code')
        if not phone_number or not code:
            logger.warning(f"User {user.id} attempted to bind phone without providing phone number or code")
            return error_response(message="手机号和验证码不能为空", code=400)

        # JSON clients may send either value as a number. Normalise to str so
        # the format check cannot raise and the code comparison is like-for-like.
        phone_number = str(phone_number).strip()
        code = str(code).strip()

        # Basic phone number format check.
        if not phone_number.isdigit() or len(phone_number) < 5 or len(phone_number) > 20:
            logger.warning(f"User {user.id} attempted to bind invalid phone number format: {phone_number}")
            return error_response(message="手机号格式不正确", code=400)

        # Reject numbers that already belong to another account.
        if ParadiseUser.objects.filter(phone_number=phone_number).exclude(id=user.id).exists():
            logger.warning(f"User {user.id} attempted to bind phone number already in use: {phone_number}")
            return error_response(message="该手机号已被其他账号使用", code=409)

        # Verify the submitted code against the cached one.
        cached_code = cache.get(phone_number)
        if not cached_code or str(cached_code) != code:
            logger.warning(f"User {user.id} provided invalid verification code for phone: {phone_number}")
            return error_response(message="验证码无效或已过期", code=400)

        # Verification passed: store the new number.
        old_phone = user.phone_number
        user.phone_number = phone_number
        user.save()

        # The code is single-use.
        cache.delete(phone_number)

        if old_phone:
            logger.info(f"User {user.id} changed phone number from {old_phone} to {phone_number}")
            return success_response(message="手机号已更新")

        logger.info(f"User {user.id} bound new phone number: {phone_number}")
        return success_response(message="手机号已绑定")
def custom_set_language(request):
    """
    Activate a language and redirect.

    Reads ``language`` and ``next`` from the query string. When ``language``
    is one of ``settings.LANGUAGES`` it is activated and stored in the
    language cookie. The response always redirects to ``next``, falling back
    to ``/`` when ``next`` is missing or points to a different host.
    """
    # Local import keeps this block self-contained.
    from django.utils.http import url_has_allowed_host_and_scheme

    lang_code = request.GET.get('language', None)
    next_url = request.GET.get('next', '/')

    # ``next`` is attacker-controllable; only redirect to this host to avoid
    # an open redirect.
    if not url_has_allowed_host_and_scheme(
        url=next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        next_url = '/'

    if lang_code and lang_code in dict(settings.LANGUAGES):
        logger.info(f"Setting language to: {lang_code}")
        # Activate for this request and persist the choice in a cookie.
        translation.activate(lang_code)
        response = HttpResponseRedirect(next_url)
        response.set_cookie(settings.LANGUAGE_COOKIE_NAME, lang_code)
        return response

    logger.warning(f"Attempt to set invalid language code: {lang_code}")
    return HttpResponseRedirect(next_url)
class AdminEmailLoginRequestSchema(serializers.Serializer):
    """Swagger request-body schema for administrator email + password login."""

    email = serializers.EmailField(help_text="管理员邮箱", required=True)
    password = serializers.CharField(help_text="密码", required=True)
class AdminEmailLoginView(APIView):
    """
    Administrator email login endpoint.

    Lets administrators log in with email and password; regular users are
    refused.
    ---
    Request parameters:
    - email: administrator email
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['管理员认证']

    @swagger_auto_schema(
        request_body=AdminEmailLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            403: openapi.Response('权限不足', get_standardized_response_schema())
        },
        operation_description="专用于管理员通过邮箱和密码登录,拒绝普通用户登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log an administrator in with email and password.

        Returns a token generated with ``is_admin=True`` together with the
        superuser flag and a role name. Wrong credentials yield a generic
        error; correct credentials for a non-staff account yield a 403.
        """
        email = request.data.get('email')
        password = request.data.get('password')
        if not email or not password:
            logger.warning("Attempt to admin login without email or password")
            return error_response(message="Email and password are required")

        try:
            user = ParadiseUser.objects.get(email=email)
        except ParadiseUser.DoesNotExist:
            user = None

        # Verify the password before saying anything about the account.
        # Returning 403 for a non-staff account before the password check
        # would let anyone probe which emails are registered.
        if user is None or not user.check_password(password):
            logger.warning(f"Failed admin login attempt with email: {email}")
            return error_response(message="Invalid email or password")

        # Only staff accounts may use this endpoint.
        if not user.is_staff:
            logger.warning(f"Non-admin user attempted to login via admin endpoint: {email}")
            return error_response(
                message="Access denied. Admin privileges required.",
                code=403,
                status_code=status.HTTP_403_FORBIDDEN
            )

        # Generate an admin token.
        token = generate_token(user.id, is_admin=True)
        logger.info(f"Admin logged in with email: {email}")

        # Role name: superuser label, else the first group's name, else a default.
        role_name = "超级管理员" if user.is_superuser else None
        if not role_name:
            group = user.groups.first()
            role_name = group.name if group else "管理员"

        return success_response(data={
            'token': token,
            'is_superuser': user.is_superuser,
            'role': role_name,
        }, message="管理员登录成功")
class AdminLogoutView(APIView):
    """
    Administrator logout endpoint.

    Removes the administrator's token entry from the cache.
    """
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['管理员认证']

    @swagger_auto_schema(
        responses={
            200: openapi.Response('登出成功', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema()),
            403: openapi.Response('权限不足', get_standardized_response_schema())
        },
        operation_description="使当前管理员的认证令牌失效需要在请求头中提供有效的管理员token",
        security=[{'Bearer': []}]
    )
    def post(self, request):
        """
        Log the current administrator out.

        Non-staff users get a 403. Otherwise the token is taken from the part
        of the ``Authorization`` header after the first space, and the cache
        key ``admin_token:<token>`` is deleted. A header without a space
        yields an error response.
        """
        current_user = request.user

        # Only staff accounts may use this endpoint.
        if not current_user.is_staff:
            logger.warning(f"Non-admin user attempted to use admin logout endpoint: {current_user.id}")
            return error_response(
                message="Access denied. Admin privileges required.",
                code=403,
                status_code=status.HTTP_403_FORBIDDEN
            )

        # Split "<scheme> <token>" on the first space.
        header_value = request.headers.get('Authorization', '')
        _scheme, separator, token = header_value.partition(' ')
        if not separator:
            return error_response(message="Invalid authorization header")

        # Drop the token entry from the cache.
        cache.delete(f"admin_token:{token}")
        logger.info(f"Admin {current_user.id} logged out successfully")
        return success_response(message="管理员已成功登出")