"""Custom JWT authentication — validates session_id against ActiveSession table.""" from rest_framework_simplejwt.authentication import JWTAuthentication from rest_framework_simplejwt.exceptions import InvalidToken class SessionJWTAuthentication(JWTAuthentication): """ Extends JWTAuthentication to check that the session_id in the token still exists in the ActiveSession table. Legacy tokens (without session_id) are allowed through for backward compatibility. """ def get_user(self, validated_token): user = super().get_user(validated_token) session_id = validated_token.get('session_id') if session_id is None: # Legacy token without session_id — allow through return user from .models import ActiveSession if not ActiveSession.objects.filter(user=user, session_id=session_id).exists(): raise InvalidToken({ 'detail': '您的账号已在其他设备登录', 'code': 'session_expired_other_device', }) return user