Compare commits
2 Commits
ebe9d5684a
...
92adf268b6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92adf268b6 | ||
|
|
70e11dd1e7 |
@ -30,6 +30,7 @@ class Database:
|
||||
name TEXT NOT NULL,
|
||||
department TEXT NOT NULL DEFAULT '',
|
||||
manager TEXT NOT NULL DEFAULT '',
|
||||
role TEXT NOT NULL DEFAULT 'staff',
|
||||
active INTEGER NOT NULL DEFAULT 1,
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
@ -50,6 +51,11 @@ class Database:
|
||||
);
|
||||
"""
|
||||
)
|
||||
try:
|
||||
self.connection.execute("ALTER TABLE employees ADD COLUMN role TEXT NOT NULL DEFAULT 'staff'")
|
||||
except sqlite3.OperationalError as error:
|
||||
if "duplicate column name" not in str(error).lower():
|
||||
raise
|
||||
self.connection.commit()
|
||||
|
||||
def load_employees(self, employee_seed_path: Path) -> None:
|
||||
@ -64,12 +70,13 @@ class Database:
|
||||
for employee in employees:
|
||||
self.connection.execute(
|
||||
"""
|
||||
INSERT INTO employees (feishu_user_id, name, department, manager, active, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
||||
INSERT INTO employees (feishu_user_id, name, department, manager, role, active, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT(feishu_user_id) DO UPDATE SET
|
||||
name = excluded.name,
|
||||
department = excluded.department,
|
||||
manager = excluded.manager,
|
||||
role = excluded.role,
|
||||
active = excluded.active,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""",
|
||||
@ -78,6 +85,7 @@ class Database:
|
||||
employee["name"],
|
||||
employee.get("department", ""),
|
||||
employee.get("manager", ""),
|
||||
employee.get("role", "staff"),
|
||||
0 if employee.get("active") is False else 1,
|
||||
),
|
||||
)
|
||||
@ -89,7 +97,7 @@ class Database:
|
||||
with self._lock:
|
||||
rows = self.connection.execute(
|
||||
"""
|
||||
SELECT feishu_user_id, name, department, manager, active
|
||||
SELECT feishu_user_id, name, department, manager, role, active
|
||||
FROM employees
|
||||
WHERE active = 1
|
||||
ORDER BY department, name
|
||||
@ -101,7 +109,7 @@ class Database:
|
||||
with self._lock:
|
||||
row = self.connection.execute(
|
||||
"""
|
||||
SELECT feishu_user_id, name, department, manager, active
|
||||
SELECT feishu_user_id, name, department, manager, role, active
|
||||
FROM employees
|
||||
WHERE feishu_user_id = ?
|
||||
""",
|
||||
|
||||
@ -98,3 +98,8 @@ class ReportService:
|
||||
"employee": employee,
|
||||
"reports": self.database.list_reports_for_employee(user_id, limit),
|
||||
}
|
||||
|
||||
def is_admin(self, feishu_user_id: str) -> bool:
|
||||
user_id = _required_text(feishu_user_id, "feishu_user_id")
|
||||
employee = self.database.find_employee(user_id)
|
||||
return bool(employee and employee.get("active") == 1 and employee.get("role") == "admin")
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any
|
||||
@ -54,13 +58,25 @@ def create_summary_payload(manager_url: str, summary: dict[str, Any]) -> dict[st
|
||||
}
|
||||
|
||||
|
||||
def send_webhook(webhook_url: str, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
def create_webhook_sign(timestamp: str, secret: str) -> str:
|
||||
string_to_sign = f"{timestamp}\n{secret}".encode("utf-8")
|
||||
sign = hmac.new(string_to_sign, digestmod=hashlib.sha256).digest()
|
||||
return base64.b64encode(sign).decode("utf-8")
|
||||
|
||||
|
||||
def send_webhook(webhook_url: str, payload: dict[str, Any], secret: str = "") -> dict[str, Any]:
|
||||
if not webhook_url:
|
||||
raise ValueError("FEISHU_WEBHOOK_URL is required")
|
||||
|
||||
request_payload = dict(payload)
|
||||
if secret:
|
||||
timestamp = str(int(time.time()))
|
||||
request_payload["timestamp"] = timestamp
|
||||
request_payload["sign"] = create_webhook_sign(timestamp, secret)
|
||||
|
||||
request = urllib.request.Request(
|
||||
webhook_url,
|
||||
data=json.dumps(payload).encode("utf-8"),
|
||||
data=json.dumps(request_payload).encode("utf-8"),
|
||||
headers={"content-type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
|
||||
@ -15,7 +15,7 @@ def send_reminder() -> dict:
|
||||
if not is_workday(date.today(), config.workday_calendar_path):
|
||||
return {"skipped": True, "reason": "not a workday"}
|
||||
payload = create_reminder_payload(f"{config.base_url}/submit")
|
||||
return send_webhook(config.feishu_webhook_url, payload)
|
||||
return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
|
||||
|
||||
|
||||
def send_summary(report_date: str | None = None) -> dict:
|
||||
@ -30,7 +30,7 @@ def send_summary(report_date: str | None = None) -> dict:
|
||||
selected_date_text = selected_date.isoformat()
|
||||
summary = service.list_reports_for_date(selected_date_text)
|
||||
payload = create_summary_payload(f"{config.base_url}/manager?date={selected_date_text}", summary)
|
||||
return send_webhook(config.feishu_webhook_url, payload)
|
||||
return send_webhook(config.feishu_webhook_url, payload, config.feishu_webhook_secret)
|
||||
finally:
|
||||
database.close()
|
||||
|
||||
|
||||
@ -170,6 +170,20 @@ def manager_page(current_date: str) -> bytes:
|
||||
)
|
||||
|
||||
|
||||
def forbidden_page() -> bytes:
|
||||
return page(
|
||||
"无权限",
|
||||
"""
|
||||
<main class="shell narrow">
|
||||
<section class="panel">
|
||||
<h1>无权限</h1>
|
||||
<p>你没有查看日报汇总的权限。请联系管理员开通。</p>
|
||||
<p><a href="/submit">返回日报填写页</a></p>
|
||||
</section>
|
||||
</main>""",
|
||||
)
|
||||
|
||||
|
||||
class DailyReportHandler(BaseHTTPRequestHandler):
|
||||
report_service: ReportService
|
||||
config: Config
|
||||
@ -209,6 +223,16 @@ class DailyReportHandler(BaseHTTPRequestHandler):
|
||||
def _oauth_enabled(self) -> bool:
|
||||
return bool(self.config.feishu_app_id and self.config.feishu_app_secret)
|
||||
|
||||
def _is_admin_session(self) -> bool:
|
||||
session = self._session()
|
||||
return bool(session and self.report_service.is_admin(session["feishu_user_id"]))
|
||||
|
||||
def _require_admin(self) -> bool:
|
||||
if self._is_admin_session():
|
||||
return True
|
||||
self._send(403, forbidden_page())
|
||||
return False
|
||||
|
||||
def do_GET(self) -> None:
|
||||
parsed = urlparse(self.path)
|
||||
query = parse_qs(parsed.query)
|
||||
@ -260,9 +284,13 @@ class DailyReportHandler(BaseHTTPRequestHandler):
|
||||
self.end_headers()
|
||||
return
|
||||
if parsed.path == "/manager":
|
||||
if not self._require_admin():
|
||||
return
|
||||
self._send(200, manager_page(query.get("date", [today_string()])[0]))
|
||||
return
|
||||
if parsed.path == "/api/reports":
|
||||
if not self._require_admin():
|
||||
return
|
||||
self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0]))
|
||||
return
|
||||
if parsed.path == "/api/reports/history":
|
||||
@ -273,6 +301,8 @@ class DailyReportHandler(BaseHTTPRequestHandler):
|
||||
self._json(400, {"error": str(error)})
|
||||
return
|
||||
if parsed.path == "/api/reports/export":
|
||||
if not self._require_admin():
|
||||
return
|
||||
report_date = query.get("date", [today_string()])[0]
|
||||
csv_text = self.report_service.export_reports_csv(report_date)
|
||||
self.send_response(200)
|
||||
@ -294,14 +324,34 @@ class DailyReportHandler(BaseHTTPRequestHandler):
|
||||
return
|
||||
if parsed.path == "/api/robot/send-reminder":
|
||||
payload = create_reminder_payload(f"{self.config.base_url}/submit")
|
||||
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)})
|
||||
self._json(
|
||||
200,
|
||||
{
|
||||
"ok": True,
|
||||
"result": send_webhook(
|
||||
self.config.feishu_webhook_url,
|
||||
payload,
|
||||
self.config.feishu_webhook_secret,
|
||||
),
|
||||
},
|
||||
)
|
||||
return
|
||||
if parsed.path == "/api/robot/send-summary":
|
||||
body = self._read_json()
|
||||
report_date = body.get("date") or today_string()
|
||||
summary = self.report_service.list_reports_for_date(report_date)
|
||||
payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary)
|
||||
self._json(200, {"ok": True, "result": send_webhook(self.config.feishu_webhook_url, payload)})
|
||||
self._json(
|
||||
200,
|
||||
{
|
||||
"ok": True,
|
||||
"result": send_webhook(
|
||||
self.config.feishu_webhook_url,
|
||||
payload,
|
||||
self.config.feishu_webhook_secret,
|
||||
),
|
||||
},
|
||||
)
|
||||
return
|
||||
self._json(404, {"error": "not found"})
|
||||
except (ValueError, json.JSONDecodeError) as error:
|
||||
|
||||
@ -4,6 +4,7 @@
|
||||
"name": "Alice",
|
||||
"department": "Operations",
|
||||
"manager": "Manager",
|
||||
"role": "staff",
|
||||
"active": true
|
||||
},
|
||||
{
|
||||
@ -11,6 +12,7 @@
|
||||
"name": "Bob",
|
||||
"department": "Operations",
|
||||
"manager": "Manager",
|
||||
"role": "staff",
|
||||
"active": true
|
||||
}
|
||||
]
|
||||
|
||||
@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from daily_report.robot_service import create_reminder_payload, create_summary_payload
|
||||
from daily_report.robot_service import create_reminder_payload, create_summary_payload, create_webhook_sign
|
||||
|
||||
|
||||
class RobotServiceTest(unittest.TestCase):
|
||||
@ -33,6 +33,12 @@ class RobotServiceTest(unittest.TestCase):
|
||||
self.assertIn("Chen", text)
|
||||
self.assertIn("http://localhost:8787/manager", text)
|
||||
|
||||
def test_creates_webhook_signature(self) -> None:
|
||||
signature = create_webhook_sign("1599360473", "test-secret")
|
||||
|
||||
self.assertIsInstance(signature, str)
|
||||
self.assertGreater(len(signature), 20)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@ -36,9 +36,10 @@ class ScheduledTest(unittest.TestCase):
|
||||
|
||||
captured = {}
|
||||
|
||||
def fake_send(webhook_url: str, payload: dict) -> dict:
|
||||
def fake_send(webhook_url: str, payload: dict, secret: str = "") -> dict:
|
||||
captured["webhook_url"] = webhook_url
|
||||
captured["payload"] = payload
|
||||
captured["secret"] = secret
|
||||
return {"ok": True}
|
||||
|
||||
with patch("daily_report.config.__file__", str(fake_file)), patch.dict("os.environ", env, clear=True), patch(
|
||||
@ -48,6 +49,7 @@ class ScheduledTest(unittest.TestCase):
|
||||
|
||||
self.assertEqual(result, {"ok": True})
|
||||
self.assertEqual(captured["webhook_url"], "https://example.test/webhook")
|
||||
self.assertEqual(captured["secret"], "")
|
||||
text = json.dumps(captured["payload"], ensure_ascii=False)
|
||||
self.assertIn("2026-05-07 日报提交汇总", text)
|
||||
self.assertIn("0/2", text)
|
||||
|
||||
@ -13,11 +13,11 @@ from daily_report.feishu_auth import create_session_cookie
|
||||
from daily_report.web import create_server
|
||||
|
||||
|
||||
def make_server() -> tuple:
|
||||
def make_server(employees: list[dict] | None = None) -> tuple:
|
||||
temp_dir = Path(tempfile.mkdtemp(prefix="daily-report-web-"))
|
||||
seed_path = temp_dir / "employees.json"
|
||||
seed_path.write_text(
|
||||
json.dumps([{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
|
||||
json.dumps(employees or [{"feishu_user_id": "u_1", "name": "Lin", "department": "Design", "active": True}]),
|
||||
encoding="utf-8",
|
||||
)
|
||||
config = Config(
|
||||
@ -45,6 +45,13 @@ def get(url: str) -> tuple[int, str]:
|
||||
return response.status, response.read().decode("utf-8")
|
||||
|
||||
|
||||
def get_with_cookie(url: str, cookie: str) -> tuple[int, str]:
|
||||
opener = urllib.request.build_opener()
|
||||
opener.addheaders = [("Cookie", cookie)]
|
||||
with opener.open(url, timeout=5) as response:
|
||||
return response.status, response.read().decode("utf-8")
|
||||
|
||||
|
||||
class WebTest(unittest.TestCase):
|
||||
def test_serves_submit_and_manager_pages(self) -> None:
|
||||
server, db, base_url = make_server()
|
||||
@ -55,7 +62,20 @@ class WebTest(unittest.TestCase):
|
||||
self.assertIn("1. \n2. \n3. \n4.", submit)
|
||||
self.assertIn("我的历史日报", submit)
|
||||
|
||||
status, manager = get(f"{base_url}/manager")
|
||||
admin_cookie = "daily_report_session=" + create_session_cookie(
|
||||
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
|
||||
)
|
||||
db.upsert_employee(
|
||||
{
|
||||
"feishu_user_id": "u_1",
|
||||
"name": "Lin",
|
||||
"department": "Design",
|
||||
"manager": "",
|
||||
"active": True,
|
||||
"role": "admin",
|
||||
}
|
||||
)
|
||||
status, manager = get_with_cookie(f"{base_url}/manager", admin_cookie)
|
||||
self.assertEqual(status, 200)
|
||||
self.assertIn("日报浏览", manager)
|
||||
finally:
|
||||
@ -83,7 +103,20 @@ class WebTest(unittest.TestCase):
|
||||
with urllib.request.urlopen(request, timeout=5) as response:
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
status, body = get(f"{base_url}/api/reports?date=2026-05-07")
|
||||
db.upsert_employee(
|
||||
{
|
||||
"feishu_user_id": "u_1",
|
||||
"name": "Lin",
|
||||
"department": "Design",
|
||||
"manager": "",
|
||||
"active": True,
|
||||
"role": "admin",
|
||||
}
|
||||
)
|
||||
admin_cookie = "daily_report_session=" + create_session_cookie(
|
||||
{"feishu_user_id": "u_1", "name": "Lin"}, "session-secret"
|
||||
)
|
||||
status, body = get_with_cookie(f"{base_url}/api/reports?date=2026-05-07", admin_cookie)
|
||||
summary = json.loads(body)
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(summary["submittedCount"], 1)
|
||||
@ -122,6 +155,32 @@ class WebTest(unittest.TestCase):
|
||||
server.server_close()
|
||||
db.close()
|
||||
|
||||
def test_manager_page_requires_admin_role(self) -> None:
|
||||
server, db, base_url = make_server(
|
||||
[
|
||||
{"feishu_user_id": "u_admin", "name": "Admin", "active": True, "role": "admin"},
|
||||
{"feishu_user_id": "u_staff", "name": "Staff", "active": True, "role": "staff"},
|
||||
]
|
||||
)
|
||||
try:
|
||||
staff_cookie = "daily_report_session=" + create_session_cookie(
|
||||
{"feishu_user_id": "u_staff", "name": "Staff"}, "session-secret"
|
||||
)
|
||||
with self.assertRaises(urllib.error.HTTPError) as error:
|
||||
get_with_cookie(f"{base_url}/manager", staff_cookie)
|
||||
self.assertEqual(error.exception.code, 403)
|
||||
|
||||
admin_cookie = "daily_report_session=" + create_session_cookie(
|
||||
{"feishu_user_id": "u_admin", "name": "Admin"}, "session-secret"
|
||||
)
|
||||
status, body = get_with_cookie(f"{base_url}/manager", admin_cookie)
|
||||
self.assertEqual(status, 200)
|
||||
self.assertIn("日报浏览", body)
|
||||
finally:
|
||||
server.shutdown()
|
||||
server.server_close()
|
||||
db.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user