"""登录异常检测引擎 — R1-R5 规则检测 + 封禁 + 告警冷却。""" import logging import threading from datetime import timedelta from django.utils import timezone logger = logging.getLogger(__name__) def _is_domestic(record) -> bool: """国内 IP:有国家且为中国。""" return record.geo_country in ('中国', 'CN', 'China') and record.geo_city != '' def _is_overseas(record) -> bool: """海外 IP:有国家且不是中国。""" return record.geo_country != '' and record.geo_country not in ('中国', 'CN', 'China', '') def _is_skip(record) -> bool: """内网 IP 或归属地解析失败。""" return record.geo_source in ('skip', 'failed', '') def _get_config(team, global_config): """获取团队级或全局默认的阈值配置。""" from apps.accounts.models import TeamAnomalyConfig team_cfg = None try: team_cfg = team.anomaly_config except TeamAnomalyConfig.DoesNotExist: pass def _val(team_field, global_field): if team_cfg: v = getattr(team_cfg, team_field, None) if v is not None: return v return getattr(global_config, global_field) return { 'r1_enabled': _val('r1_enabled', 'r1_enabled_default'), 'r2_enabled': _val('r2_enabled', 'r2_enabled_default'), 'r2_window': _val('r2_window_seconds', 'r2_window_seconds'), 'r3_enabled': _val('r3_enabled', 'r3_enabled_default'), 'r3_window': _val('r3_window_seconds', 'r3_window_seconds'), 'r3_max_count': _val('r3_max_count', 'r3_max_count'), 'r4_enabled': _val('r4_enabled', 'r4_enabled_default'), 'r4_window': _val('r4_window_seconds', 'r4_window_seconds'), 'r4_city_count': _val('r4_city_count', 'r4_city_count'), 'r5_enabled': _val('r5_enabled', 'r5_enabled_default'), 'r5_days': _val('r5_days', 'r5_days'), 'r5_country_count': _val('r5_country_count', 'r5_country_count'), } def check_login_anomaly(login_record): """检测登录异常,返回 [(level, rule, detail), ...]。""" from apps.accounts.models import LoginRecord from apps.generation.models import QuotaConfig user = login_record.user team = login_record.team if not team: return [] # 用户或团队已被封禁 → 跳过检测 if not user.is_active or not team.is_active: return [] try: global_config = QuotaConfig.objects.get(pk=1) except QuotaConfig.DoesNotExist: return [] if not global_config.anomaly_detection_enabled: return [] cfg = _get_config(team, global_config) anomalies = [] is_domestic = _is_domestic(login_record) is_overseas = _is_overseas(login_record) is_skip_ip = _is_skip(login_record) if is_skip_ip: # 内网 IP 跳过所有规则 return [] # ── R1:登录地区不对 ── if cfg['r1_enabled'] and is_domestic: expected = team.expected_regions if expected: expected_cities = [c.strip() for c in expected.split(',') if c.strip()] if expected_cities and login_record.geo_city not in expected_cities: anomalies.append(( 'warning', 'region_mismatch', { 'ip': login_record.ip_address, 'city': login_record.geo_city, 'province': login_record.geo_province, 'expected': expected_cities, } )) # ── R2:不可能的旅行 ── if cfg['r2_enabled'] and is_domestic: window = timezone.now() - timedelta(seconds=cfg['r2_window']) recent = LoginRecord.objects.filter( user=user, created_at__gte=window, geo_source__in=['online', 'offline'], ).exclude(pk=login_record.pk).exclude( geo_city='' ).values_list('geo_city', flat=True).distinct() for prev_city in recent: if prev_city and login_record.geo_city and prev_city != login_record.geo_city: # 只在双方都是国内 IP 时比较 anomalies.append(( 'critical', 'impossible_travel', { 'ip': login_record.ip_address, 'current_city': login_record.geo_city, 'previous_city': prev_city, 'window_seconds': cfg['r2_window'], } )) break # 只报一次 # ── R3:登录太频繁 ── if cfg['r3_enabled']: window = timezone.now() - timedelta(seconds=cfg['r3_window']) count = LoginRecord.objects.filter( user=user, created_at__gte=window, ).count() if count > cfg['r3_max_count']: anomalies.append(( 'warning', 'login_frequency', { 'ip': login_record.ip_address, 'count': count, 'window_seconds': cfg['r3_window'], 'threshold': cfg['r3_max_count'], } )) # ── R4:团队遍地开花 ── if cfg['r4_enabled'] and is_domestic: expected = team.expected_regions expected_cities = [c.strip() for c in expected.split(',') if c.strip()] if expected else [] window = timezone.now() - timedelta(seconds=cfg['r4_window']) team_cities = LoginRecord.objects.filter( team=team, created_at__gte=window, geo_source__in=['online', 'offline'], ).exclude( geo_city='' ).exclude( geo_country__in=['', '0'] ).filter( geo_country__in=['中国', 'CN', 'China'] ).values_list('geo_city', flat=True).distinct() unexpected_cities = [c for c in team_cities if c not in expected_cities] if len(unexpected_cities) >= cfg['r4_city_count']: anomalies.append(( 'critical', 'multi_city', { 'unexpected_cities': unexpected_cities, 'expected_cities': expected_cities, 'count': len(unexpected_cities), 'threshold': cfg['r4_city_count'], 'window_seconds': cfg['r4_window'], } )) # ── R5:海外IP太杂 ── if cfg['r5_enabled'] and is_overseas: since = timezone.now() - timedelta(days=cfg['r5_days']) overseas_countries = LoginRecord.objects.filter( team=team, created_at__gte=since, geo_source__in=['online', 'offline'], ).exclude( geo_country__in=['中国', 'CN', 'China', '', '0'] ).values_list('geo_country', flat=True).distinct() country_list = list(overseas_countries) if len(country_list) >= cfg['r5_country_count']: anomalies.append(( 'warning', 'overseas_ip_diversity', { 'countries': country_list, 'count': len(country_list), 'threshold': cfg['r5_country_count'], 'days': cfg['r5_days'], } )) return anomalies def _disable_user(user): """封禁用户 — 设 is_active=False + 清除所有会话。""" from apps.accounts.models import ActiveSession user.is_active = False user.disabled_by = 'system' user.save(update_fields=['is_active', 'disabled_by']) ActiveSession.objects.filter(user=user).delete() logger.info('User %s disabled by anomaly detection', user.username) def _disable_team(team): """封禁团队 — 团队 is_active=False + 全员踢下线。""" from apps.accounts.models import ActiveSession team.is_active = False team.disabled_by = 'system' team.save(update_fields=['is_active', 'disabled_by']) ActiveSession.objects.filter(user__team=team).delete() logger.info('Team %s disabled by anomaly detection', team.name) def _is_in_cooldown(team, rule, cooldown_seconds): """检查告警冷却:同团队+同规则在冷却窗口内是否已告警。""" from apps.accounts.models import LoginAnomaly since = timezone.now() - timedelta(seconds=cooldown_seconds) return LoginAnomaly.objects.filter( team=team, rule=rule, alerted=True, created_at__gte=since, ).exists() def process_anomalies(login_record, anomalies): """保存异常记录 + 发告警 + 封禁。""" from apps.accounts.models import LoginAnomaly from apps.generation.models import QuotaConfig if not anomalies: return try: global_config = QuotaConfig.objects.get(pk=1) except QuotaConfig.DoesNotExist: return cooldown = global_config.alert_cooldown_seconds team = login_record.team user = login_record.user for level, rule, detail in anomalies: # 确定是否需要封禁 auto_disabled = False disabled_target = '' if rule == 'impossible_travel': _disable_user(user) auto_disabled = True disabled_target = 'user' elif rule == 'multi_city': _disable_team(team) auto_disabled = True disabled_target = 'team' # 检查告警冷却 should_alert = not _is_in_cooldown(team, rule, cooldown) # 保存异常记录 anomaly = LoginAnomaly.objects.create( team=team, user=user, login_record=login_record, level=level, rule=rule, detail=detail, alerted=should_alert, auto_disabled=auto_disabled, disabled_target=disabled_target, ) # 异步发送告警(不阻塞登录) if should_alert: thread = threading.Thread( target=_send_alert_safe, args=(anomaly.pk,), daemon=True, ) thread.start() def _send_alert_safe(anomaly_pk): """安全地发送告警,捕获所有异常。""" try: from apps.accounts.models import LoginAnomaly anomaly = LoginAnomaly.objects.select_related('team', 'user', 'login_record').get(pk=anomaly_pk) from utils.alert_service import send_feishu_alert, send_sms_alert send_feishu_alert(anomaly) send_sms_alert(anomaly) except Exception as e: logger.error('Failed to send alert for anomaly %s: %s', anomaly_pk, e)