from __future__ import annotations import json from datetime import date from html import escape from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import parse_qs, urlparse from .config import Config, read_config from .db import Database from .feishu_auth import ( authorize_url, create_session_cookie, create_state, exchange_code_for_user_access_token, fetch_user_info, parse_session_cookie, ) from .report_service import ReportService from .robot_service import create_reminder_payload, create_summary_payload, send_webhook def today_string() -> str: return date.today().isoformat() def page(title: str, body: str, scripts: str = "") -> bytes: html = f""" {escape(title)} {body} {scripts} """ return html.encode("utf-8") def submit_page(current_date: str, session: dict[str, str] | None = None) -> bytes: if session: identity_html = f"""
当前填写人 {escape(session.get("name", session["feishu_user_id"]))}
""" history_hint = "系统已自动识别你的飞书身份,这里会显示你最近提交过的日报。" else: identity_html = '' history_hint = "输入员工 ID 后,可以查看自己最近提交过的日报。" return page( "每日报告", f"""

每日工作汇报

{identity_html}
今日状态

今日完成

明日计划

我的历史日报

{history_hint}

暂无历史记录。
""", ) def manager_page(current_date: str) -> bytes: return page( "日报浏览", f"""

日报浏览

按日期、员工和关键词浏览所有日报记录。

导出 CSV

未提交人员

""", '', ) def forbidden_page() -> bytes: return page( "无权限", """

无权限

你没有查看日报汇总的权限,请联系管理员开通。

返回日报填写页

""", ) class DailyReportHandler(BaseHTTPRequestHandler): report_service: ReportService config: Config static_dir: Path def log_message(self, format: str, *args: object) -> None: return def _send(self, status: int, body: bytes, content_type: str = "text/html; charset=utf-8") -> None: self.send_response(status) self.send_header("content-type", content_type) self.send_header("content-length", str(len(body))) self.end_headers() self.wfile.write(body) def _json(self, status: int, data: dict) -> None: self._send(status, json.dumps(data, ensure_ascii=False).encode("utf-8"), "application/json; charset=utf-8") def _read_json(self) -> dict: length = int(self.headers.get("content-length", "0")) if length == 0: return {} return json.loads(self.rfile.read(length).decode("utf-8")) def _cookie(self, name: str) -> str | None: for item in self.headers.get("cookie", "").split(";"): if "=" not in item: continue key, value = item.strip().split("=", 1) if key == name: return value return None def _session(self) -> dict[str, str] | None: return parse_session_cookie(self._cookie("daily_report_session"), self.config.session_secret) def _oauth_enabled(self) -> bool: return bool(self.config.feishu_app_id and self.config.feishu_app_secret) def _is_admin_session(self) -> bool: session = self._session() return bool(session and self.report_service.is_admin(session["feishu_user_id"])) def _require_admin(self) -> bool: if self._is_admin_session(): return True self._send(403, forbidden_page()) return False def do_GET(self) -> None: parsed = urlparse(self.path) query = parse_qs(parsed.query) if parsed.path == "/": self.send_response(302) self.send_header("location", "/manager") self.end_headers() return if parsed.path == "/submit": session = self._session() if self._oauth_enabled() and not session: self.send_response(302) self.send_header("location", "/auth/feishu/start") self.end_headers() return self._send(200, submit_page(today_string(), session)) return if parsed.path == "/auth/feishu/start": state = create_state() redirect_uri = f"{self.config.base_url}/auth/feishu/callback" self.send_response(302) self.send_header("set-cookie", f"daily_report_oauth_state={state}; Path=/; HttpOnly; SameSite=Lax") self.send_header("location", authorize_url(self.config.feishu_app_id, redirect_uri, state)) self.end_headers() return if parsed.path == "/auth/feishu/callback": state = query.get("state", [""])[0] code = query.get("code", [""])[0] if not code or not state or state != self._cookie("daily_report_oauth_state"): self._json(400, {"error": "invalid feishu oauth callback"}) return redirect_uri = f"{self.config.base_url}/auth/feishu/callback" token = exchange_code_for_user_access_token( self.config.feishu_app_id, self.config.feishu_app_secret, code, redirect_uri, ) employee = fetch_user_info(token) self.report_service.database.upsert_employee(employee) cookie = create_session_cookie( {"feishu_user_id": employee["feishu_user_id"], "name": employee["name"]}, self.config.session_secret, ) self.send_response(302) self.send_header("set-cookie", f"daily_report_session={cookie}; Path=/; HttpOnly; SameSite=Lax") self.send_header("set-cookie", "daily_report_oauth_state=; Path=/; Max-Age=0") self.send_header("location", "/submit") self.end_headers() return if parsed.path == "/manager": if not self._require_admin(): return self._send(200, manager_page(query.get("date", [today_string()])[0])) return if parsed.path == "/api/reports": if not self._require_admin(): return self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0])) return if parsed.path == "/api/reports/history": user_id = query.get("feishu_user_id", [""])[0] try: self._json(200, self.report_service.list_employee_history(user_id)) except ValueError as error: self._json(400, {"error": str(error)}) return if parsed.path == "/api/reports/previous": user_id = query.get("feishu_user_id", [""])[0] report_date = query.get("date", [today_string()])[0] try: self._json(200, self.report_service.previous_report_reference(user_id, report_date)) except ValueError as error: self._json(400, {"error": str(error)}) return if parsed.path == "/api/reports/export": if not self._require_admin(): return report_date = query.get("date", [today_string()])[0] csv_text = self.report_service.export_reports_csv(report_date) self.send_response(200) self.send_header("content-type", "text/csv; charset=utf-8") self.send_header("content-disposition", f'attachment; filename="daily-reports-{report_date}.csv"') self.end_headers() self.wfile.write(csv_text.encode("utf-8-sig")) return if parsed.path.startswith("/static/"): self._serve_static(parsed.path.removeprefix("/static/")) return self._json(404, {"error": "not found"}) def do_POST(self) -> None: parsed = urlparse(self.path) try: if parsed.path == "/api/reports": self._json(200, {"report": self.report_service.upsert_report(self._read_json())}) return if parsed.path == "/api/robot/send-reminder": payload = create_reminder_payload(f"{self.config.base_url}/submit") self._json( 200, { "ok": True, "result": send_webhook( self.config.feishu_webhook_url, payload, self.config.feishu_webhook_secret, ), }, ) return if parsed.path == "/api/robot/send-summary": body = self._read_json() report_date = body.get("date") or today_string() summary = self.report_service.list_reports_for_date(report_date) payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary) self._json( 200, { "ok": True, "result": send_webhook( self.config.feishu_webhook_url, payload, self.config.feishu_webhook_secret, ), }, ) return self._json(404, {"error": "not found"}) except (ValueError, json.JSONDecodeError) as error: self._json(400, {"error": str(error)}) except Exception as error: self._json(500, {"error": str(error)}) def _serve_static(self, name: str) -> None: safe_name = name.replace("\\", "/").split("/")[-1] file_path = self.static_dir / safe_name if not file_path.exists(): self._json(404, {"error": "not found"}) return content_type = "application/javascript; charset=utf-8" if file_path.suffix == ".js" else "text/css; charset=utf-8" self._send(200, file_path.read_bytes(), content_type) def create_server(config: Config, database: Database | None = None) -> ThreadingHTTPServer: db = database or Database(config.database_path, config.employee_seed_path) service = ReportService(db) class Handler(DailyReportHandler): report_service = service static_dir = config.root_dir / "daily_report" / "static" Handler.config = config server = ThreadingHTTPServer(("", config.port), Handler) server.database = db # type: ignore[attr-defined] return server def main() -> None: config = read_config() server = create_server(config) print(f"Daily report app listening on {config.base_url}") try: server.serve_forever() finally: server.server_close() server.database.close() # type: ignore[attr-defined] if __name__ == "__main__": main()