# Feishu OAuth helpers and HMAC-signed session cookies.
#
# Cookie format: "<urlsafe-base64 JSON payload, unpadded>.<hex HMAC-SHA256>".
# The signature is computed over the base64 text of the payload.
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import secrets
import urllib.parse
import urllib.request
from typing import Any

AUTHORIZE_URL = "https://open.feishu.cn/open-apis/authen/v1/authorize"
TOKEN_URL = "https://open.feishu.cn/open-apis/authen/v2/oauth/token"
USER_INFO_URL = "https://open.feishu.cn/open-apis/authen/v1/user_info"

# Timeout (seconds) applied to every outbound HTTP request in this module.
_HTTP_TIMEOUT_SECONDS = 10


def _b64encode(data: bytes) -> str:
    """Return *data* as URL-safe base64 text with the ``=`` padding stripped."""
    return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")


def _b64decode(text: str) -> bytes:
    """Decode unpadded URL-safe base64 *text*, restoring the padding first."""
    padding = "=" * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


def _sign(payload: bytes, secret: str) -> str:
    """Return the hex HMAC-SHA256 of *payload* keyed with *secret*."""
    return hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()


def create_session_cookie(session: dict[str, str], secret: str) -> str:
    """Serialize *session* into a signed cookie value.

    The session is dumped as compact JSON, base64-encoded, and followed by a
    ``.`` and the hex HMAC-SHA256 of the encoded payload.
    """
    raw = json.dumps(session, ensure_ascii=False, separators=(",", ":"))
    payload = _b64encode(raw.encode("utf-8"))
    return f"{payload}.{_sign(payload.encode('ascii'), secret)}"


def parse_session_cookie(cookie_value: str | None, secret: str) -> dict[str, str] | None:
    """Verify and decode a cookie produced by :func:`create_session_cookie`.

    Returns the session with every key and value coerced to ``str``, or
    ``None`` when the cookie is missing, malformed, carries a bad signature,
    or does not contain a truthy ``feishu_user_id``.
    """
    if not cookie_value or "." not in cookie_value:
        return None

    payload, signature = cookie_value.rsplit(".", 1)

    # The cookie is untrusted input. A genuine cookie is pure ASCII, so any
    # non-ASCII character means it is invalid; reject it here rather than let
    # str.encode / hmac.compare_digest raise on it.
    try:
        payload_bytes = payload.encode("ascii")
        signature_bytes = signature.encode("ascii")
    except UnicodeEncodeError:
        return None

    expected = _sign(payload_bytes, secret).encode("ascii")
    if not hmac.compare_digest(signature_bytes, expected):
        return None

    try:
        data = json.loads(_b64decode(payload).decode("utf-8"))
    except ValueError:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are all
        # ValueError subclasses.
        return None

    if not isinstance(data, dict) or not data.get("feishu_user_id"):
        return None
    return {str(key): str(value) for key, value in data.items()}


def create_state() -> str:
    """Return a fresh random URL-safe token for the OAuth ``state`` parameter."""
    return secrets.token_urlsafe(24)


def authorize_url(app_id: str, redirect_uri: str, state: str) -> str:
    """Build the authorize URL carrying *app_id*, *redirect_uri* and *state*."""
    query = urllib.parse.urlencode(
        {
            "app_id": app_id,
            "redirect_uri": redirect_uri,
            "state": state,
        }
    )
    return f"{AUTHORIZE_URL}?{query}"


def _fetch_json(request: urllib.request.Request) -> dict[str, Any]:
    """Send *request* and return the response body decoded as a JSON object.

    Raises:
        RuntimeError: if the decoded body is not a JSON object.
    """
    with urllib.request.urlopen(request, timeout=_HTTP_TIMEOUT_SECONDS) as response:
        result = json.loads(response.read().decode("utf-8"))
    if not isinstance(result, dict):
        raise RuntimeError(f"Feishu response is not a JSON object: {result!r}")
    return result


def exchange_code_for_user_access_token(app_id: str, app_secret: str, code: str, redirect_uri: str) -> str:
    """Exchange an authorization *code* for a user access token.

    POSTs an ``authorization_code`` grant to ``TOKEN_URL`` and reads
    ``access_token`` from the top level of the response or, failing that,
    from its ``data`` object.

    Raises:
        RuntimeError: if the response carries no access token.
    """
    payload = json.dumps(
        {
            "grant_type": "authorization_code",
            "client_id": app_id,
            "client_secret": app_secret,
            "code": code,
            "redirect_uri": redirect_uri,
        }
    ).encode("utf-8")
    request = urllib.request.Request(
        TOKEN_URL,
        data=payload,
        headers={"content-type": "application/json"},
        method="POST",
    )
    result = _fetch_json(request)

    token = result.get("access_token")
    if not token:
        # "data" may be absent, null, or not an object; only dig into a dict.
        nested = result.get("data")
        if isinstance(nested, dict):
            token = nested.get("access_token")
    if not token:
        raise RuntimeError(f"Feishu token response missing access_token: {result}")
    return str(token)


def fetch_user_info(user_access_token: str) -> dict[str, Any]:
    """Fetch the profile of the user that owns *user_access_token*.

    Reads the profile from the response's ``data`` object when present,
    otherwise from the top level. The user id is the first truthy value of
    ``user_id``, ``open_id``, ``union_id``; the display name is the first
    truthy value of ``name``, ``en_name``, ``email``, falling back to the id.
    ``department``, ``manager`` and ``active`` are filled with fixed defaults.

    Raises:
        RuntimeError: if the response carries no user id.
    """
    request = urllib.request.Request(
        USER_INFO_URL,
        headers={"authorization": f"Bearer {user_access_token}"},
        method="GET",
    )
    result = _fetch_json(request)

    # Fall back to the top level when "data" is absent, null, or not an object.
    data = result.get("data")
    if not isinstance(data, dict):
        data = result

    user_id = data.get("user_id") or data.get("open_id") or data.get("union_id")
    if not user_id:
        raise RuntimeError(f"Feishu user_info response missing user id: {result}")
    name = data.get("name") or data.get("en_name") or data.get("email") or user_id

    return {
        "feishu_user_id": str(user_id),
        "name": str(name),
        "department": "",
        "manager": "",
        "active": True,
    }