2026-01-29 10:02:15 +08:00

230 lines
7.6 KiB
Python

"""
管理员模块视图
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from utils.response import success, error
from utils.exceptions import ErrorCode
from .models import AdminUser
from .serializers import (
AdminUserSerializer,
AdminLoginSerializer,
AdminChangePasswordSerializer,
CreateAdminUserSerializer
)
from .authentication import get_admin_tokens, AdminJWTAuthentication
from .permissions import IsAdminUser, IsSuperAdmin
class AdminAuthViewSet(viewsets.ViewSet):
"""管理员认证视图集"""
permission_classes = [AllowAny]
@action(detail=False, methods=['post'])
def login(self, request):
"""
管理员登录
POST /api/admin/auth/login
"""
serializer = AdminLoginSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
username = serializer.validated_data['username']
password = serializer.validated_data['password']
try:
admin = AdminUser.objects.get(username=username)
except AdminUser.DoesNotExist:
return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误')
if not admin.check_password(password):
return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误')
if not admin.is_active:
return error(code=ErrorCode.USER_DISABLED, message='账户已被禁用')
# 记录登录IP
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0].strip()
else:
ip = request.META.get('REMOTE_ADDR')
admin.last_login_ip = ip
admin.save(update_fields=['last_login_ip'])
# 生成Token
tokens = get_admin_tokens(admin)
return success(data={
'token': tokens,
'admin': AdminUserSerializer(admin).data
}, message='登录成功')
@action(detail=False, methods=['post'])
def refresh(self, request):
"""
刷新Token
POST /api/admin/auth/refresh
"""
refresh_token = request.data.get('refresh')
if not refresh_token:
return error(message='refresh token不能为空')
try:
refresh = RefreshToken(refresh_token)
# 验证是否为admin token
if refresh.get('user_type') != 'admin':
return error(code=ErrorCode.TOKEN_EXPIRED, message='无效的管理员Token')
return success(data={
'access': str(refresh.access_token),
'refresh': str(refresh)
})
except Exception as e:
return error(code=ErrorCode.TOKEN_EXPIRED, message='Token已过期或无效')
class AdminProfileViewSet(viewsets.ViewSet):
"""管理员个人信息视图集"""
authentication_classes = [AdminJWTAuthentication]
permission_classes = [IsAdminUser]
@action(detail=False, methods=['get'], url_path='me')
def me(self, request):
"""
获取当前管理员信息
GET /api/admin/profile/me
"""
serializer = AdminUserSerializer(request.user)
return success(data=serializer.data)
@action(detail=False, methods=['put'], url_path='change-password')
def change_password(self, request):
"""
修改密码
PUT /api/admin/profile/change-password
"""
serializer = AdminChangePasswordSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
if not request.user.check_password(serializer.validated_data['old_password']):
return error(message='原密码错误')
request.user.set_password(serializer.validated_data['new_password'])
request.user.save()
return success(message='密码修改成功')
class AdminUserManageViewSet(viewsets.ModelViewSet):
"""管理员用户管理视图集(仅超级管理员可用)"""
queryset = AdminUser.objects.all().order_by('-created_at')
authentication_classes = [AdminJWTAuthentication]
permission_classes = [IsSuperAdmin]
def get_serializer_class(self):
if self.action == 'create':
return CreateAdminUserSerializer
return AdminUserSerializer
def list(self, request, *args, **kwargs):
"""
管理员列表
GET /api/admin/admins
"""
queryset = self.filter_queryset(self.get_queryset())
# 搜索
username = request.query_params.get('username')
role = request.query_params.get('role')
if username:
queryset = queryset.filter(username__contains=username)
if role:
queryset = queryset.filter(role=role)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = AdminUserSerializer(page, many=True)
return success(data={
'total': self.paginator.page.paginator.count,
'items': serializer.data
})
serializer = AdminUserSerializer(queryset, many=True)
return success(data={'items': serializer.data})
def create(self, request, *args, **kwargs):
"""
创建管理员
POST /api/admin/admins
"""
serializer = CreateAdminUserSerializer(data=request.data)
if serializer.is_valid():
admin = serializer.save()
return success(data=AdminUserSerializer(admin).data, message='创建成功')
return error(message=str(serializer.errors))
def retrieve(self, request, *args, **kwargs):
"""
管理员详情
GET /api/admin/admins/{id}
"""
instance = self.get_object()
serializer = AdminUserSerializer(instance)
return success(data=serializer.data)
def update(self, request, *args, **kwargs):
"""
更新管理员
PUT /api/admin/admins/{id}
"""
instance = self.get_object()
serializer = AdminUserSerializer(instance, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return success(data=serializer.data, message='更新成功')
return error(message=str(serializer.errors))
@action(detail=True, methods=['post'], url_path='toggle-status')
def toggle_status(self, request, pk=None):
"""
启用/禁用管理员
POST /api/admin/admins/{id}/toggle-status
"""
admin = self.get_object()
# 不能禁用自己
if admin.id == request.user.id:
return error(message='不能禁用自己的账户')
admin.is_active = not admin.is_active
admin.save()
return success(
data=AdminUserSerializer(admin).data,
message='已启用' if admin.is_active else '已禁用'
)
@action(detail=True, methods=['post'], url_path='reset-password')
def reset_password(self, request, pk=None):
"""
重置管理员密码
POST /api/admin/admins/{id}/reset-password
"""
admin = self.get_object()
new_password = request.data.get('new_password', '123456')
admin.set_password(new_password)
admin.save()
return success(message=f'密码已重置为: {new_password}')