video-shuoshan/backend/utils/alert_service.py
seaislee1209 be656900c0
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m13s
feat: v0.9.7 登录风控第二期 — IP归属地解析 + 异常检测(R1-R5) + 飞书告警 + 自动封禁
- IP138 在线 API + ip2region 离线库双通道归属地解析,60 秒熔断降级
- 5 条异常检测规则:地区不对/不可能旅行/频繁登录/团队遍地开花/海外IP太杂
- 飞书 interactive 卡片告警(红色严重/橙色警告),含辅助指标
- R2 自动封禁用户、R4 自动封禁团队,封禁即踢下线
- 系统设置页全局配置 + 团队详情页独立阈值覆盖
- 安全日志页面 + 管理员修改密码入口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 00:02:56 +08:00

268 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""告警服务 — 飞书 interactive 卡片私信 + 辅助指标。"""
import json
import logging
from datetime import timedelta
import requests
from django.utils import timezone
logger = logging.getLogger(__name__)
# Credentials of the Feishu bot application (the "小毛球" bot).
# NOTE(review): the app secret is committed in source; it is used as the
# fallback when the FEISHU_APP_SECRET environment variable is unset.
# Consider rotating it and supplying it through the environment only.
FEISHU_APP_ID = 'cli_a90478156bf85bd7'
FEISHU_APP_SECRET = '87N2nnx6Yv56TPjl2GraLdKOjFiGOSGp'

# Anomaly rule key -> human-readable rule name shown in the alert card.
_RULE_NAMES = {
    'region_mismatch': '登录地区不对 (R1)',
    'impossible_travel': '不可能的旅行 (R2)',
    'login_frequency': '登录太频繁 (R3)',
    'multi_city': '团队遍地开花 (R4)',
    'overseas_ip_diversity': '海外IP太杂 (R5)',
}

# Anomaly level -> card header colour template.
_LEVEL_COLORS = {
    'warning': 'orange',
    'critical': 'red',
}

# Anomaly level -> label prefixed to the card title.
_LEVEL_LABELS = {
    'warning': '⚠️ 警告',
    'critical': '🚨 严重',
}
def _get_tenant_access_token():
    """Fetch a Feishu ``tenant_access_token`` for the bot application.

    The app secret is taken from the ``FEISHU_APP_SECRET`` environment
    variable, falling back to the module-level ``FEISHU_APP_SECRET`` constant.

    Returns:
        The ``tenant_access_token`` value from the Feishu response.

    Raises:
        RuntimeError: If no app secret is available, or if the response's
            ``code`` field is not ``0``.
    """
    import os

    secret = os.environ.get('FEISHU_APP_SECRET', FEISHU_APP_SECRET)
    if not secret:
        raise RuntimeError('FEISHU_APP_SECRET not configured')

    credentials = {'app_id': FEISHU_APP_ID, 'app_secret': secret}
    response = requests.post(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        json=credentials,
        timeout=5,
    )
    payload = response.json()
    if payload.get('code') == 0:
        return payload['tenant_access_token']
    raise RuntimeError(f'Feishu token error: {payload}')
def _get_open_id_by_mobile(token, mobile):
    """Look up the Feishu user ID registered for a mobile number.

    Args:
        token: Token sent as the ``Authorization: Bearer`` header.
        mobile: Mobile number to look up.

    Returns:
        The ``user_id`` field of the first ``user_list`` entry in the
        response, or ``None`` when the list is empty or that field is
        missing/empty.
        NOTE(review): callers send this value with
        ``receive_id_type=open_id``; presumably the endpoint returns open_ids
        by default — confirm against the Feishu API reference.

    Raises:
        RuntimeError: If the response's ``code`` field is not ``0``.
    """
    response = requests.post(
        'https://open.feishu.cn/open-apis/contact/v3/users/batch_get_id',
        headers={'Authorization': f'Bearer {token}'},
        json={'mobiles': [mobile]},
        timeout=5,
    )
    payload = response.json()
    if payload.get('code') != 0:
        raise RuntimeError(f'Feishu user lookup error: {payload}')

    matches = payload.get('data', {}).get('user_list', [])
    if not matches:
        return None
    # A missing or empty ``user_id`` is reported as "not found".
    return matches[0].get('user_id') or None
def _compute_auxiliary_metrics(team):
    """Compute the auxiliary metrics shown in an alert for the last 7 days.

    Args:
        team: Team whose ``LoginAnomaly`` and ``LoginRecord`` rows are
            inspected.

    Returns:
        A ``(kick_count, night_ratio)`` tuple:

        * ``kick_count`` — number of the team's ``LoginAnomaly`` rows with
          ``auto_disabled=True`` created in the window.
        * ``night_ratio`` — percentage (rounded to one decimal) of the team's
          logins in the window whose hour falls in 22:00-08:00, or ``0``
          when there were no logins.
    """
    from apps.accounts.models import LoginAnomaly, LoginRecord

    since = timezone.now() - timedelta(days=7)

    # Deleted ActiveSession rows cannot be counted directly, so the number of
    # anomalies that triggered an automatic ban is used as an approximation
    # of concurrent kick-outs. Note the filter matches every auto-disabled
    # anomaly, not only the impossible_travel rule.
    kick_count = LoginAnomaly.objects.filter(
        team=team,
        auto_disabled=True,
        created_at__gte=since,
    ).count()

    # One pass over the timestamps yields both the total and the night-time
    # count, instead of a COUNT query followed by a second full scan.
    login_times = LoginRecord.objects.filter(
        team=team,
        created_at__gte=since,
    ).values_list('created_at', flat=True)

    total_logins = 0
    night_logins = 0
    for created_at in login_times.iterator():
        total_logins += 1
        # Aware datetimes come back in UTC; the off-hours window is meant in
        # local wall-clock time, so convert before reading the hour. Naive
        # values (USE_TZ disabled) are already local and are left as-is.
        if timezone.is_aware(created_at):
            created_at = timezone.localtime(created_at)
        if created_at.hour >= 22 or created_at.hour < 8:
            night_logins += 1

    if total_logins:
        night_ratio = round(night_logins / total_logins * 100, 1)
    else:
        night_ratio = 0
    return kick_count, night_ratio
def _build_card(anomaly):
    """Build the Feishu interactive card describing a login anomaly.

    Args:
        anomaly: Object exposing ``team``, ``user``, ``login_record``,
            ``level``, ``rule``, ``detail``, ``auto_disabled`` and
            ``disabled_target``. ``detail`` is read as a mapping; a falsy
            value (e.g. ``None``) is treated as an empty mapping.

    Returns:
        A dict with ``config``, ``header`` and ``elements`` keys. The header
        title is the level label plus the rule name, the header template is
        the level colour (``'blue'`` for unknown levels), and the single
        ``div`` element carries the information lines as ``lark_md`` text.
    """
    team = anomaly.team
    user = anomaly.user
    record = anomaly.login_record
    level = anomaly.level
    rule = anomaly.rule
    # A missing detail payload must not prevent the alert from being built.
    detail = anomaly.detail or {}

    color = _LEVEL_COLORS.get(level, 'blue')
    level_label = _LEVEL_LABELS.get(level, level)
    rule_name = _RULE_NAMES.get(rule, rule)
    kick_count, night_ratio = _compute_auxiliary_metrics(team)

    def _joined(key):
        # Tolerate a missing/None list and non-string items.
        return ', '.join(str(item) for item in (detail.get(key) or []))

    # Skip empty geo parts so the location never shows "None" or runs of
    # blanks when only some of the fields were resolved.
    location = ' '.join(
        str(part)
        for part in (record.geo_country, record.geo_province, record.geo_city)
        if part
    )

    # Basic information lines.
    info_lines = [
        f'**团队:** {team.name}',
        f'**用户:** {user.username}',
        f'**IP:** {record.ip_address}',
        f'**归属地:** {location}',
        f'**规则:** {rule_name}',
    ]

    # Rule-specific details.
    if rule == 'region_mismatch':
        info_lines.append(f'**预期城市:** {_joined("expected")}')
        info_lines.append(f'**实际城市:** {detail.get("city", "")}')
    elif rule == 'impossible_travel':
        info_lines.append(f'**当前城市:** {detail.get("current_city", "")}')
        info_lines.append(f'**之前城市:** {detail.get("previous_city", "")}')
    elif rule == 'login_frequency':
        info_lines.append(
            f'**登录次数:** {detail.get("count", 0)} 次 / '
            f'{detail.get("window_seconds", 0)}s'
        )
    elif rule == 'multi_city':
        info_lines.append(f'**预期外城市:** {_joined("unexpected_cities")}')
    elif rule == 'overseas_ip_diversity':
        info_lines.append(f'**海外国家:** {_joined("countries")}')

    # Mark automatic bans and whether the user or the whole team was hit.
    if anomaly.auto_disabled:
        target_label = '该用户' if anomaly.disabled_target == 'user' else '整个团队'
        info_lines.append(f'\n🔒 **已自动封禁{target_label}**')

    # Auxiliary metrics section.
    info_lines.append('\n---\n📊 **辅助指标近7天**')
    info_lines.append(f'并发踢出次数:{kick_count}')
    info_lines.append(f'非工作时间登录占比:{night_ratio}%')

    return {
        'config': {'wide_screen_mode': True},
        'header': {
            'title': {'tag': 'plain_text', 'content': f'{level_label} {rule_name}'},
            'template': color,
        },
        'elements': [
            {
                'tag': 'div',
                'text': {
                    'tag': 'lark_md',
                    'content': '\n'.join(info_lines),
                },
            },
        ],
    }
def send_feishu_alert(anomaly):
    """Send the alert card for ``anomaly`` to every configured recipient.

    Recipients are the comma-separated mobile numbers stored in
    ``QuotaConfig(pk=1).feishu_alert_mobiles``. Delivery is best-effort:
    every failure (missing config, token error, card build error, lookup or
    send error for one recipient) is logged and never raised to the caller.

    Args:
        anomaly: The anomaly passed to ``_build_card``; its ``rule`` is also
            written to the success log line.

    Returns:
        None.
    """
    from apps.generation.models import QuotaConfig

    try:
        config = QuotaConfig.objects.get(pk=1)
    except QuotaConfig.DoesNotExist:
        logger.warning('QuotaConfig not found, skip alert')
        return

    mobiles_str = config.feishu_alert_mobiles
    if not mobiles_str:
        logger.info('No feishu alert mobiles configured, skip alert')
        return

    mobiles = [m.strip() for m in mobiles_str.split(',') if m.strip()]
    if not mobiles:
        return

    try:
        token = _get_tenant_access_token()
    except Exception as e:
        logger.error('Failed to get feishu token: %s', e)
        return

    # Building the card touches the database and the anomaly's payload; a
    # failure here must not escape into the caller's flow. The card is the
    # same for all recipients, so it is serialised once.
    try:
        content = json.dumps(_build_card(anomaly), ensure_ascii=False)
    except Exception:
        logger.exception('Failed to build feishu alert card')
        return

    for mobile in mobiles:
        # One failing recipient must not stop delivery to the others.
        try:
            open_id = _get_open_id_by_mobile(token, mobile)
            if not open_id:
                logger.warning('No feishu user found for mobile %s', mobile)
                continue

            resp = requests.post(
                'https://open.feishu.cn/open-apis/im/v1/messages',
                headers={'Authorization': f'Bearer {token}'},
                params={'receive_id_type': 'open_id'},
                json={
                    'receive_id': open_id,
                    'msg_type': 'interactive',
                    'content': content,
                },
                timeout=5,
            )
            data = resp.json()
            if data.get('code') != 0:
                logger.error('Feishu send failed to %s: %s', mobile, data)
            else:
                logger.info('Feishu alert sent to %s for rule %s', mobile, anomaly.rule)
        except Exception as e:
            logger.error('Feishu alert error for %s: %s', mobile, e)
def send_feishu_test(mobile):
    """Send a test card to the Feishu user registered under ``mobile``.

    Args:
        mobile: Mobile number of the recipient.

    Returns:
        A ``(success, message)`` tuple. ``success`` is ``True`` only when the
        send response's ``code`` is ``0``; otherwise ``message`` describes
        the failure (user not found, the response's ``msg``, or the text of
        the raised exception). No exception is propagated.
    """
    try:
        token = _get_tenant_access_token()
        open_id = _get_open_id_by_mobile(token, mobile)
        if not open_id:
            return False, f'未找到手机号 {mobile} 对应的飞书用户'

        card = {
            'config': {'wide_screen_mode': True},
            'header': {
                'title': {'tag': 'plain_text', 'content': '🔔 AirDrama 告警测试'},
                'template': 'blue',
            },
            'elements': [
                {
                    'tag': 'div',
                    'text': {
                        'tag': 'lark_md',
                        'content': '这是一条测试消息,说明飞书告警通道配置正常。',
                    },
                },
            ],
        }
        resp = requests.post(
            'https://open.feishu.cn/open-apis/im/v1/messages',
            headers={'Authorization': f'Bearer {token}'},
            params={'receive_id_type': 'open_id'},
            json={
                'receive_id': open_id,
                'msg_type': 'interactive',
                'content': json.dumps(card, ensure_ascii=False),
            },
            timeout=5,
        )
        data = resp.json()
        if data.get('code') != 0:
            return False, f'发送失败: {data.get("msg", "")}'
        return True, '测试消息已发送'
    except Exception as e:
        # The error is reported to the caller; also log it so the failure
        # can be diagnosed from the server side.
        logger.warning('Feishu test message failed for %s: %s', mobile, e)
        return False, str(e)