"""告警服务 — 飞书 interactive 卡片私信 + 辅助指标。""" import json import logging from datetime import timedelta import requests from django.utils import timezone logger = logging.getLogger(__name__) # 小毛球机器人 FEISHU_APP_ID = 'cli_a90478156bf85bd7' FEISHU_APP_SECRET = '87N2nnx6Yv56TPjl2GraLdKOjFiGOSGp' _RULE_NAMES = { 'region_mismatch': '登录地区不对 (R1)', 'impossible_travel': '不可能的旅行 (R2)', 'login_frequency': '登录太频繁 (R3)', 'multi_city': '团队遍地开花 (R4)', 'overseas_ip_diversity': '海外IP太杂 (R5)', } _LEVEL_COLORS = { 'warning': 'orange', 'critical': 'red', } _LEVEL_LABELS = { 'warning': '⚠️ 警告', 'critical': '🚨 严重', } def _get_tenant_access_token(): """获取飞书 tenant_access_token。""" import os app_secret = os.environ.get('FEISHU_APP_SECRET', FEISHU_APP_SECRET) if not app_secret: raise RuntimeError('FEISHU_APP_SECRET not configured') resp = requests.post( 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', json={'app_id': FEISHU_APP_ID, 'app_secret': app_secret}, timeout=5, ) data = resp.json() if data.get('code') != 0: raise RuntimeError(f'Feishu token error: {data}') return data['tenant_access_token'] def _get_open_id_by_mobile(token, mobile): """通过手机号查询飞书 open_id。""" resp = requests.post( 'https://open.feishu.cn/open-apis/contact/v3/users/batch_get_id', headers={'Authorization': f'Bearer {token}'}, json={'mobiles': [mobile]}, timeout=5, ) data = resp.json() if data.get('code') != 0: raise RuntimeError(f'Feishu user lookup error: {data}') user_list = data.get('data', {}).get('user_list', []) if user_list and user_list[0].get('user_id'): return user_list[0]['user_id'] return None def _compute_auxiliary_metrics(team): """计算辅助指标:最近 7 天并发踢出次数 + 非工作时间登录占比。""" from apps.accounts.models import LoginRecord since = timezone.now() - timedelta(days=7) # 并发踢出次数:ActiveSession 被删除的次数无法直接统计, # 用 LoginAnomaly 中 rule=impossible_travel 的次数近似 from apps.accounts.models import LoginAnomaly kick_count = LoginAnomaly.objects.filter( team=team, auto_disabled=True, created_at__gte=since, ).count() # 非工作时间登录占比 (22:00-08:00) total_logins = LoginRecord.objects.filter( team=team, created_at__gte=since, ).count() if total_logins > 0: night_logins = 0 for record in LoginRecord.objects.filter(team=team, created_at__gte=since).only('created_at'): hour = record.created_at.hour if hour >= 22 or hour < 8: night_logins += 1 night_ratio = round(night_logins / total_logins * 100, 1) else: night_ratio = 0 return kick_count, night_ratio def _build_card(anomaly): """构建飞书 interactive 卡片。""" team = anomaly.team user = anomaly.user record = anomaly.login_record level = anomaly.level rule = anomaly.rule detail = anomaly.detail color = _LEVEL_COLORS.get(level, 'blue') level_label = _LEVEL_LABELS.get(level, level) rule_name = _RULE_NAMES.get(rule, rule) kick_count, night_ratio = _compute_auxiliary_metrics(team) # 基本信息行 info_lines = [ f'**团队:** {team.name}', f'**用户:** {user.username}', f'**IP:** {record.ip_address}', f'**归属地:** {record.geo_country} {record.geo_province} {record.geo_city}', f'**规则:** {rule_name}', ] # 根据规则添加详情 if rule == 'region_mismatch': info_lines.append(f'**预期城市:** {", ".join(detail.get("expected", []))}') info_lines.append(f'**实际城市:** {detail.get("city", "")}') elif rule == 'impossible_travel': info_lines.append(f'**当前城市:** {detail.get("current_city", "")}') info_lines.append(f'**之前城市:** {detail.get("previous_city", "")}') elif rule == 'login_frequency': info_lines.append(f'**登录次数:** {detail.get("count", 0)} 次 / {detail.get("window_seconds", 0)}s') elif rule == 'multi_city': info_lines.append(f'**预期外城市:** {", ".join(detail.get("unexpected_cities", []))}') elif rule == 'overseas_ip_diversity': info_lines.append(f'**海外国家:** {", ".join(detail.get("countries", []))}') # 自动封禁标注 if anomaly.auto_disabled: target_label = '该用户' if anomaly.disabled_target == 'user' else '整个团队' info_lines.append(f'\n🔒 **已自动封禁{target_label}**') # 辅助指标 info_lines.append(f'\n---\n📊 **辅助指标(近7天):**') info_lines.append(f'并发踢出次数:{kick_count}') info_lines.append(f'非工作时间登录占比:{night_ratio}%') card = { 'config': {'wide_screen_mode': True}, 'header': { 'title': {'tag': 'plain_text', 'content': f'{level_label} {rule_name}'}, 'template': color, }, 'elements': [ { 'tag': 'div', 'text': { 'tag': 'lark_md', 'content': '\n'.join(info_lines), }, }, ], } return card def send_feishu_alert(anomaly): """发送飞书告警卡片到配置的接收人。""" from apps.generation.models import QuotaConfig try: config = QuotaConfig.objects.get(pk=1) except QuotaConfig.DoesNotExist: logger.warning('QuotaConfig not found, skip alert') return mobiles_str = config.feishu_alert_mobiles if not mobiles_str: logger.info('No feishu alert mobiles configured, skip alert') return mobiles = [m.strip() for m in mobiles_str.split(',') if m.strip()] if not mobiles: return try: token = _get_tenant_access_token() except Exception as e: logger.error('Failed to get feishu token: %s', e) return card = _build_card(anomaly) for mobile in mobiles: try: open_id = _get_open_id_by_mobile(token, mobile) if not open_id: logger.warning('No feishu user found for mobile %s', mobile) continue resp = requests.post( 'https://open.feishu.cn/open-apis/im/v1/messages', headers={'Authorization': f'Bearer {token}'}, params={'receive_id_type': 'open_id'}, json={ 'receive_id': open_id, 'msg_type': 'interactive', 'content': json.dumps(card, ensure_ascii=False), }, timeout=5, ) data = resp.json() if data.get('code') != 0: logger.error('Feishu send failed to %s: %s', mobile, data) else: logger.info('Feishu alert sent to %s for rule %s', mobile, anomaly.rule) except Exception as e: logger.error('Feishu alert error for %s: %s', mobile, e) def send_sms_alert(anomaly): """发送短信告警到配置的接收人。""" from apps.generation.models import QuotaConfig from django.conf import settings as django_settings try: config = QuotaConfig.objects.get(pk=1) except QuotaConfig.DoesNotExist: logger.warning('QuotaConfig not found, skip SMS alert') return mobiles = [m.strip() for m in config.sms_alert_mobiles.split(',') if m.strip()] if not mobiles: return access_key = django_settings.ALIYUN_SMS_ACCESS_KEY access_secret = django_settings.ALIYUN_SMS_ACCESS_SECRET sign_name = django_settings.ALIYUN_SMS_SIGN_NAME template_code = django_settings.ALIYUN_SMS_TEMPLATE_CODE if not all([access_key, access_secret, template_code]): logger.warning('Aliyun SMS credentials not configured, skip SMS alert') return rule_name = _RULE_NAMES.get(anomaly.rule, anomaly.rule) auto_action = '已自动封禁' if anomaly.auto_disabled else '仅告警' template_param = json.dumps({ 'team_name': anomaly.team.name[:20], 'rule_name': rule_name[:20], 'username': anomaly.user.username[:20], 'city': anomaly.login_record.geo_city or '未知', 'auto_action': auto_action, }, ensure_ascii=False) # 使用阿里云 SMS HTTP API import hashlib import hmac import base64 import urllib.parse import uuid from datetime import datetime def _percent_encode(s): return urllib.parse.quote(s, safe='', encoding='utf-8') for mobile in mobiles: try: params = { 'AccessKeyId': access_key, 'Action': 'SendSms', 'Format': 'JSON', 'PhoneNumbers': mobile, 'RegionId': 'cn-hangzhou', 'SignName': sign_name, 'SignatureMethod': 'HMAC-SHA1', 'SignatureNonce': str(uuid.uuid4()), 'SignatureVersion': '1.0', 'TemplateCode': template_code, 'TemplateParam': template_param, 'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), 'Version': '2017-05-25', } sorted_params = sorted(params.items()) query_string = '&'.join(f'{_percent_encode(k)}={_percent_encode(v)}' for k, v in sorted_params) string_to_sign = f'GET&{_percent_encode("/")}&{_percent_encode(query_string)}' sign_key = (access_secret + '&').encode('utf-8') signature = base64.b64encode( hmac.new(sign_key, string_to_sign.encode('utf-8'), hashlib.sha1).digest() ).decode('utf-8') params['Signature'] = signature resp = requests.get( 'https://dysmsapi.aliyuncs.com/', params=params, timeout=10, ) data = resp.json() if data.get('Code') == 'OK': logger.info('SMS alert sent to %s for rule %s', mobile, anomaly.rule) else: logger.error('SMS send failed to %s: %s', mobile, data) except Exception as e: logger.error('SMS alert error for %s: %s', mobile, e) def send_feishu_test(mobile): """发送测试消息到指定手机号。Returns (success, message)。""" try: token = _get_tenant_access_token() open_id = _get_open_id_by_mobile(token, mobile) if not open_id: return False, f'未找到手机号 {mobile} 对应的飞书用户' card = { 'config': {'wide_screen_mode': True}, 'header': { 'title': {'tag': 'plain_text', 'content': '🔔 AirDrama 告警测试'}, 'template': 'blue', }, 'elements': [ { 'tag': 'div', 'text': { 'tag': 'lark_md', 'content': '这是一条测试消息,说明飞书告警通道配置正常。', }, }, ], } import json resp = requests.post( 'https://open.feishu.cn/open-apis/im/v1/messages', headers={'Authorization': f'Bearer {token}'}, params={'receive_id_type': 'open_id'}, json={ 'receive_id': open_id, 'msg_type': 'interactive', 'content': json.dumps(card, ensure_ascii=False), }, timeout=5, ) data = resp.json() if data.get('code') != 0: return False, f'发送失败: {data.get("msg", "")}' return True, '测试消息已发送' except Exception as e: return False, str(e)