video-shuoshan/backend/apps/accounts/authentication.py
seaislee1209 be656900c0
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m13s
feat: v0.9.7 登录风控第二期 — IP归属地解析 + 异常检测(R1-R5) + 飞书告警 + 自动封禁
- IP138 在线 API + ip2region 离线库双通道归属地解析,60 秒熔断降级
- 5 条异常检测规则:地区不对/不可能旅行/频繁登录/团队遍地开花/海外IP太杂
- 飞书 interactive 卡片告警(红色严重/橙色警告),含辅助指标
- R2 自动封禁用户、R4 自动封禁团队,封禁即踢下线
- 系统设置页全局配置 + 团队详情页独立阈值覆盖
- 安全日志页面 + 管理员修改密码入口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 00:02:56 +08:00

51 lines
1.8 KiB
Python

"""Custom JWT authentication — validates session_id against ActiveSession table."""
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import InvalidToken
class SessionJWTAuthentication(JWTAuthentication):
"""
Extends JWTAuthentication to check that the session_id in the token
still exists in the ActiveSession table.
Legacy tokens (without session_id) are allowed through for backward compatibility.
"""
def get_user(self, validated_token):
user = super().get_user(validated_token)
# 检查用户是否被封禁
if not user.is_active:
raise InvalidToken({
'detail': '您的账号已被禁用,请联系团队管理员',
'code': 'user_disabled',
})
# 检查团队是否被封禁
if user.team_id:
try:
from .models import Team
team = Team.objects.get(pk=user.team_id)
if not team.is_active:
raise InvalidToken({
'detail': '您所在的团队已被禁用,请联系平台管理员',
'code': 'team_disabled',
})
except Team.DoesNotExist:
pass
session_id = validated_token.get('session_id')
if session_id is None:
# Legacy token without session_id — allow through
return user
from .models import ActiveSession
if not ActiveSession.objects.filter(user=user, session_id=session_id).exists():
raise InvalidToken({
'detail': '您的账号已在其他设备登录',
'code': 'session_expired_other_device',
})
return user