Compare commits

...

2 Commits

Author SHA1 Message Date
Codex
92adf268b6 feat: restrict manager access to admins 2026-05-07 19:06:50 +08:00
Codex
70e11dd1e7 fix: sign Feishu webhook messages 2026-05-07 18:12:43 +08:00
9 changed files with 164 additions and 16 deletions

View File

@ -30,6 +30,7 @@ class Database:
name TEXT NOT NULL,
department TEXT NOT NULL DEFAULT '',
manager TEXT NOT NULL DEFAULT '',
role TEXT NOT NULL DEFAULT 'staff',
active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
@ -50,6 +51,11 @@ class Database:
);
"""
)
try:
self.connection.execute("ALTER TABLE employees ADD COLUMN role TEXT NOT NULL DEFAULT 'staff'")
except sqlite3.OperationalError as error:
if "duplicate column name" not in str(error).lower():
raise
self.connection.commit()
def load_employees(self, employee_seed_path: Path) -> None:
@ -64,12 +70,13 @@ class Database:
for employee in employees:
self.connection.execute(
"""
INSERT INTO employees (feishu_user_id, name, department, manager, active, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
INSERT INTO employees (feishu_user_id, name, department, manager, role, active, updated_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(feishu_user_id) DO UPDATE SET
name = excluded.name,
department = excluded.department,
manager = excluded.manager,
role = excluded.role,
active = excluded.active,
updated_at = CURRENT_TIMESTAMP
""",
@ -78,6 +85,7 @@ class Database:
employee["name"],
employee.get("department", ""),
employee.get("manager", ""),
employee.get("role", "staff"),
0 if employee.get("active") is False else 1,
),
)
@ -89,7 +97,7 @@ class Database:
with self._lock:
rows = self.connection.execute(
"""
SELECT feishu_user_id, name, department, manager, active
SELECT feishu_user_id, name, department, manager, role, active
FROM employees
WHERE active = 1
ORDER BY department, name
@ -101,7 +109,7 @@ class Database:
with self._lock:
row = self.connection.execute(
"""
SELECT feishu_user_id, name, department, manager, active
SELECT feishu_user_id, name, department, manager, role, active
FROM employees
WHERE feishu_user_id = ?
""",

View File

@ -98,3 +98,8 @@ class ReportService:
"employee": employee,
"reports": self.database.list_reports_for_employee(user_id, limit),
}
def is_admin(self, feishu_user_id: str) -> bool:
user_id = _required_text(feishu_user_id, "feishu_user_id")
employee = self.database.find_employee(user_id)
return bool(employee and employee.get("active") == 1 and employee.get("role") == "admin")

View File

@ -1,6 +1,10 @@
from __future__ import annotations
import json
import base64
import hashlib
import hmac
import time
import urllib.error
import urllib.request
from typing import Any
@ -54,13 +58,25 @@ def create_summary_payload(manager_url: str, summary: dict[str, Any]) -> dict[st
}
def send_webhook(webhook_url: str, payload: dict[str, Any]) -> dict[str, Any]:
def create_webhook_sign(timestamp: str, secret: str) -> str:
string_to_sign = f"{timestamp}\n{secret}".encode("utf-8")
sign = hmac.new(string_to_sign, digestmod=hashlib.sha256).digest()
return base64.b64encode(sign).decode("utf-8")
def send_webhook(webhook_url: str, payload: dict[str, Any], secret: str = "") -> dict[str, Any]:
if not webhook_url:
raise ValueError("FEISHU_WEBHOOK_URL is required")
request_payload = dict(payload)
if secret:
timestamp = str(int(time.time()))
request_payload["timestamp"] = timestamp
request_payload["sign"] = create_webhook_sign(timestamp, secret)
request = urllib.request.Request(
webhook_url,
data=json.dumps(payload).encode("utf-8"),
data=json.dumps(request_payload).encode("utf-8"),
headers={"content-type": "application/json"},
method="POST",
)

View File

@ -15,7 +15,7 @@ def send_reminder() -> dict:
if not is_workday(date.today(), config.workday_calendar_path):
return {"skipped": True, "reason": "not a workday"}
payload = create_reminder_payload(f"{config.base_url}/submit")
return send_webhook(config.feishu_webhook_url, payload)
return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
def send_summary(report_date: str | None = None) -> dict:
@ -30,7 +30,7 @@ def send_summary(report_date: str | None = None) -> dict:
selected_date_text = selected_date.isoformat()
summary = service.list_reports_for_date(selected_date_text)
payload = create_summary_payload(f"{config.base_url}/manager?date={selected_date_text}", summary)
return send_webhook(config.feishu_webhook_url, payload)
return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
finally:
database.close()

View File

@ -170,6 +170,20 @@ def manager_page(current_date: str) -> bytes:
)
def forbidden_page() -> bytes:
return page(
"无权限",
"""
<main class="shell narrow">
<section class="panel">
<h1>无权限</h1>
<p>你没有查看日报汇总的权限请联系管理员开通</p>
<p><a href="/submit">返回日报填写页</a></p>
</section>
</main>""",
)
class DailyReportHandler(BaseHTTPRequestHandler):
report_service: ReportService
config: Config
@ -209,6 +223,16 @@ class DailyReportHandler(BaseHTTPRequestHandler):
def _oauth_enabled(self) -> bool:
return bool(self.config.feishu_app_id and self.config.feishu_app_secret)
def _is_admin_session(self) -> bool:
session = self._session()
return bool(session and self.report_service.is_admin(session["feishu_user_id"]))
def _require_admin(self) -> bool:
if self._is_admin_session():
return True
self._send(403, forbidden_page())
return False
def do_GET(self) -> None:
parsed = urlparse(self.path)
query = parse_qs(parsed.query)
@ -260,9 +284,13 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self.end_headers()
return
if parsed.path == "/manager":
if not self._require_admin():
return
self._send(200, manager_page(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports":
if not self._require_admin():
return
self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports/history":
@ -273,6 +301,8 @@ class DailyReportHandler(BaseHTTPRequestHandler):
self._json(400, {"error": str(error)})
return
if parsed.path == "/api/reports/export":
if not self._require_admin():
return
report_date = query.get("date", [today_string()])[0]
csv_text = self.report_service.export_reports_csv(report_date)
self.send_response(200)
@ -294,14 +324,34 @@ class DailyReportHandler(BaseHTTPRequestHandler):
return
if parsed.path == "/api/robot/send-reminder":
payload = create_reminder_payload(f"{self.config.base_url}/submit")
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)})
self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return
if parsed.path == "/api/robot/send-summary":
body = self._read_json()
report_date = body.get("date") or today_string()
summary = self.report_service.list_reports_for_date(report_date)
payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary)
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)})
self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return
self._json(404, {"error": "not found"})
except (ValueError, json.JSONDecodeError) as error:

View File

@ -4,6 +4,7 @@
"name": "Alice",
"department": "Operations",
"manager": "Manager",
"role": "staff",
"active": true
},
{
@ -11,6 +12,7 @@
"name": "Bob",
"department": "Operations",
"manager": "Manager",
"role": "staff",
"active": true
}
]

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import json
import unittest
from daily_report.robot_service import create_reminder_payload, create_summary_payload
from daily_report.robot_service import create_reminder_payload, create_summary_payload, create_webhook_sign
class RobotServiceTest(unittest.TestCase):
@ -33,6 +33,12 @@ class RobotServiceTest(unittest.TestCase):
self.assertIn("Chen", text)
self.assertIn("http://localhost:8787/manager", text)
def test_creates_webhook_signature(self) -> None:
signature = create_webhook_sign("1599360473", "test-secret")
self.assertIsInstance(signature, str)
self.assertGreater(len(signature), 20)
if __name__ == "__main__":
unittest.main()

View File

@ -36,9 +36,10 @@ class ScheduledTest(unittest.TestCase):
captured = {}
def fake_send(webhook_url: str, payload: dict) -> dict:
def fake_send(webhook_url: str, payload: dict, secret: str = "") -> dict:
captured["webhook_url"] = webhook_url
captured["payload"] = payload
captured["secret"] = secret
return {"ok": True}
with patch("daily_report.config.__file__", str(fake_file)), patch.dict("os.environ", env, clear=True), patch(
@ -48,6 +49,7 @@ class ScheduledTest(unittest.TestCase):
self.assertEqual(result, {"ok": True})
self.assertEqual(captured["webhook_url"], "https://example.test/webhook")
self.assertEqual(captured["secret"], "")
text = json.dumps(captured["payload"], ensure_ascii=False)
self.assertIn("2026-05-07 日报提交汇总", text)
self.assertIn("0/2", text)

View File

@ -13,11 +13,11 @@ from daily_report.feishu_auth import create_session_cookie
from daily_report.web import create_server
def make_server() -> tuple:
def make_server(employees: list[dict] | None = None) -> tuple:
temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-"))
seed_path = temp_dir / "employees.json"
seed_path.write_text(
json.dumps([{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
json.dumps(employees or [{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
encoding="utf-8",
)
config = Config(
@ -45,6 +45,13 @@ def get(url: str) -> tuple[int, str]:
return response.status, response.read().decode("utf-8")
def get_with_cookie(url: str, cookie: str) -> tuple[int, str]:
opener = urllib.request.build_opener()
opener.addheaders = [("Cookie", cookie)]
with opener.open(url, timeout=5) as response:
return response.status, response.read().decode("utf-8")
class WebTest(unittest.TestCase):
def test_serves_submit_and_manager_pages(self) -> None:
server, db, base_url = make_server()
@ -55,7 +62,20 @@ class WebTest(unittest.TestCase):
self.assertIn("1. \n2. \n3. \n4.", submit)
self.assertIn("我的历史日报", submit)
status, manager = get(f"{base_url}/manager")
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
status, manager = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200)
self.assertIn("日报浏览", manager)
finally:
@ -83,7 +103,20 @@ class WebTest(unittest.TestCase):
with urllib.request.urlopen(request, timeout=5) as response:
self.assertEqual(response.status, 200)
status, body = get(f"{base_url}/api/reports?date=2026-05-07")
db.upsert_employee(
{
"feishu_user_id": "u_1",
"name": "Lin",
"department": "Design",
"manager": "",
"active": True,
"role": "admin",
}
)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/api/reports?date=2026-05-07", admin_cookie)
summary = json.loads(body)
self.assertEqual(status, 200)
self.assertEqual(summary["submittedCount"], 1)
@ -122,6 +155,32 @@ class WebTest(unittest.TestCase):
server.server_close()
db.close()
def test_manager_page_requires_admin_role(self) -> None:
server, db, base_url = make_server(
[
{"feishu_user_id": "u_admin", "name": "Admin", "active": True, "role": "admin"},
{"feishu_user_id": "u_staff", "name": "Staff", "active": True, "role": "staff"},
]
)
try:
staff_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_staff", "name": "Staff"}, "session-secret"
)
with self.assertRaises(urllib.error.HTTPError) as error:
get_with_cookie(f"{base_url}/manager", staff_cookie)
self.assertEqual(error.exception.code, 403)
admin_cookie = "daily_report_session=" + create_session_cookie(
{"feishu_user_id": "u_admin", "name": "Admin"}, "session-secret"
)
status, body = get_with_cookie(f"{base_url}/manager", admin_cookie)
self.assertEqual(status, 200)
self.assertIn("日报浏览", body)
finally:
server.shutdown()
server.server_close()
db.close()
if __name__ == "__main__":
unittest.main()