video-shuoshan/backend/utils/anomaly_detector.py
zyc e04712cc79
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m10s
feat: 接入阿里云短信告警通知(dysmsapi)
异常检测触发时,在飞书告警基础上同时发送短信通知。
签名:广州气元科技,模板:SMS_503445109。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 17:23:36 +08:00

313 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""登录异常检测引擎 — R1-R5 规则检测 + 封禁 + 告警冷却。"""
import logging
import threading
from datetime import timedelta
from django.utils import timezone
logger = logging.getLogger(__name__)
def _is_domestic(record) -> bool:
"""国内 IP有国家且为中国。"""
return record.geo_country in ('中国', 'CN', 'China') and record.geo_city != ''
def _is_overseas(record) -> bool:
"""海外 IP有国家且不是中国。"""
return record.geo_country != '' and record.geo_country not in ('中国', 'CN', 'China', '')
def _is_skip(record) -> bool:
"""内网 IP 或归属地解析失败。"""
return record.geo_source in ('skip', 'failed', '')
def _get_config(team, global_config):
    """Return the effective anomaly-rule settings for *team*.

    Each setting comes from the team's ``anomaly_config``
    (a ``TeamAnomalyConfig``) when that object exists and the field is not
    ``None``. Otherwise it falls back to the matching attribute of
    *global_config*.

    Returns:
        dict keyed by the short names read in ``check_login_anomaly``
        (``'r1_enabled'``, ``'r2_window'``, ..., ``'r5_country_count'``).
    """
    from apps.accounts.models import TeamAnomalyConfig

    # result key -> (team override attribute, global default attribute)
    sources = {
        'r1_enabled': ('r1_enabled', 'r1_enabled_default'),
        'r2_enabled': ('r2_enabled', 'r2_enabled_default'),
        'r2_window': ('r2_window_seconds', 'r2_window_seconds'),
        'r3_enabled': ('r3_enabled', 'r3_enabled_default'),
        'r3_window': ('r3_window_seconds', 'r3_window_seconds'),
        'r3_max_count': ('r3_max_count', 'r3_max_count'),
        'r4_enabled': ('r4_enabled', 'r4_enabled_default'),
        'r4_window': ('r4_window_seconds', 'r4_window_seconds'),
        'r4_city_count': ('r4_city_count', 'r4_city_count'),
        'r5_enabled': ('r5_enabled', 'r5_enabled_default'),
        'r5_days': ('r5_days', 'r5_days'),
        'r5_country_count': ('r5_country_count', 'r5_country_count'),
    }

    # A team without its own config row raises DoesNotExist on access.
    try:
        override = team.anomaly_config
    except TeamAnomalyConfig.DoesNotExist:
        override = None

    def pick(team_attr, global_attr):
        team_value = getattr(override, team_attr, None) if override else None
        if team_value is None:
            return getattr(global_config, global_attr)
        return team_value

    return {key: pick(*attrs) for key, attrs in sources.items()}
def check_login_anomaly(login_record):
    """Evaluate rules R1-R5 for one login and return the anomalies found.

    Returns:
        list of ``(level, rule, detail)`` tuples.
        - ``level`` is ``'warning'`` or ``'critical'``.
        - ``rule`` is one of ``'region_mismatch'`` (R1), ``'impossible_travel'``
          (R2), ``'login_frequency'`` (R3), ``'multi_city'`` (R4) or
          ``'overseas_ip_diversity'`` (R5).
        - ``detail`` is a dict describing the hit.

    An empty list is returned without evaluating any rule when any of these
    hold:
        - the login has no team;
        - the user or team is already inactive;
        - the global ``QuotaConfig`` (pk=1) is missing or has
          ``anomaly_detection_enabled`` off;
        - the IP has no usable geolocation (``_is_skip``).
    """
    user = login_record.user
    team = login_record.team
    if not team:
        return []
    # Already-disabled user or team: nothing left to detect.
    if not user.is_active or not team.is_active:
        return []

    from apps.accounts.models import LoginRecord
    from apps.generation.models import QuotaConfig

    try:
        global_config = QuotaConfig.objects.get(pk=1)
    except QuotaConfig.DoesNotExist:
        return []
    if not global_config.anomaly_detection_enabled:
        return []

    # Private / unresolvable IPs skip every rule.
    if _is_skip(login_record):
        return []

    cfg = _get_config(team, global_config)
    is_domestic = _is_domestic(login_record)
    is_overseas = _is_overseas(login_record)
    city = login_record.geo_city
    now = timezone.now()
    china_names = ['中国', 'CN', 'China']

    # The team's allowed cities, stored as a comma-separated string.
    expected_regions = team.expected_regions
    expected_cities = (
        [c.strip() for c in expected_regions.split(',') if c.strip()]
        if expected_regions else []
    )

    anomalies = []

    # ── R1: domestic login outside the team's expected cities ──
    if cfg['r1_enabled'] and is_domestic and expected_cities and city not in expected_cities:
        anomalies.append((
            'warning', 'region_mismatch',
            {
                'ip': login_record.ip_address,
                'city': city,
                'province': login_record.geo_province,
                'expected': expected_cities,
            }
        ))

    # ── R2: impossible travel — a different domestic city for the same user
    # within the window. The history is restricted to Chinese records so that
    # both sides of the comparison are domestic; an earlier overseas login
    # with a resolved city must not trigger this user-disabling rule.
    if cfg['r2_enabled'] and is_domestic and city:
        since = now - timedelta(seconds=cfg['r2_window'])
        previous_cities = (
            LoginRecord.objects.filter(
                user=user,
                created_at__gte=since,
                geo_source__in=['online', 'offline'],
                geo_country__in=china_names,
            )
            .exclude(pk=login_record.pk)
            .exclude(geo_city='')
            # Clear any default ordering so it cannot leak into DISTINCT.
            .order_by()
            .values_list('geo_city', flat=True)
            .distinct()
        )
        for prev_city in previous_cities:
            if prev_city and prev_city != city:
                anomalies.append((
                    'critical', 'impossible_travel',
                    {
                        'ip': login_record.ip_address,
                        'current_city': city,
                        'previous_city': prev_city,
                        'window_seconds': cfg['r2_window'],
                    }
                ))
                break  # report once

    # ── R3: too many logins by this user within the window ──
    if cfg['r3_enabled']:
        since = now - timedelta(seconds=cfg['r3_window'])
        count = LoginRecord.objects.filter(
            user=user,
            created_at__gte=since,
        ).count()
        if count > cfg['r3_max_count']:
            anomalies.append((
                'warning', 'login_frequency',
                {
                    'ip': login_record.ip_address,
                    'count': count,
                    'window_seconds': cfg['r3_window'],
                    'threshold': cfg['r3_max_count'],
                }
            ))

    # ── R4: team logging in from too many unexpected domestic cities ──
    if cfg['r4_enabled'] and is_domestic:
        since = now - timedelta(seconds=cfg['r4_window'])
        team_cities = (
            LoginRecord.objects.filter(
                team=team,
                created_at__gte=since,
                geo_source__in=['online', 'offline'],
                geo_country__in=china_names,
            )
            .exclude(geo_city='')
            # Without clearing ordering, a Meta.ordering on LoginRecord (if
            # any) would be added to SELECT DISTINCT and duplicate cities,
            # inflating the count that decides whether the team is banned.
            .order_by()
            .values_list('geo_city', flat=True)
            .distinct()
        )
        unexpected_cities = [c for c in team_cities if c not in expected_cities]
        if len(unexpected_cities) >= cfg['r4_city_count']:
            anomalies.append((
                'critical', 'multi_city',
                {
                    'unexpected_cities': unexpected_cities,
                    'expected_cities': expected_cities,
                    'count': len(unexpected_cities),
                    'threshold': cfg['r4_city_count'],
                    'window_seconds': cfg['r4_window'],
                }
            ))

    # ── R5: team seen from too many distinct overseas countries ──
    if cfg['r5_enabled'] and is_overseas:
        since = now - timedelta(days=cfg['r5_days'])
        country_list = list(
            LoginRecord.objects.filter(
                team=team,
                created_at__gte=since,
                geo_source__in=['online', 'offline'],
            )
            .exclude(geo_country__in=['中国', 'CN', 'China', '', '0'])
            # Same DISTINCT/ordering concern as R4.
            .order_by()
            .values_list('geo_country', flat=True)
            .distinct()
        )
        if len(country_list) >= cfg['r5_country_count']:
            anomalies.append((
                'warning', 'overseas_ip_diversity',
                {
                    'countries': country_list,
                    'count': len(country_list),
                    'threshold': cfg['r5_country_count'],
                    'days': cfg['r5_days'],
                }
            ))

    return anomalies
def _disable_user(user):
    """Disable *user* on behalf of the anomaly detector.

    Sets ``is_active=False`` and ``disabled_by='system'``, saving only those
    two fields. Then deletes every ``ActiveSession`` row belonging to the
    user, clearing all of their sessions.
    """
    from apps.accounts.models import ActiveSession

    user.is_active, user.disabled_by = False, 'system'
    user.save(update_fields=['is_active', 'disabled_by'])
    user_sessions = ActiveSession.objects.filter(user=user)
    user_sessions.delete()
    logger.info('User %s disabled by anomaly detection', user.username)
def _disable_team(team):
    """Disable *team* on behalf of the anomaly detector and log out all members.

    Sets ``is_active=False`` and ``disabled_by='system'``, saving only those
    two fields. Then deletes every ``ActiveSession`` whose user belongs to
    the team.
    """
    from apps.accounts.models import ActiveSession

    team.is_active, team.disabled_by = False, 'system'
    team.save(update_fields=['is_active', 'disabled_by'])
    member_sessions = ActiveSession.objects.filter(user__team=team)
    member_sessions.delete()
    logger.info('Team %s disabled by anomaly detection', team.name)
def _is_in_cooldown(team, rule, cooldown_seconds):
    """Return True if this team already got an alert for *rule* recently.

    "Recently" means a ``LoginAnomaly`` with the same team and rule, marked
    ``alerted=True``, created within the last *cooldown_seconds*.
    """
    from apps.accounts.models import LoginAnomaly

    window_start = timezone.now() - timedelta(seconds=cooldown_seconds)
    alerted_recently = LoginAnomaly.objects.filter(
        team=team,
        rule=rule,
        alerted=True,
        created_at__gte=window_start,
    )
    return alerted_recently.exists()
def process_anomalies(login_record, anomalies):
    """Persist detected anomalies, apply automatic bans and dispatch alerts.

    For each ``(level, rule, detail)`` tuple (as produced by
    ``check_login_anomaly``):
        - ``impossible_travel`` disables the user; ``multi_city`` disables
          the whole team.
        - A ``LoginAnomaly`` row is always created.
        - An alert is sent only if the same team + rule has not been alerted
          within ``QuotaConfig.alert_cooldown_seconds``.

    Alerts are sent from a daemon thread so the login request is not blocked.
    The thread queries the new ``LoginAnomaly`` on its own DB connection, so
    it is started via ``transaction.on_commit``. Inside an atomic block this
    waits until the row is committed and visible. Outside one, Django runs
    the callback immediately.

    Does nothing if *anomalies* is empty or the global ``QuotaConfig``
    (pk=1) does not exist.
    """
    if not anomalies:
        return

    from django.db import transaction
    from apps.accounts.models import LoginAnomaly
    from apps.generation.models import QuotaConfig

    try:
        global_config = QuotaConfig.objects.get(pk=1)
    except QuotaConfig.DoesNotExist:
        return

    cooldown = global_config.alert_cooldown_seconds
    team = login_record.team
    user = login_record.user

    for level, rule, detail in anomalies:
        # Automatic bans for the critical rules.
        if rule == 'impossible_travel':
            _disable_user(user)
            disabled_target = 'user'
        elif rule == 'multi_city':
            _disable_team(team)
            disabled_target = 'team'
        else:
            disabled_target = ''

        should_alert = not _is_in_cooldown(team, rule, cooldown)

        anomaly = LoginAnomaly.objects.create(
            team=team,
            user=user,
            login_record=login_record,
            level=level,
            rule=rule,
            detail=detail,
            alerted=should_alert,
            auto_disabled=bool(disabled_target),
            disabled_target=disabled_target,
        )

        if should_alert:
            # Bind pk as a default argument: a plain closure would see only
            # the last loop iteration's anomaly when the callbacks run.
            transaction.on_commit(
                lambda pk=anomaly.pk: threading.Thread(
                    target=_send_alert_safe,
                    args=(pk,),
                    daemon=True,
                ).start()
            )
def _send_alert_safe(anomaly_pk):
    """Deliver the Feishu and SMS alerts for one ``LoginAnomaly``; never raises.

    Intended as a background-thread target. Every failure is logged with
    its traceback instead of propagating.

    Each channel is attempted independently, so a failure in one (e.g. the
    Feishu webhook) does not prevent the other (the SMS) from being sent.
    """
    try:
        from apps.accounts.models import LoginAnomaly
        from utils.alert_service import send_feishu_alert, send_sms_alert

        anomaly = LoginAnomaly.objects.select_related(
            'team', 'user', 'login_record',
        ).get(pk=anomaly_pk)
    except Exception:
        logger.exception('Failed to load anomaly %s for alerting', anomaly_pk)
        return

    for channel, send in (('feishu', send_feishu_alert), ('sms', send_sms_alert)):
        try:
            send(anomaly)
        except Exception:
            logger.exception(
                'Failed to send %s alert for anomaly %s', channel, anomaly_pk,
            )