video-shuoshan/backend/utils/anomaly_detector.py
seaislee1209 c53144b2ac feat(notification): 站内通知系统 — Notification 模型 + 4 个 API + Sidebar 铃铛 + 通知中心页
后端 — 新建 app apps.notifications:
- Notification model:type/title/content/link_url/is_read,索引 (recipient, is_read, -created_at)
- 4 个 endpoint:
  - GET    /api/v1/notifications/         (列表 + 总未读数,unread_only/page/page_size)
  - GET    /api/v1/notifications/unread-count  (轻量,前端 60s 轮询用)
  - PATCH  /api/v1/notifications/<id>/read     (标单条已读)
  - POST   /api/v1/notifications/read-all      (一键全部已读)
- 严格守 user 隔离:所有查询都 filter(recipient=request.user)
- INSTALLED_APPS 注册 + urls.py include
- migration 0001_initial 应用成功
- MySQL 严格模式:所有 CharField 加 default=''(memory feedback_mysql_default)

后端 — anomaly_detector 集成:
- _RULE_LABELS / _team_admin_recipients() / _notify_user_disabled() / _notify_team_disabled() helper
- process_anomalies 里 _disable_user/_disable_team 之后调对应 notify
- 接收人 = 同团队的主管+副管(is_team_admin OR is_team_owner)
- 用 bulk_create 一次写多条
- try/except 保护:通知失败不阻断封禁主流程

前端:
- types/index.ts:AppNotification / NotificationListResponse(避开浏览器 Web API Notification 冲突)
- lib/api.ts:notificationApi (list/getUnreadCount/markRead/markAllRead)
- store/notification.ts:Zustand store 乐观更新(markRead 先动 UI 再发请求)
- pages/NotificationsPage.tsx:标题 + 全部标记已读按钮 + 未读蓝点 + 相对时间 + 点击跳 link_url + 分页
- App.tsx:/notifications 路由(ProtectedRoute 不限 role)
- Sidebar.tsx(用户 76px):铃铛 SVG + 红点 + 60s 轮询 + visibilitychange 立即刷新
- AdminLayout.tsx(超管 220px):同步加铃铛(本来 sub-agent 只加了用户侧 sidebar,我补全 admin 侧)

测试:
- 新建 web/test/v0.20.1-smoke.mjs:11 项 — 铃铛/红点/跳页/标题/100dvh/min-height:0/调试折叠/poster
- 11/11 通过 + v2-smoke 25/25 + modal-interaction 8/8 全部基线 OK
- 后端 4 endpoint 用 curl 验过:list / unread-count / PATCH read / POST read-all 都正常

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:32:29 +08:00

433 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""登录异常检测引擎 — R1-R5 规则检测 + 封禁 + 告警冷却。"""
import logging
import threading
from datetime import timedelta
from django.utils import timezone
logger = logging.getLogger(__name__)
def _is_domestic(record) -> bool:
    """Return True when the login resolved to China and to a concrete city.

    ``record.geo_country`` must be one of the spellings of China listed
    below and ``record.geo_city`` must not be the empty string.
    """
    china_names = ('中国', 'CN', 'China')
    if record.geo_country not in china_names:
        return False
    return record.geo_city != ''
def _is_overseas(record) -> bool:
    """Return True when the login resolved to a known country other than China.

    An unknown country is never treated as overseas. Besides the empty
    string this covers ``None`` and the ``'0'`` value, which the R4/R5
    queries in this module already exclude from their country lists;
    classifying it as overseas here would let an unresolved login trigger
    the overseas-diversity rule.
    """
    country = record.geo_country
    # Empty / None / '0' all mean "no usable country".
    if not country or country == '0':
        return False
    return country not in ('中国', 'CN', 'China')
def _is_skip(record) -> bool:
    """Return True when the record's ``geo_source`` marks it as unusable.

    That is the case for ``'skip'``, ``'failed'`` and the empty string
    (per the original note: private-network IPs or a failed geo lookup).
    """
    unusable_sources = ('skip', 'failed', '')
    source = record.geo_source
    return source in unusable_sources
def _get_config(team, global_config):
    """Resolve the effective rule settings for ``team``.

    For every setting the team-level override (``team.anomaly_config``) wins
    when it exists and its value is not ``None``; otherwise the value is read
    from ``global_config``.

    Returns:
        dict keyed by ``r1_enabled``, ``r2_enabled``, ``r2_window``,
        ``r3_enabled``, ``r3_window``, ``r3_max_count``, ``r4_enabled``,
        ``r4_window``, ``r4_city_count``, ``r5_enabled``, ``r5_days`` and
        ``r5_country_count``.
    """
    from apps.accounts.models import TeamAnomalyConfig

    try:
        override_cfg = team.anomaly_config
    except TeamAnomalyConfig.DoesNotExist:
        # No team-level row: every value falls back to the global config.
        override_cfg = None

    # (result key, attribute on the team override, attribute on the global config)
    field_map = (
        ('r1_enabled', 'r1_enabled', 'r1_enabled_default'),
        ('r2_enabled', 'r2_enabled', 'r2_enabled_default'),
        ('r2_window', 'r2_window_seconds', 'r2_window_seconds'),
        ('r3_enabled', 'r3_enabled', 'r3_enabled_default'),
        ('r3_window', 'r3_window_seconds', 'r3_window_seconds'),
        ('r3_max_count', 'r3_max_count', 'r3_max_count'),
        ('r4_enabled', 'r4_enabled', 'r4_enabled_default'),
        ('r4_window', 'r4_window_seconds', 'r4_window_seconds'),
        ('r4_city_count', 'r4_city_count', 'r4_city_count'),
        ('r5_enabled', 'r5_enabled', 'r5_enabled_default'),
        ('r5_days', 'r5_days', 'r5_days'),
        ('r5_country_count', 'r5_country_count', 'r5_country_count'),
    )

    resolved = {}
    for key, team_field, global_field in field_map:
        value = None
        if override_cfg:
            value = getattr(override_cfg, team_field, None)
        if value is None:
            value = getattr(global_config, global_field)
        resolved[key] = value
    return resolved
def _parse_expected_cities(expected):
    """Split a team's ``expected_regions`` value into a list of city names.

    Entries are comma-separated. The full-width comma (``，``) is accepted as
    a separator too, so a list typed with a Chinese input method is not read
    as one single city name. Surrounding whitespace and blank entries are
    dropped; a falsy input yields an empty list.
    """
    if not expected:
        return []
    parts = expected.replace('，', ',').split(',')
    return [part.strip() for part in parts if part.strip()]


def check_login_anomaly(login_record):
    """Run rules R1-R5 against one login and return the anomalies found.

    Args:
        login_record: the login to evaluate; its ``user``, ``team``,
            ``ip_address`` and ``geo_*`` attributes are read.

    Returns:
        A list of ``(level, rule, detail)`` tuples, where ``level`` is
        ``'warning'`` or ``'critical'``, ``rule`` is one of
        ``region_mismatch`` (R1), ``impossible_travel`` (R2),
        ``login_frequency`` (R3), ``multi_city`` (R4) or
        ``overseas_ip_diversity`` (R5), and ``detail`` is a dict describing
        the hit. The list is empty when the login has no team, the user or
        team is already disabled, the record is skippable (see ``_is_skip``),
        the global config row (pk=1) is missing, or detection is switched
        off globally.
    """
    user = login_record.user
    team = login_record.team
    if not team:
        return []
    # A user or team that is already disabled is not checked again.
    if not user.is_active or not team.is_active:
        return []
    # Skippable records (per the original note: private-network IPs) bypass
    # every rule, so bail out before touching the database.
    if _is_skip(login_record):
        return []

    # Imported lazily, as in the rest of this module.
    from apps.accounts.models import LoginRecord
    from apps.generation.models import QuotaConfig

    try:
        global_config = QuotaConfig.objects.get(pk=1)
    except QuotaConfig.DoesNotExist:
        return []
    if not global_config.anomaly_detection_enabled:
        return []

    cfg = _get_config(team, global_config)
    is_domestic = _is_domestic(login_record)
    is_overseas = _is_overseas(login_record)

    now = timezone.now()  # one reference instant for every rule window
    china_names = ['中国', 'CN', 'China']
    located_sources = ['online', 'offline']
    current_city = login_record.geo_city
    expected_cities = _parse_expected_cities(team.expected_regions)
    anomalies = []

    # ── R1: login city is not one of the team's expected cities ──
    if cfg['r1_enabled'] and is_domestic:
        if expected_cities and current_city not in expected_cities:
            anomalies.append((
                'warning', 'region_mismatch',
                {
                    'ip': login_record.ip_address,
                    'city': current_city,
                    'province': login_record.geo_province,
                    'expected': expected_cities,
                },
            ))

    # ── R2: impossible travel ──
    if cfg['r2_enabled'] and is_domestic:
        # Cities are compared only when BOTH logins are domestic: the current
        # one is (is_domestic), and the country filter enforces it for the
        # earlier ones, so an overseas login in the window cannot cause a ban.
        # order_by() clears any default ordering, whose columns would
        # otherwise take part in SELECT DISTINCT (LoginRecord.Meta is not
        # visible here, so this is a precaution).
        recent_cities = (
            LoginRecord.objects.filter(
                user=user,
                created_at__gte=now - timedelta(seconds=cfg['r2_window']),
                geo_source__in=located_sources,
                geo_country__in=china_names,
            )
            .exclude(pk=login_record.pk)
            .exclude(geo_city='')
            .order_by()
            .values_list('geo_city', flat=True)
            .distinct()
        )
        previous_city = None
        if current_city:
            previous_city = next(
                (city for city in recent_cities if city and city != current_city),
                None,
            )
        if previous_city is not None:
            # Reported at most once per login.
            anomalies.append((
                'critical', 'impossible_travel',
                {
                    'ip': login_record.ip_address,
                    'current_city': current_city,
                    'previous_city': previous_city,
                    'window_seconds': cfg['r2_window'],
                },
            ))

    # ── R3: too many logins by this user inside the window ──
    if cfg['r3_enabled']:
        count = LoginRecord.objects.filter(
            user=user,
            created_at__gte=now - timedelta(seconds=cfg['r3_window']),
        ).count()
        if count > cfg['r3_max_count']:
            anomalies.append((
                'warning', 'login_frequency',
                {
                    'ip': login_record.ip_address,
                    'count': count,
                    'window_seconds': cfg['r3_window'],
                    'threshold': cfg['r3_max_count'],
                },
            ))

    # ── R4: the team logs in from many unexpected domestic cities ──
    if cfg['r4_enabled'] and is_domestic:
        team_cities = (
            LoginRecord.objects.filter(
                team=team,
                created_at__gte=now - timedelta(seconds=cfg['r4_window']),
                geo_source__in=located_sources,
                geo_country__in=china_names,
            )
            .exclude(geo_city='')
            .order_by()  # keep DISTINCT on geo_city only
            .values_list('geo_city', flat=True)
            .distinct()
        )
        unexpected_cities = [
            city for city in team_cities if city not in expected_cities
        ]
        if len(unexpected_cities) >= cfg['r4_city_count']:
            anomalies.append((
                'critical', 'multi_city',
                {
                    'unexpected_cities': unexpected_cities,
                    'expected_cities': expected_cities,
                    'count': len(unexpected_cities),
                    'threshold': cfg['r4_city_count'],
                    'window_seconds': cfg['r4_window'],
                },
            ))

    # ── R5: the team's overseas logins span too many countries ──
    if cfg['r5_enabled'] and is_overseas:
        country_list = list(
            LoginRecord.objects.filter(
                team=team,
                created_at__gte=now - timedelta(days=cfg['r5_days']),
                geo_source__in=located_sources,
            )
            .exclude(geo_country__in=china_names + ['', '0'])
            .order_by()  # keep DISTINCT on geo_country only
            .values_list('geo_country', flat=True)
            .distinct()
        )
        if len(country_list) >= cfg['r5_country_count']:
            anomalies.append((
                'warning', 'overseas_ip_diversity',
                {
                    'countries': country_list,
                    'count': len(country_list),
                    'threshold': cfg['r5_country_count'],
                    'days': cfg['r5_days'],
                },
            ))

    return anomalies
def _disable_user(user):
    """Ban ``user`` on behalf of the detector.

    Sets ``is_active`` to False and ``disabled_by`` to ``'system'``, saves
    only those two fields, then deletes every ``ActiveSession`` row of the
    user so existing sessions stop working.
    """
    from apps.accounts.models import ActiveSession

    changed_fields = ['is_active', 'disabled_by']
    user.is_active, user.disabled_by = False, 'system'
    user.save(update_fields=changed_fields)

    user_sessions = ActiveSession.objects.filter(user=user)
    user_sessions.delete()

    logger.info('User %s disabled by anomaly detection', user.username)
def _disable_team(team):
    """Ban ``team`` on behalf of the detector.

    Sets ``is_active`` to False and ``disabled_by`` to ``'system'``, saves
    only those two fields, then deletes the ``ActiveSession`` rows of every
    user whose ``team`` is this team.
    """
    from apps.accounts.models import ActiveSession

    changed_fields = ['is_active', 'disabled_by']
    team.is_active, team.disabled_by = False, 'system'
    team.save(update_fields=changed_fields)

    member_sessions = ActiveSession.objects.filter(user__team=team)
    member_sessions.delete()

    logger.info('Team %s disabled by anomaly detection', team.name)
# ─────────────────────────────────────────────────────────────
# In-app notifications: after an anomaly ban, notify the banned team's
# lead and deputy managers (team owner / team admins).
# ─────────────────────────────────────────────────────────────
# Chinese display labels for the rule keys. Used in the notification body so
# that non-technical users see a readable rule name instead of the raw
# English rule key.
_RULE_LABELS = {
    'region_mismatch': '地区不匹配',
    'impossible_travel': '不可能旅行',
    'login_frequency': '登录频次异常',
    'multi_city': '多城市登录',
    'overseas_ip_diversity': '海外IP多样性',
}
def _team_admin_recipients(team):
    """List the users of ``team`` flagged as team admin or team owner.

    Returns an empty list when ``team`` is None (nobody to notify);
    otherwise a list of user instances with ``is_team_admin=True`` or
    ``is_team_owner=True``.
    """
    if team is None:
        return []

    from django.contrib.auth import get_user_model
    from django.db.models import Q

    is_manager = Q(is_team_admin=True) | Q(is_team_owner=True)
    managers = get_user_model().objects.filter(team=team).filter(is_manager)
    return list(managers)
def _notify_user_disabled(disabled_user, rule, created_at):
    """Create an in-app notification for each manager of a banned user's team.

    Args:
        disabled_user: the user that was just banned.
        rule: rule key that triggered the ban; shown through ``_RULE_LABELS``
            when a label exists, otherwise as-is.
        created_at: datetime rendered into the message body.

    Best effort: any exception is logged as a warning and swallowed so the
    ban flow is never interrupted. Nothing is written when the user has no
    team or the team has no admin/owner.
    """
    try:
        from apps.notifications.models import Notification

        team = disabled_user.team
        # No team means nobody to notify.
        recipients = _team_admin_recipients(team) if team is not None else []
        if not recipients:
            return

        rule_label = _RULE_LABELS.get(rule, rule)
        # Formatted directly with strftime; the original note says the
        # project runs with USE_TZ=False (not verifiable from this file).
        time_str = created_at.strftime('%Y-%m-%d %H:%M')

        shared_fields = {
            'type': 'anomaly_disabled_user',
            'title': f'您团队成员 {disabled_user.username} 因登录异常被自动封禁',
            'content': (
                f'{disabled_user.username} ({disabled_user.email}) 在 {time_str} '
                f'触发{rule_label}规则,系统已自动封禁该账号。请前往安全日志查看详情。'
            ),
            'link_url': '/admin/security',
            'is_read': False,
        }
        # One INSERT for all recipients.
        Notification.objects.bulk_create(
            [Notification(recipient=admin, **shared_fields) for admin in recipients]
        )
    except Exception as exc:
        logger.warning('Failed to create user-disabled notifications: %s', exc)
def _notify_team_disabled(team, rule, created_at):
    """Create an in-app notification for each manager of a banned team.

    Args:
        team: the team that was just banned; ``None`` is a no-op.
        rule: rule key that triggered the ban; shown through ``_RULE_LABELS``
            when a label exists, otherwise as-is.
        created_at: datetime rendered into the message body.

    Best effort: any exception is logged as a warning and swallowed so the
    ban flow is never interrupted. Nothing is written when the team has no
    admin/owner.
    """
    try:
        from apps.notifications.models import Notification

        if team is None:
            return
        recipients = _team_admin_recipients(team)
        if not recipients:
            return

        rule_label = _RULE_LABELS.get(rule, rule)
        time_str = created_at.strftime('%Y-%m-%d %H:%M')
        title = f'您所在团队 {team.name} 因登录异常被自动封禁'
        # The team name and the timestamp are separated by " 在 ", matching
        # the user-level message; without it they ran together as
        # "<name><time>" in the rendered text.
        content = (
            f'团队 {team.name} 在 {time_str} 触发{rule_label}规则,'
            f'系统已自动封禁整个团队。请前往安全日志查看详情。'
        )
        link_url = '/admin/security'

        notifications = [
            Notification(
                recipient=recipient,
                type='anomaly_disabled_team',
                title=title,
                content=content,
                link_url=link_url,
                is_read=False,
            )
            for recipient in recipients
        ]
        # One INSERT for all recipients.
        Notification.objects.bulk_create(notifications)
    except Exception as e:
        logger.warning('Failed to create team-disabled notifications: %s', e)
def _is_in_cooldown(team, rule, cooldown_seconds):
    """Tell whether ``team`` was already alerted for ``rule`` recently.

    Returns True when a ``LoginAnomaly`` with ``alerted=True`` exists for the
    same team and rule, created within the last ``cooldown_seconds`` seconds.
    """
    from apps.accounts.models import LoginAnomaly

    window_start = timezone.now() - timedelta(seconds=cooldown_seconds)
    recent_alerts = LoginAnomaly.objects.filter(
        team=team,
        rule=rule,
        alerted=True,
        created_at__gte=window_start,
    )
    return recent_alerts.exists()
def process_anomalies(login_record, anomalies):
    """Persist detected anomalies, apply bans and dispatch alerts.

    Args:
        login_record: the login the anomalies belong to.
        anomalies: iterable of ``(level, rule, detail)`` tuples.

    For each anomaly, in order: ``impossible_travel`` bans the user and
    ``multi_city`` bans the team (each followed by the matching in-app
    notification); the alert cooldown is checked; a ``LoginAnomaly`` row is
    created; and, when not in cooldown, the external alert is sent from a
    daemon thread. Does nothing when ``anomalies`` is empty or the global
    config row (pk=1) does not exist.
    """
    from apps.accounts.models import LoginAnomaly
    from apps.generation.models import QuotaConfig

    if not anomalies:
        return

    try:
        global_config = QuotaConfig.objects.get(pk=1)
    except QuotaConfig.DoesNotExist:
        return
    cooldown = global_config.alert_cooldown_seconds

    user = login_record.user
    team = login_record.team
    occurred_at = login_record.created_at

    for level, rule, detail in anomalies:
        # Only two rules carry an automatic ban.
        if rule == 'impossible_travel':
            _disable_user(user)
            _notify_user_disabled(user, rule, occurred_at)
            disabled_target = 'user'
        elif rule == 'multi_city':
            _disable_team(team)
            _notify_team_disabled(team, rule, occurred_at)
            disabled_target = 'team'
        else:
            disabled_target = ''
        auto_disabled = disabled_target != ''

        # Evaluated before the new row exists, so it cannot match itself.
        should_alert = not _is_in_cooldown(team, rule, cooldown)

        anomaly = LoginAnomaly.objects.create(
            team=team,
            user=user,
            login_record=login_record,
            level=level,
            rule=rule,
            detail=detail,
            alerted=should_alert,
            auto_disabled=auto_disabled,
            disabled_target=disabled_target,
        )

        if not should_alert:
            continue
        # Alerts go out on a background thread so the login is not blocked.
        threading.Thread(
            target=_send_alert_safe,
            args=(anomaly.pk,),
            daemon=True,
        ).start()
def _send_alert_safe(anomaly_pk):
    """Send the Feishu and SMS alerts for one anomaly without ever raising.

    Intended to run on a background thread. The anomaly is reloaded by
    primary key together with its team, user and login record. Each channel
    is attempted independently, so a failure of one does not prevent the
    other from being tried. Every failure is logged with its traceback.

    Args:
        anomaly_pk: primary key of the ``LoginAnomaly`` to alert about.
    """
    from django.db import connection

    try:
        try:
            from apps.accounts.models import LoginAnomaly
            from utils.alert_service import send_feishu_alert, send_sms_alert

            anomaly = LoginAnomaly.objects.select_related(
                'team', 'user', 'login_record',
            ).get(pk=anomaly_pk)
        except Exception as e:
            logger.error(
                'Failed to send alert for anomaly %s: %s', anomaly_pk, e,
                exc_info=True,
            )
            return

        for channel, send in (('feishu', send_feishu_alert), ('sms', send_sms_alert)):
            try:
                send(anomaly)
            except Exception as e:
                logger.error(
                    'Failed to send %s alert for anomaly %s: %s',
                    channel, anomaly_pk, e,
                    exc_info=True,
                )
    finally:
        # This thread runs outside the request cycle, so nothing else closes
        # the database connection it opened for the query above.
        connection.close()