AirGate/backend/utils/volcengine_client.py
seaislee1209 555c86ce76 feat: initialize AirGate - Volcengine IAM sub-account management platform
Backend (Django 4.2 + DRF):
- Admin auth with SimpleJWT
- Volcengine API client with HMAC-SHA256 signing
- IAM user management (create/sync/import/disable/enable)
- Billing query with pagination
- Feishu webhook notifications (async)
- APScheduler for periodic spending checks
- AES-256 encrypted credential storage
- API key auth for external system integration

Frontend (Vue 3 + Element Plus):
- Login page
- Dashboard with stats overview
- IAM user list with per-user threshold config
- Billing view with spending progress bars
- Alert history with type filtering
- Settings page for global config and Volcengine account management

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 13:03:30 +08:00

116 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""火山引擎 Open API 客户端HMAC-SHA256 签名)"""
import datetime
import hashlib
import hmac
import logging
from urllib.parse import quote
import requests
logger = logging.getLogger(__name__)
class VolcengineAPIError(Exception):
def __init__(self, action: str, code: str, message: str):
self.action = action
self.code = code
super().__init__(f"[{action}] {code}: {message}")
class VolcengineClient:
"""火山引擎 API 客户端"""
def __init__(self, ak: str, sk: str, service: str, host: str,
region: str = "cn-north-1", version: str = "2018-01-01"):
self.ak = ak
self.sk = sk
self.service = service
self.host = host
self.region = region
self.version = version
def _norm_query(self, params: dict) -> str:
query = ""
for key in sorted(params.keys()):
if isinstance(params[key], list):
for v in params[key]:
query += quote(key, safe="-_.~") + "=" + quote(str(v), safe="-_.~") + "&"
else:
query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&"
return query[:-1].replace("+", "%20") if query else ""
def _hmac_sha256(self, key: bytes, content: str) -> bytes:
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
def _hash_sha256(self, content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def call(self, action: str, params: dict = None, body: str = "") -> dict:
params = params or {}
now = datetime.datetime.now(datetime.timezone.utc)
x_date = now.strftime("%Y%m%dT%H%M%SZ")
short_date = x_date[:8]
x_content_sha256 = self._hash_sha256(body)
all_params = {"Action": action, "Version": self.version, **params}
signed_headers_str = "content-type;host;x-content-sha256;x-date"
canonical_headers = (
f"content-type:application/x-www-form-urlencoded\n"
f"host:{self.host}\n"
f"x-content-sha256:{x_content_sha256}\n"
f"x-date:{x_date}"
)
query_string = self._norm_query(all_params)
canonical_request = "\n".join([
"GET", "/", query_string,
canonical_headers, "", signed_headers_str, x_content_sha256
])
credential_scope = f"{short_date}/{self.region}/{self.service}/request"
string_to_sign = "\n".join([
"HMAC-SHA256", x_date, credential_scope,
self._hash_sha256(canonical_request)
])
k_date = self._hmac_sha256(self.sk.encode("utf-8"), short_date)
k_region = self._hmac_sha256(k_date, self.region)
k_service = self._hmac_sha256(k_region, self.service)
k_signing = self._hmac_sha256(k_service, "request")
signature = self._hmac_sha256(k_signing, string_to_sign).hex()
headers = {
"Host": self.host,
"X-Date": x_date,
"X-Content-Sha256": x_content_sha256,
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": (
f"HMAC-SHA256 Credential={self.ak}/{credential_scope}, "
f"SignedHeaders={signed_headers_str}, Signature={signature}"
),
}
url = f"https://{self.host}/?{query_string}"
try:
r = requests.get(url, headers=headers, timeout=30)
resp = r.json()
except Exception as e:
raise VolcengineAPIError(action, "NetworkError", str(e))
error = resp.get("ResponseMetadata", {}).get("Error")
if error:
raise VolcengineAPIError(
action, error.get("Code", "Unknown"), error.get("Message", "")
)
return resp
def get_iam_client(ak: str, sk: str) -> VolcengineClient:
return VolcengineClient(ak, sk, "iam", "iam.volcengineapi.com")
def get_billing_client(ak: str, sk: str) -> VolcengineClient:
return VolcengineClient(ak, sk, "billing", "billing.volcengineapi.com",
version="2022-01-01")