video-shuoshan/backend/utils/alert_service.py
seaislee1209 afcff9455f feat: v0.12.1 安全加固补充 + 短信测试按钮
①Refresh Token 轮换(ROTATE_REFRESH_TOKENS + BLACKLIST_AFTER_ROTATION)
②前端 token 刷新时保存新 refresh token(auth store + axios 拦截器)
③短信告警测试按钮(/admin/test-sms + 系统设置页按钮)
④安全审查完成:S2 git 历史无泄露、S4 无攻击面、S7 nginx 已配、S10 全接口有权限

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 19:38:42 +08:00

428 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""告警服务 — 飞书 interactive 卡片私信 + 辅助指标。"""
import json
import logging
from datetime import timedelta
import requests
from django.utils import timezone
logger = logging.getLogger(__name__)
# 小毛球机器人
FEISHU_APP_ID = 'cli_a90478156bf85bd7'
FEISHU_APP_SECRET = '87N2nnx6Yv56TPjl2GraLdKOjFiGOSGp'
_RULE_NAMES = {
'region_mismatch': '登录地区不对 (R1)',
'impossible_travel': '不可能的旅行 (R2)',
'login_frequency': '登录太频繁 (R3)',
'multi_city': '团队遍地开花 (R4)',
'overseas_ip_diversity': '海外IP太杂 (R5)',
}
_LEVEL_COLORS = {
'warning': 'orange',
'critical': 'red',
}
_LEVEL_LABELS = {
'warning': '⚠️ 警告',
'critical': '🚨 严重',
}
def _get_tenant_access_token():
"""获取飞书 tenant_access_token。"""
import os
app_secret = os.environ.get('FEISHU_APP_SECRET', FEISHU_APP_SECRET)
if not app_secret:
raise RuntimeError('FEISHU_APP_SECRET not configured')
resp = requests.post(
'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
json={'app_id': FEISHU_APP_ID, 'app_secret': app_secret},
timeout=5,
)
data = resp.json()
if data.get('code') != 0:
raise RuntimeError(f'Feishu token error: {data}')
return data['tenant_access_token']
def _get_open_id_by_mobile(token, mobile):
"""通过手机号查询飞书 open_id。"""
resp = requests.post(
'https://open.feishu.cn/open-apis/contact/v3/users/batch_get_id',
headers={'Authorization': f'Bearer {token}'},
json={'mobiles': [mobile]},
timeout=5,
)
data = resp.json()
if data.get('code') != 0:
raise RuntimeError(f'Feishu user lookup error: {data}')
user_list = data.get('data', {}).get('user_list', [])
if user_list and user_list[0].get('user_id'):
return user_list[0]['user_id']
return None
def _compute_auxiliary_metrics(team):
"""计算辅助指标:最近 7 天并发踢出次数 + 非工作时间登录占比。"""
from apps.accounts.models import LoginRecord
since = timezone.now() - timedelta(days=7)
# 并发踢出次数ActiveSession 被删除的次数无法直接统计,
# 用 LoginAnomaly 中 rule=impossible_travel 的次数近似
from apps.accounts.models import LoginAnomaly
kick_count = LoginAnomaly.objects.filter(
team=team,
auto_disabled=True,
created_at__gte=since,
).count()
# 非工作时间登录占比 (22:00-08:00)
total_logins = LoginRecord.objects.filter(
team=team,
created_at__gte=since,
).count()
if total_logins > 0:
night_logins = 0
for record in LoginRecord.objects.filter(team=team, created_at__gte=since).only('created_at'):
hour = record.created_at.hour
if hour >= 22 or hour < 8:
night_logins += 1
night_ratio = round(night_logins / total_logins * 100, 1)
else:
night_ratio = 0
return kick_count, night_ratio
def _build_card(anomaly):
"""构建飞书 interactive 卡片。"""
team = anomaly.team
user = anomaly.user
record = anomaly.login_record
level = anomaly.level
rule = anomaly.rule
detail = anomaly.detail
color = _LEVEL_COLORS.get(level, 'blue')
level_label = _LEVEL_LABELS.get(level, level)
rule_name = _RULE_NAMES.get(rule, rule)
kick_count, night_ratio = _compute_auxiliary_metrics(team)
# 基本信息行
info_lines = [
f'**团队:** {team.name}',
f'**用户:** {user.username}',
f'**IP** {record.ip_address}',
f'**归属地:** {record.geo_country} {record.geo_province} {record.geo_city}',
f'**规则:** {rule_name}',
]
# 根据规则添加详情
if rule == 'region_mismatch':
info_lines.append(f'**预期城市:** {", ".join(detail.get("expected", []))}')
info_lines.append(f'**实际城市:** {detail.get("city", "")}')
elif rule == 'impossible_travel':
info_lines.append(f'**当前城市:** {detail.get("current_city", "")}')
info_lines.append(f'**之前城市:** {detail.get("previous_city", "")}')
elif rule == 'login_frequency':
info_lines.append(f'**登录次数:** {detail.get("count", 0)} 次 / {detail.get("window_seconds", 0)}s')
elif rule == 'multi_city':
info_lines.append(f'**预期外城市:** {", ".join(detail.get("unexpected_cities", []))}')
elif rule == 'overseas_ip_diversity':
info_lines.append(f'**海外国家:** {", ".join(detail.get("countries", []))}')
# 自动封禁标注
if anomaly.auto_disabled:
target_label = '该用户' if anomaly.disabled_target == 'user' else '整个团队'
info_lines.append(f'\n🔒 **已自动封禁{target_label}**')
# 辅助指标
info_lines.append(f'\n---\n📊 **辅助指标近7天**')
info_lines.append(f'并发踢出次数:{kick_count}')
info_lines.append(f'非工作时间登录占比:{night_ratio}%')
card = {
'config': {'wide_screen_mode': True},
'header': {
'title': {'tag': 'plain_text', 'content': f'{level_label} {rule_name}'},
'template': color,
},
'elements': [
{
'tag': 'div',
'text': {
'tag': 'lark_md',
'content': '\n'.join(info_lines),
},
},
],
}
return card
def send_feishu_alert(anomaly):
"""发送飞书告警卡片到配置的接收人。"""
from apps.generation.models import QuotaConfig
try:
config = QuotaConfig.objects.get(pk=1)
except QuotaConfig.DoesNotExist:
logger.warning('QuotaConfig not found, skip alert')
return
mobiles_str = config.feishu_alert_mobiles
if not mobiles_str:
logger.info('No feishu alert mobiles configured, skip alert')
return
mobiles = [m.strip() for m in mobiles_str.split(',') if m.strip()]
if not mobiles:
return
try:
token = _get_tenant_access_token()
except Exception as e:
logger.error('Failed to get feishu token: %s', e)
return
card = _build_card(anomaly)
for mobile in mobiles:
try:
open_id = _get_open_id_by_mobile(token, mobile)
if not open_id:
logger.warning('No feishu user found for mobile %s', mobile)
continue
resp = requests.post(
'https://open.feishu.cn/open-apis/im/v1/messages',
headers={'Authorization': f'Bearer {token}'},
params={'receive_id_type': 'open_id'},
json={
'receive_id': open_id,
'msg_type': 'interactive',
'content': json.dumps(card, ensure_ascii=False),
},
timeout=5,
)
data = resp.json()
if data.get('code') != 0:
logger.error('Feishu send failed to %s: %s', mobile, data)
else:
logger.info('Feishu alert sent to %s for rule %s', mobile, anomaly.rule)
except Exception as e:
logger.error('Feishu alert error for %s: %s', mobile, e)
def send_sms_alert(anomaly):
"""发送短信告警到配置的接收人。"""
from apps.generation.models import QuotaConfig
from django.conf import settings as django_settings
try:
config = QuotaConfig.objects.get(pk=1)
except QuotaConfig.DoesNotExist:
logger.warning('QuotaConfig not found, skip SMS alert')
return
mobiles = [m.strip() for m in config.sms_alert_mobiles.split(',') if m.strip()]
if not mobiles:
return
access_key = django_settings.ALIYUN_SMS_ACCESS_KEY
access_secret = django_settings.ALIYUN_SMS_ACCESS_SECRET
sign_name = django_settings.ALIYUN_SMS_SIGN_NAME
template_code = django_settings.ALIYUN_SMS_TEMPLATE_CODE
if not all([access_key, access_secret, template_code]):
logger.warning('Aliyun SMS credentials not configured, skip SMS alert')
return
rule_name = _RULE_NAMES.get(anomaly.rule, anomaly.rule)
auto_action = '已自动封禁' if anomaly.auto_disabled else '仅告警'
template_param = json.dumps({
'team_name': anomaly.team.name[:20],
'rule_name': rule_name[:20],
'username': anomaly.user.username[:20],
'city': anomaly.login_record.geo_city or '未知',
'auto_action': auto_action,
}, ensure_ascii=False)
# 使用阿里云 SMS HTTP API
import hashlib
import hmac
import base64
import urllib.parse
import uuid
from datetime import datetime
def _percent_encode(s):
return urllib.parse.quote(s, safe='', encoding='utf-8')
for mobile in mobiles:
try:
params = {
'AccessKeyId': access_key,
'Action': 'SendSms',
'Format': 'JSON',
'PhoneNumbers': mobile,
'RegionId': 'cn-hangzhou',
'SignName': sign_name,
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': str(uuid.uuid4()),
'SignatureVersion': '1.0',
'TemplateCode': template_code,
'TemplateParam': template_param,
'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'Version': '2017-05-25',
}
sorted_params = sorted(params.items())
query_string = '&'.join(f'{_percent_encode(k)}={_percent_encode(v)}' for k, v in sorted_params)
string_to_sign = f'GET&{_percent_encode("/")}&{_percent_encode(query_string)}'
sign_key = (access_secret + '&').encode('utf-8')
signature = base64.b64encode(
hmac.new(sign_key, string_to_sign.encode('utf-8'), hashlib.sha1).digest()
).decode('utf-8')
params['Signature'] = signature
resp = requests.get(
'https://dysmsapi.aliyuncs.com/',
params=params,
timeout=10,
)
data = resp.json()
if data.get('Code') == 'OK':
logger.info('SMS alert sent to %s for rule %s', mobile, anomaly.rule)
else:
logger.error('SMS send failed to %s: %s', mobile, data)
except Exception as e:
logger.error('SMS alert error for %s: %s', mobile, e)
def send_sms_test(mobile):
"""发送短信测试到指定手机号。Returns (success, message)。"""
from django.conf import settings as django_settings
access_key = django_settings.ALIYUN_SMS_ACCESS_KEY
access_secret = django_settings.ALIYUN_SMS_ACCESS_SECRET
sign_name = django_settings.ALIYUN_SMS_SIGN_NAME
template_code = django_settings.ALIYUN_SMS_TEMPLATE_CODE
if not all([access_key, access_secret, template_code]):
return False, '阿里云短信密钥未配置ALIYUN_SMS_ACCESS_KEY / ALIYUN_SMS_ACCESS_SECRET'
template_param = json.dumps({
'team_name': '测试团队',
'rule_name': '告警测试',
'username': '测试用户',
'city': '测试城市',
'auto_action': '仅测试',
}, ensure_ascii=False)
import hashlib
import hmac
import base64
import urllib.parse
import uuid
from datetime import datetime
def _percent_encode(s):
return urllib.parse.quote(s, safe='', encoding='utf-8')
try:
params = {
'AccessKeyId': access_key,
'Action': 'SendSms',
'Format': 'JSON',
'PhoneNumbers': mobile,
'RegionId': 'cn-hangzhou',
'SignName': sign_name,
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': str(uuid.uuid4()),
'SignatureVersion': '1.0',
'TemplateCode': template_code,
'TemplateParam': template_param,
'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'Version': '2017-05-25',
}
sorted_params = sorted(params.items())
query_string = '&'.join(f'{_percent_encode(k)}={_percent_encode(v)}' for k, v in sorted_params)
string_to_sign = f'GET&{_percent_encode("/")}&{_percent_encode(query_string)}'
sign_key = (access_secret + '&').encode('utf-8')
signature = base64.b64encode(
hmac.new(sign_key, string_to_sign.encode('utf-8'), hashlib.sha1).digest()
).decode('utf-8')
params['Signature'] = signature
resp = requests.get(
'https://dysmsapi.aliyuncs.com/',
params=params,
timeout=10,
)
data = resp.json()
if data.get('Code') == 'OK':
return True, '测试短信已发送'
return False, f'发送失败: {data.get("Message", data.get("Code", "未知错误"))}'
except Exception as e:
return False, str(e)
def send_feishu_test(mobile):
"""发送测试消息到指定手机号。Returns (success, message)。"""
try:
token = _get_tenant_access_token()
open_id = _get_open_id_by_mobile(token, mobile)
if not open_id:
return False, f'未找到手机号 {mobile} 对应的飞书用户'
card = {
'config': {'wide_screen_mode': True},
'header': {
'title': {'tag': 'plain_text', 'content': '🔔 AirDrama 告警测试'},
'template': 'blue',
},
'elements': [
{
'tag': 'div',
'text': {
'tag': 'lark_md',
'content': '这是一条测试消息,说明飞书告警通道配置正常。',
},
},
],
}
import json
resp = requests.post(
'https://open.feishu.cn/open-apis/im/v1/messages',
headers={'Authorization': f'Bearer {token}'},
params={'receive_id_type': 'open_id'},
json={
'receive_id': open_id,
'msg_type': 'interactive',
'content': json.dumps(card, ensure_ascii=False),
},
timeout=5,
)
data = resp.json()
if data.get('code') != 0:
return False, f'发送失败: {data.get("msg", "")}'
return True, '测试消息已发送'
except Exception as e:
return False, str(e)