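"""Custom JWT authentication for the admin module.

Admin and App users live in separate models, so a bare ``user_id`` claim is
ambiguous. The ``user_type`` claim is what keeps an App token from resolving
to an admin row (and the reverse), and both authentication classes below
enforce it before looking up the user.
"""
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import InvalidToken, AuthenticationFailed
from django.conf import settings


def get_admin_tokens(admin_user):
    """Issue an access/refresh token pair for an administrator.

    The refresh token is stamped with ``user_type='admin'`` so that admin
    tokens can be told apart from App-user tokens, plus the ``username`` and
    ``role`` of ``admin_user``.

    Args:
        admin_user: Object handed to ``RefreshToken.for_user``; its
            ``username`` and ``role`` attributes are read.

    Returns:
        A dict with the keys ``'access'`` and ``'refresh'``, each holding the
        encoded token as a string.
    """
    refresh = RefreshToken.for_user(admin_user)

    # Custom claims must be set before the access token is derived below.
    # JWT payloads are signed, not encrypted: whoever holds the token can
    # read username and role, so nothing secret belongs in these claims.
    refresh['user_type'] = 'admin'
    refresh['username'] = admin_user.username
    # NOTE(review): 'role' is a snapshot taken at issue time. Permission
    # checks should read the role from the AdminUser row returned by
    # AdminJWTAuthentication.get_user, not from this claim - confirm in the
    # views that consume it.
    refresh['role'] = admin_user.role

    return {
        'access': str(refresh.access_token),
        'refresh': str(refresh),
    }


def _load_active_user(model, validated_token, *, invalid_msg, missing_msg, disabled_msg):
    """Resolve the token's ``user_id`` claim to an active row of ``model``.

    Fails closed: a missing or malformed ``user_id`` is rejected as an
    authentication error instead of reaching the ORM as ``id=None`` or
    surfacing as an unhandled exception.

    Args:
        model: Model class to query (``AdminUser`` or ``User``).
        validated_token: Token whose signature and expiry were already checked.
        invalid_msg: Message used when ``user_id`` is absent or unusable.
        missing_msg: Message used when no row has that id.
        disabled_msg: Message used when the row exists but ``is_active`` is false.

    Returns:
        The matching, active user instance.

    Raises:
        InvalidToken: ``user_id`` is absent or cannot be used as an id.
        AuthenticationFailed: the user does not exist or is disabled.
    """
    user_id = validated_token.get('user_id')
    if user_id is None:
        raise InvalidToken(invalid_msg)

    try:
        user = model.objects.get(id=user_id)
    except model.DoesNotExist as exc:
        raise AuthenticationFailed(missing_msg) from exc
    except (TypeError, ValueError) as exc:
        # The claim value could not be coerced to the id field's type.
        raise InvalidToken(invalid_msg) from exc

    if not user.is_active:
        raise AuthenticationFailed(disabled_msg)
    return user


class AdminJWTAuthentication(JWTAuthentication):
    """JWT authentication for administrators only.

    Accepts a token only when its ``user_type`` claim is exactly ``'admin'``.
    """

    def get_user(self, validated_token):
        """Return the active ``AdminUser`` named by the token.

        Overrides the base lookup so the user comes from the admin model.
        NOTE(review): the base ``JWTAuthentication.get_user`` is not called,
        so any additional checks the installed simplejwt version performs
        there do not run - confirm that is intended.

        Raises:
            AuthenticationFailed: wrong ``user_type``, unknown user, or
                disabled account (``InvalidToken`` for an unusable ``user_id``).
        """
        # Imported inside the method, as the original code did.
        from apps.admins.models import AdminUser

        # Strict comparison: a token without the claim is never an admin token.
        if validated_token.get('user_type') != 'admin':
            raise AuthenticationFailed('无效的管理员Token')

        return _load_active_user(
            AdminUser,
            validated_token,
            invalid_msg='无效的管理员Token',
            missing_msg='管理员用户不存在',
            disabled_msg='管理员账户已被禁用',
        )


class AppJWTAuthentication(JWTAuthentication):
    """JWT authentication for the App side only.

    Accepts a token whose ``user_type`` claim is ``'app'`` or absent; the
    absent case exists for tokens issued before the claim was introduced.
    """

    def get_user(self, validated_token):
        """Return the active ``User`` named by the token.

        Raises:
            AuthenticationFailed: non-app ``user_type``, unknown user, or
                disabled account (``InvalidToken`` for an unusable ``user_id``).
        """
        # Imported inside the method, as the original code did.
        from apps.users.models import User

        # Legacy compatibility: a missing claim defaults to 'app', and an
        # explicit null is accepted as well. Keeping admin and App tokens
        # apart therefore relies on every admin token carrying
        # user_type='admin'.
        # NOTE(review): remove this fallback once all tokens issued before
        # the claim existed have expired, so untyped tokens are rejected.
        user_type = validated_token.get('user_type', 'app')
        if user_type not in ('app', None):
            raise AuthenticationFailed('无效的用户Token')

        return _load_active_user(
            User,
            validated_token,
            invalid_msg='无效的用户Token',
            missing_msg='用户不存在',
            disabled_msg='用户账户已被禁用',
        )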