diff --git a/daily_report/db.py b/daily_report/db.py index 85a19ab..6938de3 100644 --- a/daily_report/db.py +++ b/daily_report/db.py @@ -30,6 +30,7 @@ class Database: name TEXT NOT NULL, department TEXT NOT NULL DEFAULT '', manager TEXT NOT NULL DEFAULT '', + role TEXT NOT NULL DEFAULT 'staff', active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP @@ -50,6 +51,11 @@ class Database: ); """ ) + try: + self.connection.execute("ALTER TABLE employees ADD COLUMN role TEXT NOT NULL DEFAULT 'staff'") + except sqlite3.OperationalError as error: + if "duplicate column name" not in str(error).lower(): + raise self.connection.commit() def load_employees(self, employee_seed_path: Path) -> None: @@ -64,12 +70,13 @@ class Database: for employee in employees: self.connection.execute( """ - INSERT INTO employees (feishu_user_id, name, department, manager, active, updated_at) - VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + INSERT INTO employees (feishu_user_id, name, department, manager, role, active, updated_at) + VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT(feishu_user_id) DO UPDATE SET name = excluded.name, department = excluded.department, manager = excluded.manager, + role = excluded.role, active = excluded.active, updated_at = CURRENT_TIMESTAMP """, @@ -78,6 +85,7 @@ class Database: employee["name"], employee.get("department", ""), employee.get("manager", ""), + employee.get("role", "staff"), 0 if employee.get("active") is False else 1, ), ) @@ -89,7 +97,7 @@ class Database: with self._lock: rows = self.connection.execute( """ - SELECT feishu_user_id, name, department, manager, active + SELECT feishu_user_id, name, department, manager, role, active FROM employees WHERE active = 1 ORDER BY department, name @@ -101,7 +109,7 @@ class Database: with self._lock: row = self.connection.execute( """ - SELECT feishu_user_id, name, department, manager, active + SELECT feishu_user_id, name, department, manager, role, active FROM employees WHERE feishu_user_id = ? """, diff --git a/daily_report/report_service.py b/daily_report/report_service.py index 612748e..0cfca8a 100644 --- a/daily_report/report_service.py +++ b/daily_report/report_service.py @@ -98,3 +98,8 @@ class ReportService: "employee": employee, "reports": self.database.list_reports_for_employee(user_id, limit), } + + def is_admin(self, feishu_user_id: str) -> bool: + user_id = _required_text(feishu_user_id, "feishu_user_id") + employee = self.database.find_employee(user_id) + return bool(employee and employee.get("active") == 1 and employee.get("role") == "admin") diff --git a/daily_report/web.py b/daily_report/web.py index 2743f6c..ddd1ce7 100644 --- a/daily_report/web.py +++ b/daily_report/web.py @@ -170,6 +170,20 @@ def manager_page(current_date: str) -> bytes: ) +def forbidden_page() -> bytes: + return page( + "无权限", + """ +
+
+

无权限

+

你没有查看日报汇总的权限。请联系管理员开通。

+

返回日报填写页

+
+
""", + ) + + class DailyReportHandler(BaseHTTPRequestHandler): report_service: ReportService config: Config @@ -209,6 +223,16 @@ class DailyReportHandler(BaseHTTPRequestHandler): def _oauth_enabled(self) -> bool: return bool(self.config.feishu_app_id and self.config.feishu_app_secret) + def _is_admin_session(self) -> bool: + session = self._session() + return bool(session and self.report_service.is_admin(session["feishu_user_id"])) + + def _require_admin(self) -> bool: + if self._is_admin_session(): + return True + self._send(403, forbidden_page()) + return False + def do_GET(self) -> None: parsed = urlparse(self.path) query = parse_qs(parsed.query) @@ -260,9 +284,13 @@ class DailyReportHandler(BaseHTTPRequestHandler): self.end_headers() return if parsed.path == "/manager": + if not self._require_admin(): + return self._send(200, manager_page(query.get("date", [today_string()])[0])) return if parsed.path == "/api/reports": + if not self._require_admin(): + return self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0])) return if parsed.path == "/api/reports/history": @@ -273,6 +301,8 @@ class DailyReportHandler(BaseHTTPRequestHandler): self._json(400, {"error": str(error)}) return if parsed.path == "/api/reports/export": + if not self._require_admin(): + return report_date = query.get("date", [today_string()])[0] csv_text = self.report_service.export_reports_csv(report_date) self.send_response(200) diff --git a/data/employees.json b/data/employees.json index 7ac30b8..b37da11 100644 --- a/data/employees.json +++ b/data/employees.json @@ -4,6 +4,7 @@ "name": "Alice", "department": "Operations", "manager": "Manager", + "role": "staff", "active": true }, { @@ -11,6 +12,7 @@ "name": "Bob", "department": "Operations", "manager": "Manager", + "role": "staff", "active": true } ] diff --git a/tests/test_web.py b/tests/test_web.py index db390fd..6b844b6 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -13,11 +13,11 @@ from daily_report.feishu_auth import create_session_cookie from daily_report.web import create_server -def make_server() -> tuple: +def make_server(employees: list[dict] | None = None) -> tuple: temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-")) seed_path = temp_dir / "employees.json" seed_path.write_text( - json.dumps([{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]), + json.dumps(employees or [{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]), encoding="utf-8", ) config = Config( @@ -45,6 +45,13 @@ def get(url: str) -> tuple[int, str]: return response.status, response.read().decode("utf-8") +def get_with_cookie(url: str, cookie: str) -> tuple[int, str]: + opener = urllib.request.build_opener() + opener.addheaders = [("Cookie", cookie)] + with opener.open(url, timeout=5) as response: + return response.status, response.read().decode("utf-8") + + class WebTest(unittest.TestCase): def test_serves_submit_and_manager_pages(self) -> None: server, db, base_url = make_server() @@ -55,7 +62,20 @@ class WebTest(unittest.TestCase): self.assertIn("1. \n2. \n3. \n4.", submit) self.assertIn("我的历史日报", submit) - status, manager = get(f"{base_url}/manager") + admin_cookie = "daily_report_session=" + create_session_cookie( + {"feishu_user_id": "u_1", "name": "Lin"}, "session-secret" + ) + db.upsert_employee( + { + "feishu_user_id": "u_1", + "name": "Lin", + "department": "Design", + "manager": "", + "active": True, + "role": "admin", + } + ) + status, manager = get_with_cookie(f"{base_url}/manager", admin_cookie) self.assertEqual(status, 200) self.assertIn("日报浏览", manager) finally: @@ -83,7 +103,20 @@ class WebTest(unittest.TestCase): with urllib.request.urlopen(request, timeout=5) as response: self.assertEqual(response.status, 200) - status, body = get(f"{base_url}/api/reports?date=2026-05-07") + db.upsert_employee( + { + "feishu_user_id": "u_1", + "name": "Lin", + "department": "Design", + "manager": "", + "active": True, + "role": "admin", + } + ) + admin_cookie = "daily_report_session=" + create_session_cookie( + {"feishu_user_id": "u_1", "name": "Lin"}, "session-secret" + ) + status, body = get_with_cookie(f"{base_url}/api/reports?date=2026-05-07", admin_cookie) summary = json.loads(body) self.assertEqual(status, 200) self.assertEqual(summary["submittedCount"], 1) @@ -122,6 +155,32 @@ class WebTest(unittest.TestCase): server.server_close() db.close() + def test_manager_page_requires_admin_role(self) -> None: + server, db, base_url = make_server( + [ + {"feishu_user_id": "u_admin", "name": "Admin", "active": True, "role": "admin"}, + {"feishu_user_id": "u_staff", "name": "Staff", "active": True, "role": "staff"}, + ] + ) + try: + staff_cookie = "daily_report_session=" + create_session_cookie( + {"feishu_user_id": "u_staff", "name": "Staff"}, "session-secret" + ) + with self.assertRaises(urllib.error.HTTPError) as error: + get_with_cookie(f"{base_url}/manager", staff_cookie) + self.assertEqual(error.exception.code, 403) + + admin_cookie = "daily_report_session=" + create_session_cookie( + {"feishu_user_id": "u_admin", "name": "Admin"}, "session-secret" + ) + status, body = get_with_cookie(f"{base_url}/manager", admin_cookie) + self.assertEqual(status, 200) + self.assertIn("日报浏览", body) + finally: + server.shutdown() + server.server_close() + db.close() + if __name__ == "__main__": unittest.main()