"""IAM 子账号管理服务""" import logging from .volcengine_client import get_iam_client, get_resource_client, VolcengineAPIError logger = logging.getLogger(__name__) class IAMService: """封装火山引擎 IAM API 操作""" def __init__(self, ak: str, sk: str): self.client = get_iam_client(ak, sk) def list_users(self, limit=100, offset=0) -> dict: return self.client.call("ListUsers", {"Limit": str(limit), "Offset": str(offset)}) def get_user(self, username: str) -> dict: return self.client.call("GetUser", {"UserName": username}) def create_user(self, username: str, display_name: str = "", email: str = "", phone: str = "") -> dict: params = {"UserName": username} if display_name: params["DisplayName"] = display_name if email: params["Email"] = email if phone: params["MobilePhone"] = phone return self.client.call("CreateUser", params) def create_login_profile(self, username: str, password: str, login_allowed: bool = True, must_reset: bool = True) -> dict: return self.client.call("CreateLoginProfile", { "UserName": username, "Password": password, "LoginAllowed": str(login_allowed).lower(), "PasswordResetRequired": str(must_reset).lower(), }) def update_login_allowed(self, username: str, allowed: bool) -> dict: return self.client.call("UpdateLoginProfile", { "UserName": username, "LoginAllowed": str(allowed).lower(), }) def get_login_profile(self, username: str) -> dict: return self.client.call("GetLoginProfile", {"UserName": username}) def list_access_keys(self, username: str) -> list: resp = self.client.call("ListAccessKeys", {"UserName": username}) return resp.get("Result", {}).get("AccessKeyMetadata", []) def update_access_key(self, ak_id: str, status: str, username: str = "") -> dict: params = {"AccessKeyId": ak_id, "Status": status} if username: params["UserName"] = username return self.client.call("UpdateAccessKey", params) def create_access_key(self, username: str) -> dict: return self.client.call("CreateAccessKey", {"UserName": username}) def attach_user_policy(self, username: str, policy_name: str, policy_type: str = "System") -> dict: return self.client.call("AttachUserPolicy", { "UserName": username, "PolicyName": policy_name, "PolicyType": policy_type, }) def detach_user_policy(self, username: str, policy_name: str, policy_type: str = "System") -> dict: return self.client.call("DetachUserPolicy", { "UserName": username, "PolicyName": policy_name, "PolicyType": policy_type, }) def update_user(self, username: str, display_name: str = None, email: str = None, phone: str = None) -> dict: params = {"UserName": username} if display_name is not None: params["NewDisplayName"] = display_name if email is not None: params["NewEmail"] = email if phone is not None: params["NewMobilePhone"] = phone return self.client.call("UpdateUser", params) def list_attached_user_policies(self, username: str) -> dict: return self.client.call("ListAttachedUserPolicies", {"UserName": username}) def attach_policy_in_project(self, username: str, policy_name: str, project_name: str, policy_type: str = "System") -> dict: """在项目范围内授权""" return self.client.call("AttachUserPolicy", { "UserName": username, "PolicyName": policy_name, "PolicyType": policy_type, "ProjectName": project_name, }) def detach_policy_in_project(self, username: str, policy_name: str, project_name: str, policy_type: str = "System") -> dict: """在项目范围内回收权限""" return self.client.call("DetachUserPolicy", { "UserName": username, "PolicyName": policy_name, "PolicyType": policy_type, "ProjectName": project_name, }) # === Deny Policy (project isolation) === def _deny_policy_name(self, username: str) -> str: return f"AirGate_Deny_{username}" def upsert_deny_policy(self, username: str, allowed_projects: list[str]): """创建或更新子账号的 Deny 策略,只允许访问指定项目""" import json policy_name = self._deny_policy_name(username) # Get all projects to build explicit deny list from .volcengine_client import get_resource_client res_client = get_resource_client( self.client.ak, self.client.sk ) resp = res_client.call("ListProjects", {"Limit": "100"}) all_projects = [ p.get("ProjectName", "") for p in resp.get("Result", {}).get("Projects", []) ] if not all_projects: logger.warning(f"无法获取项目列表,跳过 Deny 策略更新 ({username})") return if not allowed_projects: # No projects, deny everything policy_doc = json.dumps({ "Statement": [{ "Effect": "Deny", "Action": ["ark:*"], "Resource": ["*"], }] }) else: # Build explicit deny list: all projects minus allowed ones deny_projects = [p for p in all_projects if p not in allowed_projects] if deny_projects: policy_doc = json.dumps({ "Statement": [{ "Effect": "Deny", "Action": ["ark:*"], "Resource": [f"trn:iam::*:project/{p}" for p in deny_projects], }] }) else: # All projects are allowed, no deny needed # Create a no-op policy policy_doc = json.dumps({ "Statement": [{ "Effect": "Deny", "Action": ["ark:ThisActionDoesNotExist"], "Resource": ["*"], }] }) # Delete old policy (must detach first), then recreate try: self.detach_user_policy(username, policy_name, "Custom") except VolcengineAPIError: pass # Not attached or doesn't exist try: self.client.call("DeletePolicy", {"PolicyName": policy_name}) except VolcengineAPIError: pass # Policy doesn't exist yet self.client.call("CreatePolicy", { "PolicyName": policy_name, "PolicyDocument": policy_doc, "Description": f"AirGate 自动生成:限制 {username} 只能访问授权项目", }) self.attach_user_policy(username, policy_name, "Custom") def remove_deny_policy(self, username: str): """移除子账号的 Deny 策略""" policy_name = self._deny_policy_name(username) try: self.detach_user_policy(username, policy_name, "Custom") except VolcengineAPIError: pass try: self.client.call("DeletePolicy", {"PolicyName": policy_name}) except VolcengineAPIError: pass def disable_user(self, username: str): """完全停用用户:停控制台 + 停所有 AccessKey""" errors = [] try: self.update_login_allowed(username, False) except VolcengineAPIError as e: errors.append(f"停用控制台失败: {e}") try: keys = self.list_access_keys(username) for key in keys: if key.get("Status") == "active": self.update_access_key(key["AccessKeyId"], "inactive", username) except VolcengineAPIError as e: errors.append(f"停用密钥失败: {e}") if errors: raise VolcengineAPIError("DisableUser", "PartialFailure", "; ".join(errors)) def enable_user(self, username: str): """恢复用户:恢复控制台 + 恢复所有 AccessKey""" errors = [] try: self.update_login_allowed(username, True) except VolcengineAPIError as e: errors.append(f"恢复控制台失败: {e}") try: keys = self.list_access_keys(username) for key in keys: if key.get("Status") == "inactive": self.update_access_key(key["AccessKeyId"], "active", username) except VolcengineAPIError as e: errors.append(f"恢复密钥失败: {e}") if errors: raise VolcengineAPIError("EnableUser", "PartialFailure", "; ".join(errors)) class ProjectService: """封装火山引擎项目管理 API""" def __init__(self, ak: str, sk: str): self.client = get_resource_client(ak, sk) def list_projects(self) -> list: """获取所有项目列表""" projects = [] page = 1 while True: resp = self.client.call("ListProjects", { "PageNumber": str(page), "PageSize": "50", }) items = resp.get("Result", {}).get("Projects", []) if not items: break projects.extend(items) total = resp.get("Result", {}).get("Total", 0) if len(projects) >= total: break page += 1 return projects