60 lines
2.1 KiB
Python
60 lines
2.1 KiB
Python
"""
|
||
JWT authentication dedicated to the App client.

Bug #42 fix: the original authenticate() returned None for an empty-string
token, which caused the request to be treated as anonymous (unauthenticated)
and allowed it to reach protected views without a valid identity. Empty
tokens must be rejected with AuthenticationFailed instead.
|
||
"""
|
||
from rest_framework_simplejwt.authentication import JWTAuthentication
|
||
from rest_framework_simplejwt.exceptions import AuthenticationFailed
|
||
|
||
|
||
class AppJWTAuthentication(JWTAuthentication):
    """JWT authentication backend for the App client.

    Extends ``JWTAuthentication`` with two extra checks: a blank raw token
    is rejected outright instead of being treated as "no credentials", and
    the token's ``user_type`` claim, when present and non-null, must equal
    ``'app'``.
    """

    def authenticate(self, request):
        """Authenticate *request* from its JWT authorization header.

        Args:
            request: The incoming request, passed to ``get_header``.

        Returns:
            ``None`` when ``get_header`` or ``get_raw_token`` yields
            ``None``; otherwise a ``(user, validated_token)`` tuple.

        Raises:
            AuthenticationFailed: If the raw token is empty or contains only
                whitespace, or if ``get_user`` rejects the token. Errors
                raised by ``get_validated_token`` propagate unchanged.
        """
        auth_header = self.get_header(request)
        if auth_header is None:
            return None

        token_bytes = self.get_raw_token(auth_header)
        if token_bytes is None:
            return None

        # Bug #42: a blank token must fail loudly. Returning None here would
        # let the request continue as anonymous instead of being blocked.
        has_content = bool(token_bytes) and token_bytes.strip() != b''
        if not has_content:
            raise AuthenticationFailed('Token 不能为空')

        token = self.get_validated_token(token_bytes)
        account = self.get_user(token)
        return account, token

    def get_user(self, validated_token):
        """Resolve the active user referenced by *validated_token*.

        Args:
            validated_token: Token object exposing ``.get(claim, default)``;
                the ``user_type`` and ``user_id`` claims are read from it.

        Returns:
            The ``User`` whose ``id`` equals the token's ``user_id`` claim.

        Raises:
            AuthenticationFailed: If ``user_type`` is present with a value
                other than ``'app'`` or ``None``, if no matching user
                exists, or if the user is not active.
        """
        # Imported inside the method, as in the original; presumably to
        # avoid an import cycle at module load time -- confirm before moving.
        from apps.users.models import User

        # Legacy tokens carry no user_type claim; they default to 'app'.
        claimed_type = validated_token.get('user_type', 'app')
        if claimed_type is not None and claimed_type != 'app':
            raise AuthenticationFailed('无效的用户 Token')

        account_id = validated_token.get('user_id')
        try:
            account = User.objects.get(id=account_id)
        except User.DoesNotExist:
            raise AuthenticationFailed('用户不存在')

        if account.is_active:
            return account
        raise AuthenticationFailed('用户账户已被禁用')
|