kaikai_test/daily_report/feishu_auth.py
2026-05-07 16:31:56 +08:00

113 lines
3.6 KiB
Python

from __future__ import annotations
import base64
import hashlib
import hmac
import json
import secrets
import urllib.parse
import urllib.request
from typing import Any
AUTHORIZE_URL = "https://open.feishu.cn/open-apis/authen/v1/authorize"
TOKEN_URL = "https://open.feishu.cn/open-apis/authen/v2/oauth/token"
USER_INFO_URL = "https://open.feishu.cn/open-apis/authen/v1/user_info"
def _b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64decode(text: str) -> bytes:
padding = "=" * (-len(text) % 4)
return base64.urlsafe_b64decode(text + padding)
def create_session_cookie(session: dict[str, str], secret: str) -> str:
payload = _b64encode(json.dumps(session, ensure_ascii=False, separators=(",", ":")).encode("utf-8"))
signature = hmac.new(secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256).hexdigest()
return f"{payload}.{signature}"
def parse_session_cookie(cookie_value: str | None, secret: str) -> dict[str, str] | None:
if not cookie_value or "." not in cookie_value:
return None
payload, signature = cookie_value.rsplit(".", 1)
expected = hmac.new(secret.encode("utf-8"), payload.encode("ascii"), hashlib.sha256).hexdigest()
if not hmac.compare_digest(signature, expected):
return None
try:
data = json.loads(_b64decode(payload).decode("utf-8"))
except (ValueError, json.JSONDecodeError):
return None
if not isinstance(data, dict) or not data.get("feishu_user_id"):
return None
return {str(key): str(value) for key, value in data.items()}
def create_state() -> str:
return secrets.token_urlsafe(24)
def authorize_url(app_id: str, redirect_uri: str, state: str) -> str:
query = urllib.parse.urlencode(
{
"app_id": app_id,
"redirect_uri": redirect_uri,
"state": state,
}
)
return f"{AUTHORIZE_URL}?{query}"
def exchange_code_for_user_access_token(app_id: str, app_secret: str, code: str, redirect_uri: str) -> str:
payload = json.dumps(
{
"grant_type": "authorization_code",
"client_id": app_id,
"client_secret": app_secret,
"code": code,
"redirect_uri": redirect_uri,
}
).encode("utf-8")
request = urllib.request.Request(
TOKEN_URL,
data=payload,
headers={"content-type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=10) as response:
result = json.loads(response.read().decode("utf-8"))
token = result.get("access_token") or result.get("data", {}).get("access_token")
if not token:
raise RuntimeError(f"Feishu token response missing access_token: {result}")
return str(token)
def fetch_user_info(user_access_token: str) -> dict[str, Any]:
request = urllib.request.Request(
USER_INFO_URL,
headers={"authorization": f"Bearer {user_access_token}"},
method="GET",
)
with urllib.request.urlopen(request, timeout=10) as response:
result = json.loads(response.read().decode("utf-8"))
data = result.get("data", result)
user_id = data.get("user_id") or data.get("open_id") or data.get("union_id")
name = data.get("name") or data.get("en_name") or data.get("email") or user_id
if not user_id:
raise RuntimeError(f"Feishu user_info response missing user id: {result}")
return {
"feishu_user_id": str(user_id),
"name": str(name),
"department": "",
"manager": "",
"active": True,
}