"""飞书自建应用消息推送服务""" import time import json import logging import httpx from config import FEISHU_APP_ID, FEISHU_APP_SECRET, REPORT_RECEIVERS, DAILY_REPORT_RECEIVERS logger = logging.getLogger(__name__) FEISHU_BASE = "https://open.feishu.cn/open-apis" # ──────────────────────── 卡片构建工具 ──────────────────────── def _col(weight: int, content: str) -> dict: """快捷构建一个 column""" return { "tag": "column", "width": "weighted", "weight": weight, "vertical_align": "top", "elements": [{"tag": "markdown", "content": content}], } def _column_set(columns: list, bg: str = "grey") -> dict: """构建 column_set 多列布局""" return { "tag": "column_set", "flex_mode": "none", "background_style": bg, "columns": columns, } def _hr() -> dict: return {"tag": "hr"} def _md(content: str) -> dict: return {"tag": "markdown", "content": content} def _progress_bar(pct: float) -> str: """用 unicode 方块生成进度条文本""" filled = min(int(pct / 10), 10) empty = 10 - filled bar = "█" * filled + "░" * empty return bar def build_daily_card(title: str, data: dict) -> dict: """从结构化数据构建日报卡片""" is_weekend = data.get("is_weekend", False) has_risks = bool(data.get("risks")) elements = [] # ── 顶部 KPI 指标栏(3列灰底) ── if is_weekend: elements.append(_column_set([ _col(1, f"**进行中项目**\n{data['active_project_count']} 个"), _col(1, f"**周末加班**\n{data['submitter_count']} 人提交"), _col(1, f"**加班产出**\n{data['total_output']}"), ])) else: elements.append(_column_set([ _col(1, f"**进行中项目**\n{data['active_project_count']} 个"), _col(1, f"**今日提交**\n{data['submitter_count']} 人次"), _col(1, f"**总产出**\n{data['total_output']}"), ])) # ── 各项目进展(表格式对齐) ── elements.append(_hr()) projects = data.get("projects", []) if projects: # 表头 elements.append(_column_set([ _col(3, "**项目名称**"), _col(2, "**进度**"), _col(1, "**今日产出**"), ], bg="default")) # 每个项目一行 for p in projects: pct = p['progress'] bar = _progress_bar(pct) elements.append(_column_set([ _col(3, f"{p['name']}"), _col(2, f"{bar} {pct}%"), _col(1, f"{p['today_output']}"), ], bg="default")) else: elements.append(_md("暂无进行中项目")) # ── 风险预警(始终显示) ── elements.append(_hr()) risks = data.get("risks", []) if risks: level_icon = {"high": "🔴", "medium": "🟡", "low": "🟢"} risk_lines = "\n".join( f"{level_icon.get(r.get('level','medium'), '⚠️')} **{r['name']}**:{r['detail']}" for r in risks ) elements.append(_md(f"**🚨 风险预警({len(risks)}项)**\n{risk_lines}")) else: elements.append(_md("**✅ 风险预警**\n当前无风险项目")) # ── 未提交人员(仅工作日显示) ── not_submitted = data.get("not_submitted", []) if not is_weekend and not_submitted: elements.append(_hr()) names = "、".join(not_submitted) elements.append(_md(f"**📝 未提交({len(not_submitted)}人)**\n{names}")) elif not is_weekend: elements.append(_hr()) elements.append(_md("**✅ 提交情况**\n今日全员已提交")) # ── AI 点评 ── ai_summary = data.get("ai_summary") if ai_summary: elements.append(_hr()) elements.append(_md(f"**💡 AI 点评**\n{ai_summary}")) return { "config": {"wide_screen_mode": True}, "header": { "title": {"tag": "plain_text", "content": title}, "template": "orange" if has_risks else "blue", }, "elements": elements, } def build_weekly_card(title: str, data: dict) -> dict: """从结构化数据构建周报卡片""" elements = [] # ── 顶部 KPI(3列灰底) ── top_cols = [ _col(1, f"**总产出**\n{data['total_output']}"), _col(1, f"**人均日产出**\n{data['avg_daily_output']}"), ] if data.get("top_producer"): top_cols.append(_col(1, f"**效率之星**\n{data['top_producer']}")) elements.append(_column_set(top_cols)) # ── 项目进展(表格式) ── elements.append(_hr()) projects = data.get("projects", []) if projects: elements.append(_column_set([ _col(3, "**项目名称**"), _col(2, "**进度**"), _col(1, "**本周产出**"), ], bg="default")) for p in projects: pct = p['progress'] bar = _progress_bar(pct) elements.append(_column_set([ _col(3, f"{p['name']}"), _col(2, f"{bar} {pct}%"), _col(1, f"{p['week_output']}"), ], bg="default")) else: elements.append(_md("暂无进行中项目")) # ── 成本概览(2列灰底) ── elements.append(_hr()) elements.append(_column_set([ _col(1, f"**人力成本**\n{data['labor_cost']}"), _col(1, f"**AI 工具**\n{data['ai_tool_cost']}"), ])) # ── 损耗排行 ── waste_ranking = data.get("waste_ranking", []) if waste_ranking: elements.append(_hr()) elements.append(_column_set([ _col(3, "**项目**"), _col(2, "**损耗率**"), ], bg="default")) for i, w in enumerate(waste_ranking): elements.append(_column_set([ _col(3, f"{i+1}. {w['name']}"), _col(2, f"{w['rate']}%"), ], bg="default")) # ── AI 分析 ── ai_summary = data.get("ai_summary") if ai_summary: elements.append(_hr()) elements.append(_md(f"**💡 AI 分析与建议**\n{ai_summary}")) return { "config": {"wide_screen_mode": True}, "header": { "title": {"tag": "plain_text", "content": title}, "template": "blue", }, "elements": elements, } def build_monthly_card(title: str, data: dict) -> dict: """从结构化数据构建月报卡片""" elements = [] # ── 顶部 KPI(4列灰底) ── elements.append(_column_set([ _col(1, f"**进行中**\n{data['active_count']} 个"), _col(1, f"**已完成**\n{data['completed_count']} 个"), _col(1, f"**总产出**\n{data['total_output']}"), _col(1, f"**总成本**\n{data['total_cost']}"), ])) # ── 各项目成本明细(表格式) ── project_costs = data.get("project_costs", []) if project_costs: elements.append(_hr()) elements.append(_column_set([ _col(2, "**项目**"), _col(1, "**人力**"), _col(1, "**AI**"), _col(1, "**外包**"), _col(1, "**合计**"), ], bg="default")) for pc in project_costs: elements.append(_column_set([ _col(2, f"{pc['name']}"), _col(1, f"{pc['labor']}"), _col(1, f"{pc['ai_tool']}"), _col(1, f"{pc['outsource']}"), _col(1, f"**{pc['total']}**"), ], bg="default")) # ── 盈亏概览 ── profit_items = data.get("profit_items", []) if profit_items: elements.append(_hr()) elements.append(_column_set([ _col(2, "**项目**"), _col(1, "**回款**"), _col(1, "**成本**"), _col(1, "**利润**"), ], bg="default")) for pi in profit_items: sign = "+" if pi['is_positive'] else "" elements.append(_column_set([ _col(2, f"{pi['name']}"), _col(1, f"{pi['contract']}"), _col(1, f"{pi['cost']}"), _col(1, f"{sign}{pi['profit']}"), ], bg="default")) if data.get("profit_rate") is not None: elements.append(_md(f"总利润率:**{data['profit_rate']}%**")) # ── 损耗 + 人均(2列灰底) ── elements.append(_hr()) elements.append(_column_set([ _col(1, f"**总损耗**\n{data['waste_total']}({data['waste_rate']}%)"), _col(1, f"**人均产出**\n{data['avg_per_person']}({data['participant_count']}人)"), ])) # ── AI 深度分析 ── ai_summary = data.get("ai_summary") if ai_summary: elements.append(_hr()) elements.append(_md(f"**💡 AI 深度分析**\n{ai_summary}")) return { "config": {"wide_screen_mode": True}, "header": { "title": {"tag": "plain_text", "content": title}, "template": "blue", }, "elements": elements, } # ──────────────────────── 飞书 API 服务 ──────────────────────── class FeishuService: def __init__(self): self.app_id = FEISHU_APP_ID self.app_secret = FEISHU_APP_SECRET self._tenant_token: str = "" self._token_expires: float = 0 self._user_id_cache: dict[str, str] = {} async def _get_tenant_token(self) -> str: """获取 tenant_access_token(有效期 2 小时,自动缓存)""" if self._tenant_token and time.time() < self._token_expires: return self._tenant_token if not self.app_id or not self.app_secret: logger.warning("飞书 App ID/Secret 未配置") return "" async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal", json={ "app_id": self.app_id, "app_secret": self.app_secret, }, ) data = resp.json() if data.get("code") != 0: logger.error(f"获取飞书 token 失败: {data}") return "" self._tenant_token = data["tenant_access_token"] self._token_expires = time.time() + data.get("expire", 7200) - 60 logger.info("飞书 tenant_access_token 获取成功") return self._tenant_token async def get_user_id_by_mobile(self, mobile: str) -> str: """通过手机号查飞书 user_id""" if mobile in self._user_id_cache: return self._user_id_cache[mobile] token = await self._get_tenant_token() if not token: return "" async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( f"{FEISHU_BASE}/contact/v3/users/batch_get_id", headers={"Authorization": f"Bearer {token}"}, json={"mobiles": [mobile]}, params={"user_id_type": "open_id"}, ) data = resp.json() if data.get("code") != 0: logger.error(f"查询用户 {mobile} 失败: {data}") return "" user_list = data.get("data", {}).get("user_list", []) if user_list and user_list[0].get("user_id"): uid = user_list[0]["user_id"] self._user_id_cache[mobile] = uid return uid logger.warning(f"未找到手机号 {mobile} 对应的飞书用户") return "" async def send_card(self, user_id: str, card: dict) -> bool: """发送构建好的卡片 JSON 给个人""" token = await self._get_tenant_token() if not token: return False payload = { "receive_id": user_id, "msg_type": "interactive", "content": json.dumps(card), } async with httpx.AsyncClient(timeout=15) as client: resp = await client.post( f"{FEISHU_BASE}/im/v1/messages", headers={"Authorization": f"Bearer {token}"}, params={"receive_id_type": "open_id"}, json=payload, ) data = resp.json() if data.get("code") != 0: logger.error(f"发送消息给 {user_id} 失败: {data}") return False logger.info(f"飞书消息发送成功: {user_id}") return True async def send_card_message(self, user_id: str, title: str, content: str): """兼容旧接口:发送简单 title+content 卡片""" card = { "header": { "title": {"tag": "plain_text", "content": title}, "template": "blue", }, "elements": [ {"tag": "markdown", "content": content}, ], } return await self.send_card(user_id, card) async def send_report_card_to_all(self, card: dict, receivers: list = None) -> dict: """给指定接收人发送卡片报告,默认使用 REPORT_RECEIVERS""" results = {"success": [], "failed": []} receiver_list = receivers or REPORT_RECEIVERS if not receiver_list: logger.warning("未配置报告接收人") return results for mobile in receiver_list: user_id = await self.get_user_id_by_mobile(mobile) if not user_id: results["failed"].append({"mobile": mobile, "reason": "未找到用户"}) continue ok = await self.send_card(user_id, card) if ok: results["success"].append(mobile) else: results["failed"].append({"mobile": mobile, "reason": "发送失败"}) logger.info(f"报告推送完成: 成功 {len(results['success'])},失败 {len(results['failed'])}") return results async def send_report_to_all(self, title: str, content: str) -> dict: """兼容旧接口""" results = {"success": [], "failed": []} if not REPORT_RECEIVERS: logger.warning("未配置报告接收人") return results for mobile in REPORT_RECEIVERS: user_id = await self.get_user_id_by_mobile(mobile) if not user_id: results["failed"].append({"mobile": mobile, "reason": "未找到用户"}) continue ok = await self.send_card_message(user_id, title, content) if ok: results["success"].append(mobile) else: results["failed"].append({"mobile": mobile, "reason": "发送失败"}) logger.info(f"报告推送完成: 成功 {len(results['success'])},失败 {len(results['failed'])}") return results # 全局单例 feishu = FeishuService()