"""火山引擎 Open API 客户端(HMAC-SHA256 签名)""" import datetime import hashlib import hmac import logging from urllib.parse import quote import requests logger = logging.getLogger(__name__) class VolcengineAPIError(Exception): def __init__(self, action: str, code: str, message: str): self.action = action self.code = code super().__init__(f"[{action}] {code}: {message}") class VolcengineClient: """火山引擎 API 客户端""" def __init__(self, ak: str, sk: str, service: str, host: str, region: str = "cn-north-1", version: str = "2018-01-01"): self.ak = ak self.sk = sk self.service = service self.host = host self.region = region self.version = version def _norm_query(self, params: dict) -> str: query = "" for key in sorted(params.keys()): if isinstance(params[key], list): for v in params[key]: query += quote(key, safe="-_.~") + "=" + quote(str(v), safe="-_.~") + "&" else: query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&" return query[:-1].replace("+", "%20") if query else "" def _hmac_sha256(self, key: bytes, content: str) -> bytes: return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() def _hash_sha256(self, content: str) -> str: return hashlib.sha256(content.encode("utf-8")).hexdigest() def call(self, action: str, params: dict = None, body: str = "") -> dict: params = params or {} now = datetime.datetime.now(datetime.timezone.utc) x_date = now.strftime("%Y%m%dT%H%M%SZ") short_date = x_date[:8] x_content_sha256 = self._hash_sha256(body) all_params = {"Action": action, "Version": self.version, **params} signed_headers_str = "content-type;host;x-content-sha256;x-date" canonical_headers = ( f"content-type:application/x-www-form-urlencoded\n" f"host:{self.host}\n" f"x-content-sha256:{x_content_sha256}\n" f"x-date:{x_date}" ) query_string = self._norm_query(all_params) canonical_request = "\n".join([ "GET", "/", query_string, canonical_headers, "", signed_headers_str, x_content_sha256 ]) credential_scope = f"{short_date}/{self.region}/{self.service}/request" string_to_sign = "\n".join([ "HMAC-SHA256", x_date, credential_scope, self._hash_sha256(canonical_request) ]) k_date = self._hmac_sha256(self.sk.encode("utf-8"), short_date) k_region = self._hmac_sha256(k_date, self.region) k_service = self._hmac_sha256(k_region, self.service) k_signing = self._hmac_sha256(k_service, "request") signature = self._hmac_sha256(k_signing, string_to_sign).hex() headers = { "Host": self.host, "X-Date": x_date, "X-Content-Sha256": x_content_sha256, "Content-Type": "application/x-www-form-urlencoded", "Authorization": ( f"HMAC-SHA256 Credential={self.ak}/{credential_scope}, " f"SignedHeaders={signed_headers_str}, Signature={signature}" ), } url = f"https://{self.host}/?{query_string}" try: r = requests.get(url, headers=headers, timeout=30) resp = r.json() except Exception as e: raise VolcengineAPIError(action, "NetworkError", str(e)) error = resp.get("ResponseMetadata", {}).get("Error") if error: raise VolcengineAPIError( action, error.get("Code", "Unknown"), error.get("Message", "") ) return resp def get_iam_client(ak: str, sk: str) -> VolcengineClient: return VolcengineClient(ak, sk, "iam", "iam.volcengineapi.com") def get_billing_client(ak: str, sk: str) -> VolcengineClient: return VolcengineClient(ak, sk, "billing", "billing.volcengineapi.com", version="2022-01-01") def get_resource_client(ak: str, sk: str) -> VolcengineClient: return VolcengineClient(ak, sk, "iam", "iam.volcengineapi.com", version="2021-08-01")