Compare commits

...

2 Commits

Author SHA1 Message Date
Codex
92adf268b6 feat: restrict manager access to admins 2026-05-07 19:06:50 +08:00
Codex
70e11dd1e7 fix: sign Feishu webhook messages 2026-05-07 18:12:43 +08:00
9 changed files with 164 additions and 16 deletions

View File

@ -30,6 +30,7 @@ class Database:
name TEXT NOT NULL, name TEXT NOT NULL,
department TEXT NOT NULL DEFAULT '', department TEXT NOT NULL DEFAULT '',
manager TEXT NOT NULL DEFAULT '', manager TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT 'staff',
active INTEGER NOT NULL DEFAULT 1, active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
@ -50,6 +51,11 @@ class Database:
); );
""" """
) )
try:
self.connection.execute("ALTER TABLE employees ADD COLUMN role TEXT NOT NULL DEFAULT 'staff'")
except sqlite3.OperationalError as error:
if "duplicate column name" not in str(error).lower():
raise
self.connection.commit() self.connection.commit()
def load_employees(self, employee_seed_path: Path) -> None: def load_employees(self, employee_seed_path: Path) -> None:
@ -64,12 +70,13 @@ class Database:
for employee in employees: for employee in employees:
self.connection.execute( self.connection.execute(
""" """
INSERT INTO employees (feishu_user_id, name, department, manager, active, updated_at) INSERT INTO employees (feishu_user_id, name, department, manager, role, active, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(feishu_user_id) DO UPDATE SET ON CONFLICT(feishu_user_id) DO UPDATE SET
name = excluded.name, name = excluded.name,
department = excluded.department, department = excluded.department,
manager = excluded.manager, manager = excluded.manager,
role = excluded.role,
active = excluded.active, active = excluded.active,
updated_at = CURRENT_TIMESTAMP updated_at = CURRENT_TIMESTAMP
""", """,
@ -78,6 +85,7 @@ class Database:
employee["name"], employee["name"],
employee.get("department", ""), employee.get("department", ""),
employee.get("manager", ""), employee.get("manager", ""),
employee.get("role", "staff"),
0 if employee.get("active") is False else 1, 0 if employee.get("active") is False else 1,
), ),
) )
@ -89,7 +97,7 @@ class Database:
with self._lock: with self._lock:
rows = self.connection.execute( rows = self.connection.execute(
""" """
SELECT feishu_user_id, name, department, manager, active SELECT feishu_user_id, name, department, manager, role, active
FROM employees FROM employees
WHERE active = 1 WHERE active = 1
ORDER BY department, name ORDER BY department, name
@ -101,7 +109,7 @@ class Database:
with self._lock: with self._lock:
row = self.connection.execute( row = self.connection.execute(
""" """
SELECT feishu_user_id, name, department, manager, active SELECT feishu_user_id, name, department, manager, role, active
FROM employees FROM employees
WHERE feishu_user_id = ? WHERE feishu_user_id = ?
""", """,

View File

@ -98,3 +98,8 @@ class ReportService:
"employee": employee, "employee": employee,
"reports": self.database.list_reports_for_employee(user_id, limit), "reports": self.database.list_reports_for_employee(user_id, limit),
} }
def is_admin(self, feishu_user_id: str) -> bool:
user_id = _required_text(feishu_user_id, "feishu_user_id")
employee = self.database.find_employee(user_id)
return bool(employee and employee.get("active") == 1 and employee.get("role") == "admin")

View File

@ -1,6 +1,10 @@
from __future__ import annotations from __future__ import annotations
import json import json
import base64
import hashlib
import hmac
import time
import urllib.error import urllib.error
import urllib.request import urllib.request
from typing import Any from typing import Any
@ -54,13 +58,25 @@ def create_summary_payload(manager_url: str, summary: dict[str, Any]) -> dict[st
} }
def send_webhook(webhook_url: str, payload: dict[str, Any]) -> dict[str, Any]: def create_webhook_sign(timestamp: str, secret: str) -> str:
string_to_sign = f"{timestamp}\n{secret}".encode("utf-8")
sign = hmac.new(string_to_sign, digestmod=hashlib.sha256).digest()
return base64.b64encode(sign).decode("utf-8")
def send_webhook(webhook_url: str, payload: dict[str, Any], secret: str = "") -> dict[str, Any]:
if not webhook_url: if not webhook_url:
raise ValueError("FEISHU_WEBHOOK_URL is required") raise ValueError("FEISHU_WEBHOOK_URL is required")
request_payload = dict(payload)
if secret:
timestamp = str(int(time.time()))
request_payload["timestamp"] = timestamp
request_payload["sign"] = create_webhook_sign(timestamp, secret)
request = urllib.request.Request( request = urllib.request.Request(
webhook_url, webhook_url,
data=json.dumps(payload).encode("utf-8"), data=json.dumps(request_payload).encode("utf-8"),
headers={"content-type": "application/json"}, headers={"content-type": "application/json"},
method="POST", method="POST",
) )

View File

@ -15,7 +15,7 @@ def send_reminder() -> dict:
if not is_workday(date.today(), config.workday_calendar_path): if not is_workday(date.today(), config.workday_calendar_path):
return {"skipped": True, "reason": "not a workday"} return {"skipped": True, "reason": "not a workday"}
payload = create_reminder_payload(f"{config.base_url}/submit") payload = create_reminder_payload(f"{config.base_url}/submit")
return send_webhook(config.feishu_webhook_url, payload) return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
def send_summary(report_date: str | None = None) -> dict: def send_summary(report_date: str | None = None) -> dict:
@ -30,7 +30,7 @@ def send_summary(report_date: str | None = None) -> dict:
selected_date_text = selected_date.isoformat() selected_date_text = selected_date.isoformat()
summary = service.list_reports_for_date(selected_date_text) summary = service.list_reports_for_date(selected_date_text)
payload = create_summary_payload(f"{config.base_url}/manager?date={selected_date_text}", summary) payload = create_summary_payload(f"{config.base_url}/manager?date={selected_date_text}", summary)
return send_webhook(config.feishu_webhook_url, payload) return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
finally: finally:
database.close() database.close()

View File

@ -170,6 +170,20 @@ def manager_page(current_date: str) -> bytes:
) )
def forbidden_page() -> bytes:
return page(
"无权限",
"""
<main class="shell narrow">
<section class="panel">
<h1>无权限</h1>
<p>你没有查看日报汇总的权限请联系管理员开通</p>
<p><a href="/submit">返回日报填写页</a></p>
</section>
</main>""",
)
class DailyReportHandler(BaseHTTPRequestHandler): class DailyReportHandler(BaseHTTPRequestHandler):
report_service: ReportService report_service: ReportService
config: Config config: Config
@ -209,6 +223,16 @@ class DailyReportHandler(BaseHTTPRequestHandler):
def _oauth_enabled(self) -> bool: def _oauth_enabled(self) -> bool:
return bool(self.config.feishu_app_id and self.config.feishu_app_secret) return bool(self.config.feishu_app_id and self.config.feishu_app_secret)
def _is_admin_session(self) -> bool:
session = self._session()
return bool(session and self.report_service.is_admin(session["feishu_user_id"]))
def _require_admin(self) -> bool:
if self._is_admin_session():
return True
self._send(403, forbidden_page())
return False
def do_GET(self) -> None: def do_GET(self) -> None:
parsed = urlparse(self.path) parsed = urlparse(self.path)
query = parse_qs(parsed.query) query = parse_qs(parsed.query)
@ -260,9 +284,13 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
return return
if parsed.path == "/manager": if parsed.path == "/manager":
if not self._require_admin():
return
self._send(200, manager_page(query.get("date", [today_string()])[0])) self._send(200, manager_page(query.get("date", [today_string()])[0]))
return return
if parsed.path == "/api/reports": if parsed.path == "/api/reports":
if not self._require_admin():
return
self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0])) self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0]))
return return
if parsed.path == "/api/reports/history": if parsed.path == "/api/reports/history":
@ -273,6 +301,8 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self._json(400, {"error": str(error)}) self._json(400, {"error": str(error)})
return return
if parsed.path == "/api/reports/export": if parsed.path == "/api/reports/export":
if not self._require_admin():
return
report_date = query.get("date", [today_string()])[0] report_date = query.get("date", [today_string()])[0]
csv_text = self.report_service.export_reports_csv(report_date) csv_text = self.report_service.export_reports_csv(report_date)
self.send_response(200) self.send_response(200)
@ -294,14 +324,34 @@ class DailyReportHandler(BaseHTTPRequestHandler):
return return
if parsed.path == "/api/robot/send-reminder": if parsed.path == "/api/robot/send-reminder":
payload = create_reminder_payload(f"{self.config.base_url}/submit") payload = create_reminder_payload(f"{self.config.base_url}/submit")
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)}) self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return return
if parsed.path == "/api/robot/send-summary": if parsed.path == "/api/robot/send-summary":
body = self._read_json() body = self._read_json()
report_date = body.get("date") or today_string() report_date = body.get("date") or today_string()
summary = self.report_service.list_reports_for_date(report_date) summary = self.report_service.list_reports_for_date(report_date)
payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary) payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary)
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)}) self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return return
self._json(404, {"error": "not found"}) self._json(404, {"error": "not found"})
except (ValueError, json.JSONDecodeError) as error: except (ValueError, json.JSONDecodeError) as error:

View File

@ -4,6 +4,7 @@
"name": "Alice", "name": "Alice",
"department": "Operations", "department": "Operations",
"manager": "Manager", "manager": "Manager",
"role": "staff",
"active": true "active": true
}, },
{ {
@ -11,6 +12,7 @@
"name": "Bob", "name": "Bob",
"department": "Operations", "department": "Operations",
"manager": "Manager", "manager": "Manager",
"role": "staff",
"active": true "active": true
} }
] ]

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import json import json
import unittest import unittest
from daily_report.robot_service import create_reminder_payload, create_summary_payload from daily_report.robot_service import create_reminder_payload, create_summary_payload, create_webhook_sign
class RobotServiceTest(unittest.TestCase): class RobotServiceTest(unittest.TestCase):
@ -33,6 +33,12 @@ class RobotServiceTest(unittest.TestCase):
self.assertIn("Chen", text) self.assertIn("Chen", text)
self.assertIn("http://localhost:8787/manager", text) self.assertIn("http://localhost:8787/manager", text)
def test_creates_webhook_signature(self) -> None:
signature = create_webhook_sign("1599360473", "test-secret")
self.assertIsInstance(signature, str)
self.assertGreater(len(signature), 20)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -36,9 +36,10 @@ class ScheduledTest(unittest.TestCase):
captured = {} captured = {}
def fake_send(webhook_url: str, payload: dict) -> dict: def fake_send(webhook_url: str, payload: dict, secret: str = "") -> dict:
captured["webhook_url"] = webhook_url captured["webhook_url"] = webhook_url
captured["payload"] = payload captured["payload"] = payload
captured["secret"] = secret
return {"ok": True} return {"ok": True}
with patch("daily_report.config.__file__", str(fake_file)), patch.dict("os.environ", env, clear=True), patch( with patch("daily_report.config.__file__", str(fake_file)), patch.dict("os.environ", env, clear=True), patch(
@ -48,6 +49,7 @@ class ScheduledTest(unittest.TestCase):
self.assertEqual(result, {"ok": True}) self.assertEqual(result, {"ok": True})
self.assertEqual(captured["webhook_url"], "https://example.test/webhook") self.assertEqual(captured["webhook_url"], "https://example.test/webhook")
self.assertEqual(captured["secret"], "")
text = json.dumps(captured["payload"], ensure_ascii=False) text = json.dumps(captured["payload"], ensure_ascii=False)
self.assertIn("2026-05-07 日报提交汇总", text) self.assertIn("2026-05-07 日报提交汇总", text)
self.assertIn("0/2", text) self.assertIn("0/2", text)

View File

@ -13,11 +13,11 @@ from daily_report.feishu_auth import create_session_cookie
from daily_report.web import create_server from daily_report.web import create_server
def make_server() -> tuple: def make_server(employees: list[dict] | None = None) -> tuple:
temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-")) temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-"))
seed_path = temp_dir / "employees.json" seed_path = temp_dir / "employees.json"
seed_path.write_text( seed_path.write_text(
json.dumps([{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]), json.dumps(employees or [{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
encoding="utf-8", encoding="utf-8",
) )
config = Config( config = Config(
@ -45,6 +45,13 @@ def get(url: str) -> tuple[int, str]:
return response.status, response.read().decode("utf-8") return response.status, response.read().decode("utf-8")
def get_with_cookie(url: str, cookie: str) -> tuple[int, str]:
opener = urllib.request.build_opener()
opener.addheaders = [("Cookie", cookie)]
with opener.open(url, timeout=5) as response:
return response.status, response.read().decode("utf-8")
class WebTest(unittest.TestCase): class WebTest(unittest.TestCase):
def test_serves_submit_and_manager_pages(self) -> None: def test_serves_submit_and_manager_pages(self) -> None:
server, db, base_url = make_server() server, db, base_url = make_server()
@ -55,7 +62,20 @@ class WebTest(unittest.TestCase):
self.assertIn("1. \n2. \n3. \n4.", submit) self.assertIn("1. \n2. \n3. \n4.", submit)
self.assertIn("我的历史日报", submit) self.assertIn("我的历史日报", submit)
status, manager = get(f"{base_url}/manager") admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
status, manager = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200) self.assertEqual(status, 200)
self.assertIn("日报浏览", manager) self.assertIn("日报浏览", manager)
finally: finally:
@ -83,7 +103,20 @@ class WebTest(unittest.TestCase):
with urllib.request.urlopen(request, timeout=5) as response: with urllib.request.urlopen(request, timeout=5) as response:
self.assertEqual(response.status, 200) self.assertEqual(response.status, 200)
status, body = get(f"{base_url}/api/reports?date=2026-05-07") db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/api/reports?date=2026-05-07", admin_cookie)
summary = json.loads(body) summary = json.loads(body)
self.assertEqual(status, 200) self.assertEqual(status, 200)
self.assertEqual(summary["submittedCount"], 1) self.assertEqual(summary["submittedCount"], 1)
@ -122,6 +155,32 @@ class WebTest(unittest.TestCase):
server.server_close() server.server_close()
db.close() db.close()
def test_manager_page_requires_admin_role(self) -> None:
server, db, base_url = make_server(
[
{"feishu_user_id": "u_admin", "name": "Admin", "active": True, "role": "admin"},
{"feishu_user_id": "u_staff", "name": "Staff", "active": True, "role": "staff"},
]
)
try:
staff_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_staff", "name": "Staff"}, "session-secret"
)
with self.assertRaises(urllib.error.HTTPError) as error:
get_with_cookie(f"{base_url}/manager", staff_cookie)
self.assertEqual(error.exception.code, 403)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_admin", "name": "Admin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200)
self.assertIn("日报浏览", body)
finally:
server.shutdown()
server.server_close()
db.close()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()