airlabs-manage/backend/services/feishu_service.py
seaislee1209 530f02a66a
Some checks failed
Build and Deploy Web / build-and-deploy (push) Has been cancelled
Build and Deploy Backend / build-and-deploy (push) Has been cancelled
feat: 飞书报告卡片化 + 报告权限系统 + 产出过滤优化
- 日报/周报/月报改为结构化卡片推送(column_set布局)
- 新增 report:daily/weekly/monthly 权限到角色管理
- 产出统计只算中期制作阶段动画秒数
- 效率之星改为跨项目加权通过率
- AI点评补充风险数据源
- 禁用多余admin账号,股东角色加报告权限

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 20:43:35 +08:00

439 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""飞书自建应用消息推送服务"""
import time
import json
import logging
import httpx
from config import FEISHU_APP_ID, FEISHU_APP_SECRET, REPORT_RECEIVERS
logger = logging.getLogger(__name__)
FEISHU_BASE = "https://open.feishu.cn/open-apis"
# ──────────────────────── 卡片构建工具 ────────────────────────
def _col(weight: int, content: str) -> dict:
"""快捷构建一个 column"""
return {
"tag": "column",
"width": "weighted",
"weight": weight,
"vertical_align": "top",
"elements": [{"tag": "markdown", "content": content}],
}
def _column_set(columns: list, bg: str = "grey") -> dict:
"""构建 column_set 多列布局"""
return {
"tag": "column_set",
"flex_mode": "none",
"background_style": bg,
"columns": columns,
}
def _hr() -> dict:
return {"tag": "hr"}
def _md(content: str) -> dict:
return {"tag": "markdown", "content": content}
def _progress_bar(pct: float) -> str:
"""用 unicode 方块生成进度条文本"""
filled = min(int(pct / 10), 10)
empty = 10 - filled
bar = "" * filled + "" * empty
return bar
def build_daily_card(title: str, data: dict) -> dict:
"""从结构化数据构建日报卡片"""
is_weekend = data.get("is_weekend", False)
has_risks = bool(data.get("risks"))
elements = []
# ── 顶部 KPI 指标栏3列灰底 ──
if is_weekend:
elements.append(_column_set([
_col(1, f"**进行中项目**\n{data['active_project_count']}"),
_col(1, f"**周末加班**\n{data['submitter_count']} 人提交"),
_col(1, f"**加班产出**\n{data['total_output']}"),
]))
else:
elements.append(_column_set([
_col(1, f"**进行中项目**\n{data['active_project_count']}"),
_col(1, f"**今日提交**\n{data['submitter_count']} 人次"),
_col(1, f"**总产出**\n{data['total_output']}"),
]))
# ── 各项目进展(表格式对齐) ──
elements.append(_hr())
projects = data.get("projects", [])
if projects:
# 表头
elements.append(_column_set([
_col(3, "**项目名称**"),
_col(2, "**进度**"),
_col(1, "**今日产出**"),
], bg="default"))
# 每个项目一行
for p in projects:
pct = p['progress']
bar = _progress_bar(pct)
elements.append(_column_set([
_col(3, f"{p['name']}"),
_col(2, f"{bar} {pct}%"),
_col(1, f"{p['today_output']}"),
], bg="default"))
else:
elements.append(_md("暂无进行中项目"))
# ── 风险预警(始终显示) ──
elements.append(_hr())
risks = data.get("risks", [])
if risks:
level_icon = {"high": "🔴", "medium": "🟡", "low": "🟢"}
risk_lines = "\n".join(
f"{level_icon.get(r.get('level','medium'), '⚠️')} **{r['name']}**{r['detail']}"
for r in risks
)
elements.append(_md(f"**🚨 风险预警({len(risks)}项)**\n{risk_lines}"))
else:
elements.append(_md("**✅ 风险预警**\n当前无风险项目"))
# ── 未提交人员(仅工作日显示) ──
not_submitted = data.get("not_submitted", [])
if not is_weekend and not_submitted:
elements.append(_hr())
names = "".join(not_submitted)
elements.append(_md(f"**📝 未提交({len(not_submitted)}人)**\n{names}"))
elif not is_weekend:
elements.append(_hr())
elements.append(_md("**✅ 提交情况**\n今日全员已提交"))
# ── AI 点评 ──
ai_summary = data.get("ai_summary")
if ai_summary:
elements.append(_hr())
elements.append(_md(f"**💡 AI 点评**\n{ai_summary}"))
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": "orange" if has_risks else "blue",
},
"elements": elements,
}
def build_weekly_card(title: str, data: dict) -> dict:
"""从结构化数据构建周报卡片"""
elements = []
# ── 顶部 KPI3列灰底 ──
top_cols = [
_col(1, f"**总产出**\n{data['total_output']}"),
_col(1, f"**人均日产出**\n{data['avg_daily_output']}"),
]
if data.get("top_producer"):
top_cols.append(_col(1, f"**效率之星**\n{data['top_producer']}"))
elements.append(_column_set(top_cols))
# ── 项目进展(表格式) ──
elements.append(_hr())
projects = data.get("projects", [])
if projects:
elements.append(_column_set([
_col(3, "**项目名称**"),
_col(2, "**进度**"),
_col(1, "**本周产出**"),
], bg="default"))
for p in projects:
pct = p['progress']
bar = _progress_bar(pct)
elements.append(_column_set([
_col(3, f"{p['name']}"),
_col(2, f"{bar} {pct}%"),
_col(1, f"{p['week_output']}"),
], bg="default"))
else:
elements.append(_md("暂无进行中项目"))
# ── 成本概览2列灰底 ──
elements.append(_hr())
elements.append(_column_set([
_col(1, f"**人力成本**\n{data['labor_cost']}"),
_col(1, f"**AI 工具**\n{data['ai_tool_cost']}"),
]))
# ── 损耗排行 ──
waste_ranking = data.get("waste_ranking", [])
if waste_ranking:
elements.append(_hr())
elements.append(_column_set([
_col(3, "**项目**"),
_col(2, "**损耗率**"),
], bg="default"))
for i, w in enumerate(waste_ranking):
elements.append(_column_set([
_col(3, f"{i+1}. {w['name']}"),
_col(2, f"{w['rate']}%"),
], bg="default"))
# ── AI 分析 ──
ai_summary = data.get("ai_summary")
if ai_summary:
elements.append(_hr())
elements.append(_md(f"**💡 AI 分析与建议**\n{ai_summary}"))
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": "blue",
},
"elements": elements,
}
def build_monthly_card(title: str, data: dict) -> dict:
"""从结构化数据构建月报卡片"""
elements = []
# ── 顶部 KPI4列灰底 ──
elements.append(_column_set([
_col(1, f"**进行中**\n{data['active_count']}"),
_col(1, f"**已完成**\n{data['completed_count']}"),
_col(1, f"**总产出**\n{data['total_output']}"),
_col(1, f"**总成本**\n{data['total_cost']}"),
]))
# ── 各项目成本明细(表格式) ──
project_costs = data.get("project_costs", [])
if project_costs:
elements.append(_hr())
elements.append(_column_set([
_col(2, "**项目**"),
_col(1, "**人力**"),
_col(1, "**AI**"),
_col(1, "**外包**"),
_col(1, "**合计**"),
], bg="default"))
for pc in project_costs:
elements.append(_column_set([
_col(2, f"{pc['name']}"),
_col(1, f"{pc['labor']}"),
_col(1, f"{pc['ai_tool']}"),
_col(1, f"{pc['outsource']}"),
_col(1, f"**{pc['total']}**"),
], bg="default"))
# ── 盈亏概览 ──
profit_items = data.get("profit_items", [])
if profit_items:
elements.append(_hr())
elements.append(_column_set([
_col(2, "**项目**"),
_col(1, "**回款**"),
_col(1, "**成本**"),
_col(1, "**利润**"),
], bg="default"))
for pi in profit_items:
sign = "+" if pi['is_positive'] else ""
elements.append(_column_set([
_col(2, f"{pi['name']}"),
_col(1, f"{pi['contract']}"),
_col(1, f"{pi['cost']}"),
_col(1, f"{sign}{pi['profit']}"),
], bg="default"))
if data.get("profit_rate") is not None:
elements.append(_md(f"总利润率:**{data['profit_rate']}%**"))
# ── 损耗 + 人均2列灰底 ──
elements.append(_hr())
elements.append(_column_set([
_col(1, f"**总损耗**\n{data['waste_total']}{data['waste_rate']}%"),
_col(1, f"**人均产出**\n{data['avg_per_person']}{data['participant_count']}人)"),
]))
# ── AI 深度分析 ──
ai_summary = data.get("ai_summary")
if ai_summary:
elements.append(_hr())
elements.append(_md(f"**💡 AI 深度分析**\n{ai_summary}"))
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": title},
"template": "blue",
},
"elements": elements,
}
# ──────────────────────── 飞书 API 服务 ────────────────────────
class FeishuService:
def __init__(self):
self.app_id = FEISHU_APP_ID
self.app_secret = FEISHU_APP_SECRET
self._tenant_token: str = ""
self._token_expires: float = 0
self._user_id_cache: dict[str, str] = {}
async def _get_tenant_token(self) -> str:
"""获取 tenant_access_token有效期 2 小时,自动缓存)"""
if self._tenant_token and time.time() < self._token_expires:
return self._tenant_token
if not self.app_id or not self.app_secret:
logger.warning("飞书 App ID/Secret 未配置")
return ""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{FEISHU_BASE}/auth/v3/tenant_access_token/internal",
json={
"app_id": self.app_id,
"app_secret": self.app_secret,
},
)
data = resp.json()
if data.get("code") != 0:
logger.error(f"获取飞书 token 失败: {data}")
return ""
self._tenant_token = data["tenant_access_token"]
self._token_expires = time.time() + data.get("expire", 7200) - 60
logger.info("飞书 tenant_access_token 获取成功")
return self._tenant_token
async def get_user_id_by_mobile(self, mobile: str) -> str:
"""通过手机号查飞书 user_id"""
if mobile in self._user_id_cache:
return self._user_id_cache[mobile]
token = await self._get_tenant_token()
if not token:
return ""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{FEISHU_BASE}/contact/v3/users/batch_get_id",
headers={"Authorization": f"Bearer {token}"},
json={"mobiles": [mobile]},
params={"user_id_type": "open_id"},
)
data = resp.json()
if data.get("code") != 0:
logger.error(f"查询用户 {mobile} 失败: {data}")
return ""
user_list = data.get("data", {}).get("user_list", [])
if user_list and user_list[0].get("user_id"):
uid = user_list[0]["user_id"]
self._user_id_cache[mobile] = uid
return uid
logger.warning(f"未找到手机号 {mobile} 对应的飞书用户")
return ""
async def send_card(self, user_id: str, card: dict) -> bool:
"""发送构建好的卡片 JSON 给个人"""
token = await self._get_tenant_token()
if not token:
return False
payload = {
"receive_id": user_id,
"msg_type": "interactive",
"content": json.dumps(card),
}
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
f"{FEISHU_BASE}/im/v1/messages",
headers={"Authorization": f"Bearer {token}"},
params={"receive_id_type": "open_id"},
json=payload,
)
data = resp.json()
if data.get("code") != 0:
logger.error(f"发送消息给 {user_id} 失败: {data}")
return False
logger.info(f"飞书消息发送成功: {user_id}")
return True
async def send_card_message(self, user_id: str, title: str, content: str):
"""兼容旧接口:发送简单 title+content 卡片"""
card = {
"header": {
"title": {"tag": "plain_text", "content": title},
"template": "blue",
},
"elements": [
{"tag": "markdown", "content": content},
],
}
return await self.send_card(user_id, card)
async def send_report_card_to_all(self, card: dict) -> dict:
"""给所有配置的接收人发送卡片报告"""
results = {"success": [], "failed": []}
if not REPORT_RECEIVERS:
logger.warning("未配置报告接收人")
return results
for mobile in REPORT_RECEIVERS:
user_id = await self.get_user_id_by_mobile(mobile)
if not user_id:
results["failed"].append({"mobile": mobile, "reason": "未找到用户"})
continue
ok = await self.send_card(user_id, card)
if ok:
results["success"].append(mobile)
else:
results["failed"].append({"mobile": mobile, "reason": "发送失败"})
logger.info(f"报告推送完成: 成功 {len(results['success'])},失败 {len(results['failed'])}")
return results
async def send_report_to_all(self, title: str, content: str) -> dict:
"""兼容旧接口"""
results = {"success": [], "failed": []}
if not REPORT_RECEIVERS:
logger.warning("未配置报告接收人")
return results
for mobile in REPORT_RECEIVERS:
user_id = await self.get_user_id_by_mobile(mobile)
if not user_id:
results["failed"].append({"mobile": mobile, "reason": "未找到用户"})
continue
ok = await self.send_card_message(user_id, title, content)
if ok:
results["success"].append(mobile)
else:
results["failed"].append({"mobile": mobile, "reason": "发送失败"})
logger.info(f"报告推送完成: 成功 {len(results['success'])},失败 {len(results['failed'])}")
return results
# 全局单例
feishu = FeishuService()