"""飞书自建应用消息推送服务""" import time import json import logging import httpx from config import FEISHU_APP_ID, FEISHU_APP_SECRET, REPORT_RECEIVERS logger = logging.getLogger(__name__) FEISHU_BASE = "https://open.feishu.cn/open-apis" class FeishuService: def __init__(self): self.app_id = FEISHU_APP_ID self.app_secret = FEISHU_APP_SECRET self._tenant_token: str = "" self._token_expires: float = 0 self._user_id_cache: dict[str, str] = {} async def _get_tenant_token(self) -> str: """获取 tenant_access_token(有效期 2 小时,自动缓存)""" if self._tenant_token and time.time() < self._token_expires: return self._tenant_token if not self.app_id or not self.app_secret: logger.warning("飞书 App ID/Secret 未配置") return "" async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal", json={ "app_id": self.app_id, "app_secret": self.app_secret, }, ) data = resp.json() if data.get("code") != 0: logger.error(f"获取飞书 token 失败: {data}") return "" self._tenant_token = data["tenant_access_token"] self._token_expires = time.time() + data.get("expire", 7200) - 60 logger.info("飞书 tenant_access_token 获取成功") return self._tenant_token async def get_user_id_by_mobile(self, mobile: str) -> str: """通过手机号查飞书 user_id""" if mobile in self._user_id_cache: return self._user_id_cache[mobile] token = await self._get_tenant_token() if not token: return "" async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{FEISHU_BASE}/contact/v3/users/batch_get_id", headers={"Authorization": f"Bearer {token}"}, json={"mobiles": [mobile]}, params={"user_id_type": "open_id"}, ) data = resp.json() if data.get("code") != 0: logger.error(f"查询用户 {mobile} 失败: {data}") return "" user_list = data.get("data", {}).get("user_list", []) if user_list and user_list[0].get("user_id"): uid = user_list[0]["user_id"] self._user_id_cache[mobile] = uid return uid logger.warning(f"未找到手机号 {mobile} 对应的飞书用户") return "" async def send_card_message(self, user_id: str, title: str, content: str): """发送飞书交互式卡片消息给个人""" token = await self._get_tenant_token() if not token: return False card = { "header": { "title": {"tag": "plain_text", "content": title}, "template": "blue", }, "elements": [ {"tag": "markdown", "content": content}, ], } payload = { "receive_id": user_id, "msg_type": "interactive", "content": json.dumps(card), } async with httpx.AsyncClient(timeout=15) as client: resp = await client.post( f"{FEISHU_BASE}/im/v1/messages", headers={"Authorization": f"Bearer {token}"}, params={"receive_id_type": "open_id"}, json=payload, ) data = resp.json() if data.get("code") != 0: logger.error(f"发送消息给 {user_id} 失败: {data}") return False logger.info(f"飞书消息发送成功: {user_id}") return True async def send_report_to_all(self, title: str, content: str) -> dict: """ 给所有配置的接收人发送报告 返回 {"success": [...], "failed": [...]} """ results = {"success": [], "failed": []} if not REPORT_RECEIVERS: logger.warning("未配置报告接收人") return results for mobile in REPORT_RECEIVERS: user_id = await self.get_user_id_by_mobile(mobile) if not user_id: results["failed"].append({"mobile": mobile, "reason": "未找到用户"}) continue ok = await self.send_card_message(user_id, title, content) if ok: results["success"].append(mobile) else: results["failed"].append({"mobile": mobile, "reason": "发送失败"}) logger.info(f"报告推送完成: 成功 {len(results['success'])},失败 {len(results['failed'])}") return results # 全局单例 feishu = FeishuService()