from __future__ import annotations import argparse from datetime import date from .config import read_config from .db import Database from .report_service import ReportService from .robot_service import ( create_reminder_payload, create_summary_payload, get_tenant_access_token, send_bot_interactive_message, send_webhook, ) from .workday import is_workday def send_reminder() -> dict: config = read_config() if not is_workday(date.today(), config.workday_calendar_path): return {"skipped": True, "reason": "not a workday"} payload = create_reminder_payload(f"{config.base_url}/submit") database = Database(config.database_path, config.employee_seed_path) try: service = ReportService(database) employees = service.list_reports_for_date(date.today().isoformat())["missing"] if config.feishu_app_id and config.feishu_app_secret: token = get_tenant_access_token(config.feishu_app_id, config.feishu_app_secret) sent = [] for employee in employees: send_bot_interactive_message(token, employee["feishu_user_id"], payload) sent.append(employee["name"]) return {"ok": True, "mode": "bot_private", "sent": sent} return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret) finally: database.close() def send_summary(report_date: str | None = None) -> dict: config = read_config() selected_date = date.fromisoformat(report_date) if report_date else date.today() if not is_workday(selected_date, config.workday_calendar_path): return {"skipped": True, "reason": "not a workday", "date": selected_date.isoformat()} database = Database(config.database_path, config.employee_seed_path) try: service = ReportService(database) selected_date_text = selected_date.isoformat() summary = service.list_reports_for_date(selected_date_text) payload = create_summary_payload(f"{config.base_url}/manager?date={selected_date_text}", summary) if config.feishu_app_id and config.feishu_app_secret: token = get_tenant_access_token(config.feishu_app_id, config.feishu_app_secret) admins = [employee for employee in database.list_active_employees() if employee.get("role") == "admin"] sent = [] for admin in admins: send_bot_interactive_message(token, admin["feishu_user_id"], payload) sent.append(admin["name"]) return {"ok": True, "mode": "bot_private", "sent": sent} return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret) finally: database.close() def main() -> None: parser = argparse.ArgumentParser(description="发送飞书日报定时消息") parser.add_argument("action", choices=["reminder", "summary"], help="reminder 发送提醒,summary 发送汇总") parser.add_argument("--date", help="汇总日期,格式 YYYY-MM-DD;不传则使用当天") args = parser.parse_args() if args.action == "reminder": result = send_reminder() else: result = send_summary(args.date) print(result) if __name__ == "__main__": main()