Project-level authorization: - Adding a project to a sub-account now auto-calls AttachPolicyInProject to grant default policies (ArkFullAccess, TOSFullAccess) in that project scope - Removing a project auto-calls DetachPolicyInProject to revoke those policies - Each project records which policies were attached (attached_policies field) so removal knows exactly what to revoke Configuration: - GlobalConfig.default_project_policies: configurable list of policies to auto-attach (editable in Settings page, defaults to ArkFullAccess + TOSFullAccess) IAM Service: - Added attach_policy_in_project() and detach_policy_in_project() methods using standard AttachUserPolicy/DetachUserPolicy with ProjectName parameter Frontend: - Projects dialog now shows "已授权策略" column with policy tags - Settings page has "项目默认授权策略" config field Alert logging: - Project add/remove operations are logged with attached/detached policy details Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
167 lines
6.0 KiB
Python
167 lines
6.0 KiB
Python
"""IAM 子账号管理服务"""
|
|
|
|
import logging
|
|
from .volcengine_client import get_iam_client, get_resource_client, VolcengineAPIError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IAMService:
    """Thin wrapper around the Volcengine IAM API.

    Every public method forwards to ``self.client.call(action, params)``
    with string-valued parameters and returns whatever the client returns,
    except where noted.
    """

    def __init__(self, ak: str, sk: str):
        """Create the IAM client from an access key / secret key pair."""
        self.client = get_iam_client(ak, sk)

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def list_users(self, limit=100, offset=0) -> dict:
        """Call ``ListUsers`` with ``Limit``/``Offset`` sent as strings."""
        return self.client.call("ListUsers", {"Limit": str(limit), "Offset": str(offset)})

    def get_user(self, username: str) -> dict:
        """Call ``GetUser`` for ``username``."""
        return self.client.call("GetUser", {"UserName": username})

    def create_user(self, username: str, display_name: str = "", email: str = "",
                    phone: str = "") -> dict:
        """Call ``CreateUser``; empty optional fields are omitted from the request."""
        params = {"UserName": username}
        optional = {
            "DisplayName": display_name,
            "Email": email,
            "MobilePhone": phone,
        }
        # Only send optional fields the caller actually supplied.
        params.update({name: value for name, value in optional.items() if value})
        return self.client.call("CreateUser", params)

    # ------------------------------------------------------------------
    # Console login profile
    # ------------------------------------------------------------------

    def create_login_profile(self, username: str, password: str,
                             login_allowed: bool = True, must_reset: bool = True) -> dict:
        """Call ``CreateLoginProfile``; booleans are sent as ``"true"``/``"false"``."""
        return self.client.call("CreateLoginProfile", {
            "UserName": username,
            "Password": password,
            "LoginAllowed": str(login_allowed).lower(),
            "PasswordResetRequired": str(must_reset).lower(),
        })

    def update_login_allowed(self, username: str, allowed: bool) -> dict:
        """Call ``UpdateLoginProfile`` to set the ``LoginAllowed`` flag."""
        return self.client.call("UpdateLoginProfile", {
            "UserName": username,
            "LoginAllowed": str(allowed).lower(),
        })

    def get_login_profile(self, username: str) -> dict:
        """Call ``GetLoginProfile`` for ``username``."""
        return self.client.call("GetLoginProfile", {"UserName": username})

    # ------------------------------------------------------------------
    # Access keys
    # ------------------------------------------------------------------

    def list_access_keys(self, username: str) -> list:
        """Return ``Result.AccessKeyMetadata`` from ``ListAccessKeys``.

        Returns an empty list when either level is missing or null, so
        callers can always iterate the result.
        """
        resp = self.client.call("ListAccessKeys", {"UserName": username})
        result = resp.get("Result") or {}
        return result.get("AccessKeyMetadata") or []

    def update_access_key(self, ak_id: str, status: str, username: str = "") -> dict:
        """Call ``UpdateAccessKey``; ``UserName`` is sent only when non-empty."""
        params = {"AccessKeyId": ak_id, "Status": status}
        if username:
            params["UserName"] = username
        return self.client.call("UpdateAccessKey", params)

    def create_access_key(self, username: str) -> dict:
        """Call ``CreateAccessKey`` for ``username``."""
        return self.client.call("CreateAccessKey", {"UserName": username})

    # ------------------------------------------------------------------
    # Policies
    # ------------------------------------------------------------------

    @staticmethod
    def _policy_params(username: str, policy_name: str, policy_type: str,
                       project_name=None) -> dict:
        """Build the parameter dict shared by the attach/detach policy calls."""
        params = {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
        }
        if project_name is not None:
            params["ProjectName"] = project_name
        return params

    def attach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``AttachUserPolicy`` without a project scope."""
        return self.client.call(
            "AttachUserPolicy",
            self._policy_params(username, policy_name, policy_type),
        )

    def detach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``DetachUserPolicy`` without a project scope."""
        return self.client.call(
            "DetachUserPolicy",
            self._policy_params(username, policy_name, policy_type),
        )

    def list_attached_user_policies(self, username: str) -> dict:
        """Call ``ListAttachedUserPolicies`` for ``username``."""
        return self.client.call("ListAttachedUserPolicies", {"UserName": username})

    def attach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Grant a policy within a project scope.

        Same ``AttachUserPolicy`` action as :meth:`attach_user_policy`, with
        an additional ``ProjectName`` parameter.
        """
        return self.client.call(
            "AttachUserPolicy",
            self._policy_params(username, policy_name, policy_type, project_name),
        )

    def detach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Revoke a policy within a project scope.

        Same ``DetachUserPolicy`` action as :meth:`detach_user_policy`, with
        an additional ``ProjectName`` parameter.
        """
        return self.client.call(
            "DetachUserPolicy",
            self._policy_params(username, policy_name, policy_type, project_name),
        )

    # ------------------------------------------------------------------
    # Enable / disable
    # ------------------------------------------------------------------

    def _set_user_enabled(self, username: str, enabled: bool) -> None:
        """Toggle console login and flip every access key to the target state.

        Each step is best-effort: a ``VolcengineAPIError`` from one step (or
        from one individual key) is recorded and the remaining steps still
        run. If anything failed, a single ``VolcengineAPIError`` with code
        ``"PartialFailure"`` is raised carrying all messages joined by ``"; "``.
        """
        if enabled:
            action, verb = "EnableUser", "恢复"
            from_status, to_status = "inactive", "active"
        else:
            action, verb = "DisableUser", "停用"
            from_status, to_status = "active", "inactive"

        errors = []

        try:
            self.update_login_allowed(username, enabled)
        except VolcengineAPIError as e:
            errors.append(f"{verb}控制台失败: {e}")

        try:
            keys = self.list_access_keys(username)
        except VolcengineAPIError as e:
            errors.append(f"{verb}密钥失败: {e}")
            keys = []

        for key in keys:
            if key.get("Status") != from_status:
                continue
            # Isolate failures per key: one key that cannot be updated must
            # not leave the user's other keys in the old state.
            try:
                self.update_access_key(key["AccessKeyId"], to_status, username)
            except VolcengineAPIError as e:
                errors.append(f"{verb}密钥失败: {e}")

        if errors:
            raise VolcengineAPIError(action, "PartialFailure", "; ".join(errors))

    def disable_user(self, username: str) -> None:
        """Fully disable a user: block console login and deactivate all active keys.

        Raises:
            VolcengineAPIError: ``("DisableUser", "PartialFailure", ...)`` if any
                step failed; the remaining steps are still attempted first.
        """
        self._set_user_enabled(username, False)

    def enable_user(self, username: str) -> None:
        """Restore a user: allow console login and reactivate all inactive keys.

        Raises:
            VolcengineAPIError: ``("EnableUser", "PartialFailure", ...)`` if any
                step failed; the remaining steps are still attempted first.
        """
        self._set_user_enabled(username, True)
|
|
|
|
|
|
class ProjectService:
    """Thin wrapper around the Volcengine project-management API."""

    def __init__(self, ak: str, sk: str):
        """Create the resource client from an access key / secret key pair."""
        self.client = get_resource_client(ak, sk)

    def list_projects(self) -> list:
        """Return every project by paging through ``ListProjects``.

        Pages are requested with ``PageNumber`` starting at 1 and a fixed
        ``PageSize`` of 50. Paging stops on the first empty page, or once
        the number of collected projects reaches the reported ``Total``.
        A missing or null ``Result`` is treated as an empty page.
        """
        page_size = "50"
        projects = []
        page = 1
        while True:
            resp = self.client.call("ListProjects", {
                "PageNumber": str(page),
                "PageSize": page_size,
            })
            # Guard against an explicit ``"Result": null`` in the response.
            result = resp.get("Result") or {}
            items = result.get("Projects", [])
            if not items:
                break
            projects.extend(items)
            # NOTE(review): when the response omits ``Total`` this defaults
            # to 0, so paging stops after the first page — confirm the API
            # always reports it.
            total = result.get("Total", 0)
            if len(projects) >= total:
                break
            page += 1
        return projects
|