2026-05-07 22:39:58 +08:00

540 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from datetime import date
from html import escape
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
from .config import Config, read_config
from .db import Database
from .feishu_auth import (
authorize_url,
create_session_cookie,
create_state,
exchange_code_for_user_access_token,
fetch_user_info,
parse_session_cookie,
)
from .report_service import ReportService
from .robot_service import create_reminder_payload, create_summary_payload, send_webhook
def today_string() -> str:
    """Return today's date (as reported by ``date.today()``) in ``YYYY-MM-DD`` form."""
    current_day = date.today()
    return current_day.isoformat()
def page(title: str, body: str, scripts: str = "") -> bytes:
    """Wrap *body* (and optional *scripts*) in the shared HTML shell.

    Args:
        title: Text for the ``<title>`` element; HTML-escaped here.
        body: Markup placed inside ``<body>``; inserted verbatim, so the
            caller is responsible for escaping any dynamic values in it.
        scripts: Markup placed after *body*; also inserted verbatim.

    Returns:
        The complete document encoded as UTF-8.
    """
    template = (
        "<!doctype html>\n"
        '<html lang="zh-CN">\n'
        "<head>\n"
        '<meta charset="utf-8">\n'
        '<meta name="viewport" content="width=device-width, initial-scale=1">\n'
        "<title>{title}</title>\n"
        '<link rel="stylesheet" href="/static/styles.css">\n'
        "</head>\n"
        "<body>\n"
        "{body}\n"
        "{scripts}\n"
        "</body>\n"
        "</html>"
    )
    rendered = template.format(title=escape(title), body=body, scripts=scripts)
    return rendered.encode("utf-8")
def submit_page(current_date: str, session: dict[str, str] | None = None) -> bytes:
    """Render the report submission page, including its inline client script.

    Args:
        current_date: Value pre-filled into the date input; HTML-escaped here.
        session: Signed-in user's session mapping, or ``None``. When given it
            must contain ``"feishu_user_id"`` and may contain ``"name"``.
            With a session the employee ID is emitted as a hidden field;
            without one the page shows a free-text employee ID input.

    Returns:
        The full HTML document as UTF-8 bytes (built through ``page``).
    """
    if session:
        user_id = session["feishu_user_id"]
        # Fall back to the ID when the name is missing, empty or None:
        # html.escape() only accepts strings, and a blank identity box
        # tells the user nothing.
        display_name = session.get("name") or user_id
        identity_html = f"""
<div class="identity-box">
<span>当前填写人</span>
<strong>{escape(display_name)}</strong>
</div>
<input id="feishu-user-id" type="hidden" name="feishu_user_id" value="{escape(user_id)}">
"""
        history_hint = "系统已自动识别你的飞书身份,这里会显示你最近提交过的日报。"
    else:
        identity_html = '<label>员工 ID<input id="feishu-user-id" name="feishu_user_id" required placeholder="例如 u_alice"></label>'
        history_hint = "输入员工 ID 后,可以查看自己最近提交过的日报。"
    # NOTE: this is an f-string, so literal JavaScript braces are doubled
    # ({{ }}) and JavaScript backslash escapes are written as "\\".
    return page(
        "每日报告",
        f"""
<main class="shell narrow">
<h1>每日工作汇报</h1>
<form id="report-form" class="panel">
{identity_html}
<label>日期<input id="report-date" name="report_date" type="date" required value="{escape(current_date)}"></label>
<fieldset class="status-options">
<legend>今日状态</legend>
<label><input type="radio" name="report_status" value="normal" checked> 正常</label>
<label><input type="radio" name="report_status" value="risk"> 有风险</label>
<label><input type="radio" name="report_status" value="need_help"> 需要支持</label>
</fieldset>
<section class="item-group">
<div class="item-group-head">
<h2>今日完成</h2>
<button class="add-item" type="button" data-add-list="today_done" aria-label="添加今日完成">+</button>
</div>
<div data-list="today_done"></div>
</section>
<section class="item-group">
<div class="item-group-head">
<h2>明日计划</h2>
<button class="add-item" type="button" data-add-list="tomorrow_plan" aria-label="添加明日计划">+</button>
</div>
<div data-list="tomorrow_plan"></div>
</section>
<label>遇到的问题<textarea name="blockers" rows="3" placeholder="没有可以不填"></textarea></label>
<label>需要协助<textarea name="help_needed" rows="3" placeholder="没有可以不填"></textarea></label>
<button type="submit">提交日报</button>
<p id="form-message" class="message"></p>
</form>
<section id="previous-plan" class="panel previous-plan" hidden>
<div>
<h2>上次“明日计划”参考</h2>
<p id="previous-plan-date"></p>
</div>
<pre id="previous-plan-content"></pre>
<button id="use-previous-plan" type="button">填入今日完成</button>
</section>
<section class="panel history-panel">
<div class="history-head">
<div>
<h2>我的历史日报</h2>
<p>{history_hint}</p>
</div>
<button id="load-history" type="button">刷新历史</button>
</div>
<div id="history-list" class="history-list">暂无历史记录。</div>
</section>
</main>
<script>
const statusLabels = {{
normal: "正常",
risk: "有风险",
need_help: "需要支持"
}};
function escapeHtml(value) {{
return String(value || "").replace(/[&<>"']/g, (char) => ({{
"&": "&amp;",
"<": "&lt;",
">": "&gt;",
'"': "&quot;",
"'": "&#39;"
}}[char]));
}}
function collectItems(name) {{
const values = Array.from(document.querySelectorAll(`[data-list="${{name}}"] input`))
.map((input) => input.value.trim())
.filter(Boolean);
return values.map((value, index) => `${{index + 1}}. ${{value}}`).join("\\n");
}}
function renumberItems(name) {{
Array.from(document.querySelectorAll(`[data-list="${{name}}"] .item-row`)).forEach((row, index) => {{
row.querySelector(".item-index").textContent = `${{index + 1}}.`;
row.querySelector("input").placeholder = `第 ${{index + 1}} 条`;
}});
}}
function addItem(name, value = "") {{
const list = document.querySelector(`[data-list="${{name}}"]`);
const row = document.createElement("div");
row.className = "item-row";
row.innerHTML = `
<span class="item-index"></span>
<input value="${{escapeHtml(value)}}" autocomplete="off">
<button class="remove-item" type="button" aria-label="删除这一条">×</button>
`;
row.querySelector(".remove-item").addEventListener("click", () => {{
if (list.querySelectorAll(".item-row").length <= 1) {{
row.querySelector("input").value = "";
return;
}}
row.remove();
renumberItems(name);
}});
list.appendChild(row);
renumberItems(name);
}}
function resetItems(name, values = []) {{
const list = document.querySelector(`[data-list="${{name}}"]`);
list.innerHTML = "";
const items = values.length ? values : ["", "", ""];
items.forEach((value) => addItem(name, value));
}}
function fillItemInputs(name, text) {{
const lines = String(text || "")
.split(/\\n+/)
.map((line) => line.replace(/^\\s*\\d+[\\.、)]\\s*/, "").trim())
.filter(Boolean);
resetItems(name, lines);
}}
function renderHistory(data) {{
const container = document.querySelector("#history-list");
if (!data.reports.length) {{
container.innerHTML = "<p>还没有历史日报。</p>";
return;
}}
container.innerHTML = data.reports.map((report) => `
<article class="history-card">
<h3>${{escapeHtml(report.report_date)}} · ${{statusLabels[report.report_status] || "正常"}}</h3>
<div><strong>今日完成</strong><pre>${{escapeHtml(report.today_done)}}</pre></div>
<div><strong>明日计划</strong><pre>${{escapeHtml(report.tomorrow_plan)}}</pre></div>
<div><strong>问题</strong><pre>${{escapeHtml(report.blockers || "")}}</pre></div>
<div><strong>协助</strong><pre>${{escapeHtml(report.help_needed || "")}}</pre></div>
</article>
`).join("");
}}
async function loadHistory() {{
const userId = document.querySelector("#feishu-user-id").value.trim();
const container = document.querySelector("#history-list");
if (!userId) {{
container.textContent = "请先填写员工 ID。";
return;
}}
const response = await fetch(`/api/reports/history?feishu_user_id=${{encodeURIComponent(userId)}}`);
if (response.ok) {{
renderHistory(await response.json());
}} else {{
const result = await response.json();
container.textContent = result.error || "历史记录加载失败。";
}}
}}
async function loadPreviousPlan() {{
const userId = document.querySelector("#feishu-user-id").value.trim();
const reportDate = document.querySelector("#report-date").value;
const panel = document.querySelector("#previous-plan");
if (!userId || !reportDate) {{
panel.hidden = true;
return;
}}
const response = await fetch(`/api/reports/previous?feishu_user_id=${{encodeURIComponent(userId)}}&date=${{encodeURIComponent(reportDate)}}`);
if (!response.ok) {{
panel.hidden = true;
return;
}}
const result = await response.json();
if (!result.report) {{
panel.hidden = true;
return;
}}
document.querySelector("#previous-plan-date").textContent = `${{result.report.report_date}} 提交`;
document.querySelector("#previous-plan-content").textContent = result.report.tomorrow_plan;
panel.hidden = false;
}}
document.querySelector("#load-history").addEventListener("click", loadHistory);
document.querySelector("#report-date").addEventListener("change", loadPreviousPlan);
document.querySelectorAll("[data-add-list]").forEach((button) => {{
button.addEventListener("click", () => {{
addItem(button.dataset.addList);
}});
}});
document.querySelector("#use-previous-plan").addEventListener("click", () => {{
fillItemInputs("today_done", document.querySelector("#previous-plan-content").textContent);
}});
document.querySelector("#report-form").addEventListener("submit", async (event) => {{
event.preventDefault();
const data = Object.fromEntries(new FormData(event.currentTarget).entries());
data.today_done = collectItems("today_done");
data.tomorrow_plan = collectItems("tomorrow_plan");
const response = await fetch("/api/reports", {{
method: "POST",
headers: {{ "content-type": "application/json" }},
body: JSON.stringify(data)
}});
const message = document.querySelector("#form-message");
if (response.ok) {{
message.textContent = "提交成功。";
message.className = "message success";
loadHistory();
loadPreviousPlan();
}} else {{
const result = await response.json();
message.textContent = result.error || "提交失败。";
message.className = "message error";
}}
}});
resetItems("today_done");
resetItems("tomorrow_plan");
loadHistory();
loadPreviousPlan();
</script>""",
    )
def manager_page(current_date: str) -> bytes:
    """Render the report browsing page for *current_date*.

    Args:
        current_date: Date pre-selected in the date filter and used for the
            initial CSV export link. It is HTML-escaped for the input value
            and additionally percent-encoded where it is embedded in the
            export URL's query string.

    Returns:
        The full HTML document as UTF-8 bytes (built through ``page``); the
        page loads ``/static/manager.js`` for its behaviour.
    """
    from urllib.parse import quote

    # HTML escaping alone does not make a value safe inside a URL: "&", "#"
    # or spaces would change the query string. ISO dates pass through
    # quote() unchanged.
    export_date = escape(quote(current_date, safe=""))
    return page(
        "日报浏览",
        f"""
<main class="shell">
<header class="topbar">
<div>
<h1>日报浏览</h1>
<p>按日期、员工和关键词浏览所有日报记录。</p>
</div>
<button id="copy-summary">复制今日汇总</button>
</header>
<section class="filters">
<input id="date-filter" type="date" value="{escape(current_date)}">
<input id="employee-filter" placeholder="筛选员工">
<input id="keyword-filter" placeholder="搜索日报内容">
<label class="check"><input id="blocker-filter" type="checkbox"> 只看问题/协助</label>
<a id="export-link" href="/api/reports/export?date={export_date}">导出 CSV</a>
</section>
<section id="stats" class="stats"></section>
<section id="reports" class="report-list"></section>
<section>
<h2>未提交人员</h2>
<div id="missing" class="missing"></div>
</section>
</main>""",
        '<script src="/static/manager.js"></script>',
    )
def forbidden_page() -> bytes:
    """Render the static "no permission" page with a link back to ``/submit``.

    Returns:
        The full HTML document as UTF-8 bytes (built through ``page``).
    """
    notice = """
<main class="shell narrow">
<section class="panel">
<h1>无权限</h1>
<p>你没有查看日报汇总的权限,请联系管理员开通。</p>
<p><a href="/submit">返回日报填写页</a></p>
</section>
</main>"""
    return page("无权限", notice)
class DailyReportHandler(BaseHTTPRequestHandler):
report_service: ReportService
config: Config
static_dir: Path
def log_message(self, format: str, *args: object) -> None:
return
def _send(self, status: int, body: bytes, content_type: str = "text/html; charset=utf-8") -> None:
self.send_response(status)
self.send_header("content-type", content_type)
self.send_header("content-length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _json(self, status: int, data: dict) -> None:
self._send(status, json.dumps(data, ensure_ascii=False).encode("utf-8"), "application/json; charset=utf-8")
def _read_json(self) -> dict:
length = int(self.headers.get("content-length", "0"))
if length == 0:
return {}
return json.loads(self.rfile.read(length).decode("utf-8"))
def _cookie(self, name: str) -> str | None:
for item in self.headers.get("cookie", "").split(";"):
if "=" not in item:
continue
key, value = item.strip().split("=", 1)
if key == name:
return value
return None
def _session(self) -> dict[str, str] | None:
return parse_session_cookie(self._cookie("daily_report_session"), self.config.session_secret)
def _oauth_enabled(self) -> bool:
return bool(self.config.feishu_app_id and self.config.feishu_app_secret)
def _is_admin_session(self) -> bool:
session = self._session()
return bool(session and self.report_service.is_admin(session["feishu_user_id"]))
def _require_admin(self) -> bool:
if self._is_admin_session():
return True
self._send(403, forbidden_page())
return False
def do_GET(self) -> None:
parsed = urlparse(self.path)
query = parse_qs(parsed.query)
if parsed.path == "/":
self.send_response(302)
self.send_header("location", "/manager")
self.end_headers()
return
if parsed.path == "/submit":
session = self._session()
if self._oauth_enabled() and not session:
self.send_response(302)
self.send_header("location", "/auth/feishu/start")
self.end_headers()
return
self._send(200, submit_page(today_string(), session))
return
if parsed.path == "/auth/feishu/start":
state = create_state()
redirect_uri = f"{self.config.base_url}/auth/feishu/callback"
self.send_response(302)
self.send_header("set-cookie", f"daily_report_oauth_state={state}; Path=/; HttpOnly; SameSite=Lax")
self.send_header("location", authorize_url(self.config.feishu_app_id, redirect_uri, state))
self.end_headers()
return
if parsed.path == "/auth/feishu/callback":
state = query.get("state", [""])[0]
code = query.get("code", [""])[0]
if not code or not state or state != self._cookie("daily_report_oauth_state"):
self._json(400, {"error": "invalid feishu oauth callback"})
return
redirect_uri = f"{self.config.base_url}/auth/feishu/callback"
token = exchange_code_for_user_access_token(
self.config.feishu_app_id,
self.config.feishu_app_secret,
code,
redirect_uri,
)
employee = fetch_user_info(token)
self.report_service.database.upsert_employee(employee)
cookie = create_session_cookie(
{"feishu_user_id": employee["feishu_user_id"], "name": employee["name"]},
self.config.session_secret,
)
self.send_response(302)
self.send_header("set-cookie", f"daily_report_session={cookie}; Path=/; HttpOnly; SameSite=Lax")
self.send_header("set-cookie", "daily_report_oauth_state=; Path=/; Max-Age=0")
self.send_header("location", "/submit")
self.end_headers()
return
if parsed.path == "/manager":
if not self._require_admin():
return
self._send(200, manager_page(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports":
if not self._require_admin():
return
self._json(200, self.report_service.list_reports_for_date(query.get("date", [today_string()])[0]))
return
if parsed.path == "/api/reports/history":
user_id = query.get("feishu_user_id", [""])[0]
try:
self._json(200, self.report_service.list_employee_history(user_id))
except ValueError as error:
self._json(400, {"error": str(error)})
return
if parsed.path == "/api/reports/previous":
user_id = query.get("feishu_user_id", [""])[0]
report_date = query.get("date", [today_string()])[0]
try:
self._json(200, self.report_service.previous_report_reference(user_id, report_date))
except ValueError as error:
self._json(400, {"error": str(error)})
return
if parsed.path == "/api/reports/export":
if not self._require_admin():
return
report_date = query.get("date", [today_string()])[0]
csv_text = self.report_service.export_reports_csv(report_date)
self.send_response(200)
self.send_header("content-type", "text/csv; charset=utf-8")
self.send_header("content-disposition", f'attachment; filename="daily-reports-{report_date}.csv"')
self.end_headers()
self.wfile.write(csv_text.encode("utf-8-sig"))
return
if parsed.path.startswith("/static/"):
self._serve_static(parsed.path.removeprefix("/static/"))
return
self._json(404, {"error": "not found"})
def do_POST(self) -> None:
parsed = urlparse(self.path)
try:
if parsed.path == "/api/reports":
self._json(200, {"report": self.report_service.upsert_report(self._read_json())})
return
if parsed.path == "/api/robot/send-reminder":
payload = create_reminder_payload(f"{self.config.base_url}/submit")
self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return
if parsed.path == "/api/robot/send-summary":
body = self._read_json()
report_date = body.get("date") or today_string()
summary = self.report_service.list_reports_for_date(report_date)
payload = create_summary_payload(f"{self.config.base_url}/manager?date={report_date}", summary)
self._json(
200,
{
"ok": True,
"result": send_webhook(
self.config.feishu_webhook_url,
payload,
self.config.feishu_webhook_secret,
),
},
)
return
self._json(404, {"error": "not found"})
except (ValueError, json.JSONDecodeError) as error:
self._json(400, {"error": str(error)})
except Exception as error:
self._json(500, {"error": str(error)})
def _serve_static(self, name: str) -> None:
safe_name = name.replace("\\", "/").split("/")[-1]
file_path = self.static_dir / safe_name
if not file_path.exists():
self._json(404, {"error": "not found"})
return
content_type = "application/javascript; charset=utf-8" if file_path.suffix == ".js" else "text/css; charset=utf-8"
self._send(200, file_path.read_bytes(), content_type)
def create_server(config: Config, database: Database | None = None) -> ThreadingHTTPServer:
    """Build the HTTP server for the daily-report app.

    Args:
        config: Application settings; ``port``, ``root_dir``,
            ``database_path`` and ``employee_seed_path`` are read here.
        database: Existing database to serve from. When omitted (or falsy)
            a new ``Database`` is opened from the configured paths.

    Returns:
        A ``ThreadingHTTPServer`` bound to ``config.port`` on all
        interfaces, with the database attached as ``server.database`` so
        the caller can close it on shutdown.

    Raises:
        Whatever ``ReportService`` or the server constructor raise (for
        example ``OSError`` when the port is unavailable). A database that
        was opened by this call is closed before the error propagates.
    """
    db = database or Database(config.database_path, config.employee_seed_path)
    try:
        service = ReportService(db)

        class Handler(DailyReportHandler):
            report_service = service
            static_dir = config.root_dir / "daily_report" / "static"

        # Assigned after the class statement: inside the class body
        # ``config = config`` would not see this function's local variable.
        Handler.config = config
        server = ThreadingHTTPServer(("", config.port), Handler)
    except Exception:
        # Only close a database we opened ourselves; a caller-supplied one
        # stays under the caller's control.
        if db is not database:
            db.close()
        raise
    server.database = db  # type: ignore[attr-defined]
    return server
def main() -> None:
    """Load the configuration and serve requests until ``serve_forever`` exits.

    On the way out, whether ``serve_forever`` returned or raised, the
    listening socket is closed first and then the database attached by
    ``create_server``.
    """
    settings = read_config()
    httpd = create_server(settings)
    print(f"Daily report app listening on {settings.base_url}")
    try:
        httpd.serve_forever()
    finally:
        httpd.server_close()
        httpd.database.close()  # type: ignore[attr-defined]


if __name__ == "__main__":
    main()