feat: restrict manager access to admins

This commit is contained in:
Codex 2026-05-07 19:06:50 +08:00
parent 70e11dd1e7
commit 92adf268b6
5 changed files with 112 additions and 8 deletions

View File

@ -30,6 +30,7 @@ class Database:
name TEXT NOT NULL,
department TEXT NOT NULL DEFAULT '',
manager TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT 'staff',
active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
@ -50,6 +51,11 @@ class Database:
);
"""
)
try:
self.connection.execute("ALTER TABLE employees ADD COLUMN role TEXT NOT NULL DEFAULT 'staff'")
except sqlite3.OperationalError as error:
if "duplicate column name" not in str(error).lower():
raise
self.connection.commit()
def load_employees(self, employee_seed_path: Path) -> None:
@ -64,12 +70,13 @@ class Database:
for employee in employees:
self.connection.execute(
"""
INSERT INTO employees (feishu_user_id, name, department, manager, active, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
INSERT INTO employees (feishu_user_id, name, department, manager, role, active, updated_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(feishu_user_id) DO UPDATE SET
name = excluded.name,
department = excluded.department,
manager = excluded.manager,
role = excluded.role,
active = excluded.active,
updated_at = CURRENT_TIMESTAMP
""",
@ -78,6 +85,7 @@ class Database:
employee["name"],
employee.get("department", ""),
employee.get("manager", ""),
employee.get("role", "staff"),
0 if employee.get("active") is False else 1,
),
)
@ -89,7 +97,7 @@ class Database:
with self._lock:
rows = self.connection.execute(
"""
SELECT feishu_user_id, name, department, manager, active
SELECT feishu_user_id, name, department, manager, role, active
FROM employees
WHERE active = 1
ORDER BY department, name
@ -101,7 +109,7 @@ class Database:
with self._lock:
row = self.connection.execute(
"""
SELECT feishu_user_id, name, department, manager, active
SELECT feishu_user_id, name, department, manager, role, active
FROM employees
WHERE feishu_user_id = ?
""",

View File

@ -98,3 +98,8 @@ class ReportService:
"employee": employee,
"reports": self.database.list_reports_for_employee(user_id, limit),
}
def is_admin(self, feishu_user_id: str) -> bool:
user_id = _required_text(feishu_user_id, "feishu_user_id")
employee = self.database.find_employee(user_id)
return bool(employee and employee.get("active") == 1 and employee.get("role") == "admin")

View File

@ -170,6 +170,20 @@ def manager_page(current_date: str) -> bytes:
)
def forbidden_page() -> bytes:
return page(
"无权限",
"""
<main class="shell narrow">
<section class="panel">
<h1>无权限</h1>
<p>你没有查看日报汇总的权限请联系管理员开通</p>
<p><a href="/submit">返回日报填写页</a></p>
</section>
</main>""",
)
class DailyReportHandler(BaseHTTPRequestHandler):
report_service: ReportService
config: Config
@ -209,6 +223,16 @@ class DailyReportHandler(BaseHTTPRequestHandler):
def _oauth_enabled(self) -> bool:
return bool(self.config.feishu_app_id and self.config.feishu_app_secret)
def _is_admin_session(self) -> bool:
session = self._session()
return bool(session and self.report_service.is_admin(session["feishu_user_id"]))
def _require_admin(self) -> bool:
if self._is_admin_session():
return True
self._send(403, forbidden_page())
return False
def do_GET(self) -> None:
parsed = urlparse(self.path)
query = parse_qs(parsed.query)
@ -260,9 +284,13 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self.end_headers()
return
if parsed.path == "/manager":
if not self._require_admin():
return
self._send(200, manager_page(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports":
if not self._require_admin():
return
self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports/history":
@ -273,6 +301,8 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self._json(400, {"error": str(error)})
return
if parsed.path == "/api/reports/export":
if not self._require_admin():
return
report_date = query.get("date", [today_string()])[0]
csv_text = self.report_service.export_reports_csv(report_date)
self.send_response(200)

View File

@ -4,6 +4,7 @@
"name": "Alice",
"department": "Operations",
"manager": "Manager",
"role": "staff",
"active": true
},
{
@ -11,6 +12,7 @@
"name": "Bob",
"department": "Operations",
"manager": "Manager",
"role": "staff",
"active": true
}
]

View File

@ -13,11 +13,11 @@ from daily_report.feishu_auth import create_session_cookie
from daily_report.web import create_server
def make_server() -> tuple:
def make_server(employees: list[dict] | None = None) -> tuple:
temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-"))
seed_path = temp_dir / "employees.json"
seed_path.write_text(
json.dumps([{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
json.dumps(employees or [{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
encoding="utf-8",
)
config = Config(
@ -45,6 +45,13 @@ def get(url: str) -> tuple[int, str]:
return response.status, response.read().decode("utf-8")
def get_with_cookie(url: str, cookie: str) -> tuple[int, str]:
opener = urllib.request.build_opener()
opener.addheaders = [("Cookie", cookie)]
with opener.open(url, timeout=5) as response:
return response.status, response.read().decode("utf-8")
class WebTest(unittest.TestCase):
def test_serves_submit_and_manager_pages(self) -> None:
server, db, base_url = make_server()
@ -55,7 +62,20 @@ class WebTest(unittest.TestCase):
self.assertIn("1. \n2. \n3. \n4.", submit)
self.assertIn("我的历史日报", submit)
status, manager = get(f"{base_url}/manager")
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
status, manager = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200)
self.assertIn("日报浏览", manager)
finally:
@ -83,7 +103,20 @@ class WebTest(unittest.TestCase):
with urllib.request.urlopen(request, timeout=5) as response:
self.assertEqual(response.status, 200)
status, body = get(f"{base_url}/api/reports?date=2026-05-07")
db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/api/reports?date=2026-05-07", admin_cookie)
summary = json.loads(body)
self.assertEqual(status, 200)
self.assertEqual(summary["submittedCount"], 1)
@ -122,6 +155,32 @@ class WebTest(unittest.TestCase):
server.server_close()
db.close()
def test_manager_page_requires_admin_role(self) -> None:
server, db, base_url = make_server(
[
{"feishu_user_id": "u_admin", "name": "Admin", "active": True, "role": "admin"},
{"feishu_user_id": "u_staff", "name": "Staff", "active": True, "role": "staff"},
]
)
try:
staff_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_staff", "name": "Staff"}, "session-secret"
)
with self.assertRaises(urllib.error.HTTPError) as error:
get_with_cookie(f"{base_url}/manager", staff_cookie)
self.assertEqual(error.exception.code, 403)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_admin", "name": "Admin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200)
self.assertIn("日报浏览", body)
finally:
server.shutdown()
server.server_close()
db.close()
if __name__ == "__main__":
unittest.main()