AirGate/backend/utils/feishu.py
seaislee1209 555c86ce76 feat: initialize AirGate - Volcengine IAM sub-account management platform
Backend (Django 4.2 + DRF):
- Admin auth with SimpleJWT
- Volcengine API client with HMAC-SHA256 signing
- IAM user management (create/sync/import/disable/enable)
- Billing query with pagination
- Feishu webhook notifications (async)
- APScheduler for periodic spending checks
- AES-256 encrypted credential storage
- API key auth for external system integration

Frontend (Vue 3 + Element Plus):
- Login page
- Dashboard with stats overview
- IAM user list with per-user threshold config
- Billing view with spending progress bars
- Alert history with type filtering
- Settings page for global config and Volcengine account management

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 13:03:30 +08:00


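"""Feishu bot notifications."""
import logging
import threading

import requests

logger = logging.getLogger(__name__)


def send_feishu_alert(webhook_url: str, title: str, content: str,
                      template: str = "red"):
    """Send a Feishu card message without blocking the caller."""
    if not webhook_url:
        logger.warning("Feishu webhook not configured, skipping notification: %s", title)
        return

    def _send():
        # Interactive card: a colored header plus one lark_md text block.
        payload = {
            "msg_type": "interactive",
            "card": {
                "config": {"wide_screen_mode": True},
                "header": {
                    "title": {"tag": "plain_text", "content": title},
                    "template": template,
                },
                "elements": [
                    {
                        "tag": "div",
                        "text": {"tag": "lark_md", "content": content},
                    }
                ],
            },
        }
        try:
            resp = requests.post(webhook_url, json=payload, timeout=10)
            resp.raise_for_status()
            logger.info("Feishu notification sent: %s", title)
        except Exception as e:
            # Runs in a background thread, so log the failure instead of raising.
            logger.error("Failed to send Feishu notification: %s", e)

    # Daemon thread: the request never blocks the caller, but it may be
    # dropped if the process exits before the POST completes.
    thread = threading.Thread(target=_send, daemon=True)
    thread.start()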
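
The card payload above is built inside the worker closure, which makes it awkward to test without a network call. A minimal sketch of one way to factor it out follows. The names `build_card_payload` and `response_ok` are hypothetical and not part of this file. The response check rests on an assumption: that the webhook reports failures in the JSON body through a non-zero `code` (or legacy `StatusCode`) field even when the HTTP status is 200. Verify this against the Feishu custom bot documentation before relying on it.

```python
from typing import Any, Dict


def build_card_payload(title: str, content: str,
                       template: str = "red") -> Dict[str, Any]:
    """Return the same interactive-card payload that send_feishu_alert posts."""
    return {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {"tag": "plain_text", "content": title},
                "template": template,
            },
            "elements": [
                {"tag": "div", "text": {"tag": "lark_md", "content": content}},
            ],
        },
    }


def response_ok(body: Dict[str, Any]) -> bool:
    """Return True when the response body does not report an error.

    Assumption: errors are signalled by a non-zero `code` (or legacy
    `StatusCode`) field; a body with neither field is treated as success.
    """
    code = body.get("code", body.get("StatusCode", 0))
    return code == 0


if __name__ == "__main__":
    payload = build_card_payload("Spending alert", "**user-a** exceeded the threshold")
    print(payload["card"]["header"]["title"]["content"])
```

With helpers like these, `_send` could post `build_card_payload(...)` and log an error when `response_ok(resp.json())` is false, in addition to the existing `raise_for_status()` check.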