"""Admin module views.

Contains three view sets:

* ``AdminAuthViewSet`` - unauthenticated login / token refresh endpoints.
* ``AdminProfileViewSet`` - endpoints acting on the currently logged-in admin.
* ``AdminUserManageViewSet`` - admin account management (super admin only).
"""
import ipaddress
import secrets

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import RefreshToken
from drf_spectacular.utils import extend_schema

from utils.response import success, error
from utils.exceptions import ErrorCode
from .models import AdminUser
from .serializers import (
    AdminUserSerializer,
    AdminLoginSerializer,
    AdminChangePasswordSerializer,
    CreateAdminUserSerializer
)
from .authentication import get_admin_tokens, AdminJWTAuthentication
from .permissions import IsAdminUser, IsSuperAdmin


def _get_client_ip(request):
    """Return the client IP to record for ``request``.

    The first entry of ``X-Forwarded-For`` is used when it parses as a valid
    IPv4/IPv6 address; otherwise ``REMOTE_ADDR`` is returned unchanged (it may
    be ``None`` if the server did not provide it).

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    reverse proxy overwrites it, so the recorded value is informational only.
    """
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        candidate = forwarded_for.split(',')[0].strip()
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            # Malformed header value: do not persist arbitrary client text.
            pass
        else:
            return candidate
    return request.META.get('REMOTE_ADDR')


@extend_schema(tags=['管理员-认证'])
class AdminAuthViewSet(viewsets.ViewSet):
    """Admin authentication view set (login and token refresh)."""

    # Both endpoints must be reachable without credentials.
    permission_classes = [AllowAny]

    @action(detail=False, methods=['post'])
    def login(self, request):
        """Log an admin in.

        POST /api/admin/auth/login

        Validates the payload with ``AdminLoginSerializer``, checks the
        credentials and the ``is_active`` flag, records the login IP and
        returns the tokens from ``get_admin_tokens`` together with the
        serialized admin.
        """
        serializer = AdminLoginSerializer(data=request.data)
        if not serializer.is_valid():
            return error(message=str(serializer.errors))

        username = serializer.validated_data['username']
        password = serializer.validated_data['password']

        # Unknown user and wrong password deliberately produce the same
        # response so the endpoint does not reveal which usernames exist.
        try:
            admin = AdminUser.objects.get(username=username)
        except AdminUser.DoesNotExist:
            return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误')

        if not admin.check_password(password):
            return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误')

        # Checked only after the password so the disabled state is disclosed
        # solely to callers who know the credentials.
        if not admin.is_active:
            return error(code=ErrorCode.USER_DISABLED, message='账户已被禁用')

        # Record the login IP.
        admin.last_login_ip = _get_client_ip(request)
        admin.save(update_fields=['last_login_ip'])

        # Issue tokens.
        tokens = get_admin_tokens(admin)

        return success(data={
            'token': tokens,
            'admin': AdminUserSerializer(admin).data
        }, message='登录成功')

    @action(detail=False, methods=['post'])
    def refresh(self, request):
        """Exchange a refresh token for a new access token.

        POST /api/admin/auth/refresh

        Expects ``refresh`` in the request body. The token must validate as a
        ``RefreshToken`` and carry ``user_type == 'admin'``.
        """
        refresh_token = request.data.get('refresh')
        if not refresh_token:
            return error(message='refresh token不能为空')

        # Only token parsing/validation is guarded; unrelated programming
        # errors are no longer reported as an expired token.
        try:
            refresh = RefreshToken(refresh_token)
        except TokenError:
            return error(code=ErrorCode.TOKEN_EXPIRED, message='Token已过期或无效')

        # Reject tokens that were not issued for an admin.
        if refresh.get('user_type') != 'admin':
            return error(code=ErrorCode.TOKEN_EXPIRED, message='无效的管理员Token')

        # NOTE(review): the admin's current is_active state is not re-checked
        # here, so a disabled admin may keep refreshing - confirm intended.
        return success(data={
            'access': str(refresh.access_token),
            'refresh': str(refresh)
        })


@extend_schema(tags=['管理员-认证'])
class AdminProfileViewSet(viewsets.ViewSet):
    """View set for the currently authenticated admin's own profile."""

    authentication_classes = [AdminJWTAuthentication]
    permission_classes = [IsAdminUser]

    @action(detail=False, methods=['get'], url_path='me')
    def me(self, request):
        """Return the current admin's serialized data.

        GET /api/admin/profile/me
        """
        serializer = AdminUserSerializer(request.user)
        return success(data=serializer.data)

    @action(detail=False, methods=['put'], url_path='change-password')
    def change_password(self, request):
        """Change the current admin's password.

        PUT /api/admin/profile/change-password

        The payload is validated by ``AdminChangePasswordSerializer``; the
        supplied ``old_password`` must match before ``new_password`` is set.
        """
        serializer = AdminChangePasswordSerializer(data=request.data)
        if not serializer.is_valid():
            return error(message=str(serializer.errors))

        if not request.user.check_password(serializer.validated_data['old_password']):
            return error(message='原密码错误')

        request.user.set_password(serializer.validated_data['new_password'])
        request.user.save()

        return success(message='密码修改成功')


@extend_schema(tags=['管理员-账号管理'])
class AdminUserManageViewSet(viewsets.ModelViewSet):
    """Admin account management view set (super admin only)."""

    queryset = AdminUser.objects.all().order_by('-created_at')
    authentication_classes = [AdminJWTAuthentication]
    permission_classes = [IsSuperAdmin]

    def get_serializer_class(self):
        """Return the creation serializer for ``create``, else the default."""
        if self.action == 'create':
            return CreateAdminUserSerializer
        return AdminUserSerializer

    def list(self, request, *args, **kwargs):
        """List admins.

        GET /api/admin/admins

        Optional query parameters: ``username`` (substring match) and
        ``role`` (exact match). When pagination is active the response has
        ``total`` and ``items``; otherwise only ``items``.
        """
        queryset = self.filter_queryset(self.get_queryset())

        # Optional search filters.
        username = request.query_params.get('username')
        role = request.query_params.get('role')

        if username:
            queryset = queryset.filter(username__contains=username)
        if role:
            queryset = queryset.filter(role=role)

        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = AdminUserSerializer(page, many=True)
            return success(data={
                # Relies on the paginator exposing ``.page`` (page-number
                # style pagination).
                'total': self.paginator.page.paginator.count,
                'items': serializer.data
            })

        serializer = AdminUserSerializer(queryset, many=True)
        return success(data={'items': serializer.data})

    def create(self, request, *args, **kwargs):
        """Create an admin.

        POST /api/admin/admins
        """
        serializer = CreateAdminUserSerializer(data=request.data)
        if serializer.is_valid():
            admin = serializer.save()
            return success(data=AdminUserSerializer(admin).data, message='创建成功')
        return error(message=str(serializer.errors))

    def retrieve(self, request, *args, **kwargs):
        """Return a single admin.

        GET /api/admin/admins/{id}
        """
        instance = self.get_object()
        serializer = AdminUserSerializer(instance)
        return success(data=serializer.data)

    def update(self, request, *args, **kwargs):
        """Update an admin.

        PUT /api/admin/admins/{id}

        The serializer is always built with ``partial=True``, so omitted
        fields are left unchanged.
        """
        instance = self.get_object()
        serializer = AdminUserSerializer(instance, data=request.data, partial=True)
        if serializer.is_valid():
            serializer.save()
            return success(data=serializer.data, message='更新成功')
        return error(message=str(serializer.errors))

    @action(detail=True, methods=['post'], url_path='toggle-status')
    def toggle_status(self, request, pk=None):
        """Enable or disable an admin by flipping ``is_active``.

        POST /api/admin/admins/{id}/toggle-status
        """
        admin = self.get_object()

        # An admin may not disable their own account.
        if admin.id == request.user.id:
            return error(message='不能禁用自己的账户')

        admin.is_active = not admin.is_active
        admin.save()

        return success(
            data=AdminUserSerializer(admin).data,
            message='已启用' if admin.is_active else '已禁用'
        )

    @action(detail=True, methods=['post'], url_path='reset-password')
    def reset_password(self, request, pk=None):
        """Reset an admin's password.

        POST /api/admin/admins/{id}/reset-password

        Uses ``new_password`` from the request body when it is non-empty.
        Otherwise a random temporary password is generated instead of falling
        back to a fixed, guessable default. The password is returned in the
        response message so the operator can hand it over.
        """
        admin = self.get_object()

        new_password = request.data.get('new_password')
        if not new_password:
            # Missing, null or empty: never fall back to a well-known value.
            new_password = secrets.token_urlsafe(12)

        admin.set_password(new_password)
        admin.save()

        return success(message=f'密码已重置为: {new_password}')