AttachUserPolicy ignores Scope=Project parameter - policies always
attach globally. Project isolation now relies entirely on Deny policy
(AirGate_Deny_{username}) which blocks access to non-whitelisted projects.
Updated report with this finding.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
282 lines
11 KiB
Python
282 lines
11 KiB
Python
"""IAM 子账号管理服务"""
|
||
|
||
import logging
|
||
from .volcengine_client import get_iam_client, get_resource_client, VolcengineAPIError
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class IAMService:
    """Wrapper around the Volcengine IAM Open API.

    Every public method forwards to ``self.client.call(action, params)``.
    Project isolation is implemented with a per-user custom Deny policy named
    ``AirGate_Deny_{username}`` (see ``upsert_deny_policy``).
    """

    # Page size used when enumerating projects for the Deny policy.
    _PROJECT_PAGE_LIMIT = 100

    def __init__(self, ak: str, sk: str):
        """Create the IAM client from an access key / secret key pair."""
        self.client = get_iam_client(ak, sk)

    def list_users(self, limit=100, offset=0) -> dict:
        """Call ``ListUsers`` with the given ``Limit`` / ``Offset``."""
        return self.client.call("ListUsers", {"Limit": str(limit), "Offset": str(offset)})

    def get_user(self, username: str) -> dict:
        """Call ``GetUser`` for ``username``."""
        return self.client.call("GetUser", {"UserName": username})

    def create_user(self, username: str, display_name: str = "", email: str = "",
                    phone: str = "") -> dict:
        """Call ``CreateUser``; empty optional fields are omitted from the request."""
        params = {"UserName": username}
        if display_name:
            params["DisplayName"] = display_name
        if email:
            params["Email"] = email
        if phone:
            params["MobilePhone"] = phone
        return self.client.call("CreateUser", params)

    def create_login_profile(self, username: str, password: str,
                             login_allowed: bool = True, must_reset: bool = True) -> dict:
        """Call ``CreateLoginProfile``; booleans are sent as ``"true"`` / ``"false"``."""
        return self.client.call("CreateLoginProfile", {
            "UserName": username,
            "Password": password,
            "LoginAllowed": str(login_allowed).lower(),
            "PasswordResetRequired": str(must_reset).lower(),
        })

    def update_login_allowed(self, username: str, allowed: bool) -> dict:
        """Call ``UpdateLoginProfile`` to set the ``LoginAllowed`` flag."""
        return self.client.call("UpdateLoginProfile", {
            "UserName": username,
            "LoginAllowed": str(allowed).lower(),
        })

    def get_login_profile(self, username: str) -> dict:
        """Call ``GetLoginProfile`` for ``username``."""
        return self.client.call("GetLoginProfile", {"UserName": username})

    def list_access_keys(self, username: str) -> list:
        """Return ``Result.AccessKeyMetadata`` from ``ListAccessKeys``.

        Returns an empty list when the response has no (or a null) result.
        """
        resp = self.client.call("ListAccessKeys", {"UserName": username})
        return (resp.get("Result") or {}).get("AccessKeyMetadata") or []

    def update_access_key(self, ak_id: str, status: str, username: str = "") -> dict:
        """Call ``UpdateAccessKey``; ``UserName`` is sent only when non-empty."""
        params = {"AccessKeyId": ak_id, "Status": status}
        if username:
            params["UserName"] = username
        return self.client.call("UpdateAccessKey", params)

    def create_access_key(self, username: str) -> dict:
        """Call ``CreateAccessKey`` for ``username``."""
        return self.client.call("CreateAccessKey", {"UserName": username})

    def attach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``AttachUserPolicy`` with the given policy name and type."""
        return self.client.call("AttachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
        })

    def detach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``DetachUserPolicy`` with the given policy name and type."""
        return self.client.call("DetachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
        })

    def update_user(self, username: str, display_name: str = None,
                    email: str = None, phone: str = None) -> dict:
        """Call ``UpdateUser``; only fields that are not ``None`` are sent.

        Unlike ``create_user``, an empty string IS sent (as ``New*``), so it
        can be used to overwrite a field with an empty value.
        """
        params = {"UserName": username}
        if display_name is not None:
            params["NewDisplayName"] = display_name
        if email is not None:
            params["NewEmail"] = email
        if phone is not None:
            params["NewMobilePhone"] = phone
        return self.client.call("UpdateUser", params)

    def list_attached_user_policies(self, username: str) -> dict:
        """Call ``ListAttachedUserPolicies`` for ``username``."""
        return self.client.call("ListAttachedUserPolicies", {"UserName": username})

    def attach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Attach a policy globally; project isolation comes from the Deny policy.

        ``project_name`` is accepted for interface compatibility but is not
        sent: per the original author's note, the Volcengine Open API does not
        honour project-level authorization (``Scope=Project`` has no effect),
        so everything is attached globally and isolated by the
        ``AirGate_Deny_{username}`` policy.
        """
        return self.attach_user_policy(username, policy_name, policy_type)

    def detach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Detach a globally attached policy; ``project_name`` is unused."""
        return self.detach_user_policy(username, policy_name, policy_type)

    # === Deny Policy (project isolation) ===

    def _deny_policy_name(self, username: str) -> str:
        """Return the name of the per-user Deny policy."""
        return f"AirGate_Deny_{username}"

    def _list_all_project_names(self) -> list[str]:
        """Return every non-empty, de-duplicated project name, in API order.

        The first request is ``ListProjects`` with only ``Limit``; further
        pages add ``Offset``. Paging stops on a short page, on a page that
        adds no new names (guards against a backend that ignores ``Offset``),
        or once an integer ``Total`` has been reached.
        NOTE(review): ``Offset`` paging is assumed by analogy with
        ``list_users`` -- confirm against the ListProjects API reference.
        """
        res_client = get_resource_client(self.client.ak, self.client.sk)
        limit = self._PROJECT_PAGE_LIMIT
        names = []
        seen = set()
        offset = 0
        while True:
            params = {"Limit": str(limit)}
            if offset:
                params["Offset"] = str(offset)
            resp = res_client.call("ListProjects", params)
            result = resp.get("Result") or {}
            page = result.get("Projects") or []

            added = 0
            for project in page:
                name = project.get("ProjectName", "")
                # An empty name would yield a malformed "project/" resource.
                if name and name not in seen:
                    seen.add(name)
                    names.append(name)
                    added += 1

            offset += len(page)
            if len(page) < limit or added == 0:
                break
            total = result.get("Total")
            if isinstance(total, int) and offset >= total:
                break
        return names

    def upsert_deny_policy(self, username: str, allowed_projects: list[str]):
        """Create or replace the user's Deny policy so only ``allowed_projects`` are reachable.

        * No allowed projects: deny ``ark:*`` on every resource.
        * Otherwise: deny ``ark:*`` on every known project not in the allowed
          list; if nothing is left to deny, a no-op Deny statement is written.

        If the project list cannot be obtained (empty), a warning is logged
        and the existing policy is left untouched.

        NOTE(review): the policy is deleted and recreated, so the user has no
        Deny policy between the delete and the final attach, and none at all
        if ``CreatePolicy`` fails. Consider an in-place update if the API
        offers one.

        Raises:
            VolcengineAPIError: if listing projects, ``CreatePolicy`` or the
                final attach fails.
        """
        import json

        policy_name = self._deny_policy_name(username)

        # The explicit deny list needs every project, not just the first page.
        all_projects = self._list_all_project_names()
        if not all_projects:
            logger.warning(f"无法获取项目列表,跳过 Deny 策略更新 ({username})")
            return

        allowed = set(allowed_projects or ())
        if not allowed:
            # No projects granted: deny everything.
            statement = {
                "Effect": "Deny",
                "Action": ["ark:*"],
                "Resource": ["*"],
            }
        else:
            denied = [name for name in all_projects if name not in allowed]
            if denied:
                statement = {
                    "Effect": "Deny",
                    "Action": ["ark:*"],
                    "Resource": [f"trn:iam::*:project/{name}" for name in denied],
                }
            else:
                # Every project is allowed: keep a policy object, but make it a no-op.
                statement = {
                    "Effect": "Deny",
                    "Action": ["ark:ThisActionDoesNotExist"],
                    "Resource": ["*"],
                }
        policy_doc = json.dumps({"Statement": [statement]})

        # Drop the old policy (detach first, then delete), then recreate it.
        self.remove_deny_policy(username)

        self.client.call("CreatePolicy", {
            "PolicyName": policy_name,
            "PolicyDocument": policy_doc,
            "Description": f"AirGate 自动生成:限制 {username} 只能访问授权项目",
        })

        self.attach_user_policy(username, policy_name, "Custom")

    def remove_deny_policy(self, username: str):
        """Best-effort detach and delete of the user's Deny policy.

        API errors are expected when the policy is not attached or does not
        exist, so they are logged at debug level instead of raised.
        """
        policy_name = self._deny_policy_name(username)
        try:
            self.detach_user_policy(username, policy_name, "Custom")
        except VolcengineAPIError as e:
            logger.debug("Detach of %s from %s skipped: %s", policy_name, username, e)
        try:
            self.client.call("DeletePolicy", {"PolicyName": policy_name})
        except VolcengineAPIError as e:
            logger.debug("Delete of %s skipped: %s", policy_name, e)

    def _has_login_profile(self, username: str) -> bool:
        """Return whether the user has a real LoginProfile.

        The API may return an empty shell whose ``CreateDate`` starts with
        ``1970`` or ``0001``; that counts as "no profile", as do errors whose
        text contains ``LoginProfileNotExist`` or ``RecordNotFound``.

        Raises:
            VolcengineAPIError: for any other API error.
        """
        try:
            resp = self.get_login_profile(username)
        except VolcengineAPIError as e:
            if "LoginProfileNotExist" in str(e) or "RecordNotFound" in str(e):
                return False
            raise
        profile = (resp.get("Result") or {}).get("LoginProfile") or {}
        # A null or non-string CreateDate must not crash the check.
        create_date = str(profile.get("CreateDate") or "")
        return not create_date.startswith(("1970", "0001"))

    def _set_access_keys_status(self, username: str, from_status: str,
                                to_status: str, error_label: str) -> list[str]:
        """Switch every key in ``from_status`` to ``to_status``; return error messages.

        Each key is updated independently, so one failing key does not stop
        the remaining keys from being processed.
        """
        try:
            keys = self.list_access_keys(username)
        except VolcengineAPIError as e:
            return [f"{error_label}: {e}"]

        errors = []
        for key in keys:
            if key.get("Status") != from_status:
                continue
            try:
                self.update_access_key(key["AccessKeyId"], to_status, username)
            except VolcengineAPIError as e:
                errors.append(f"{error_label}: {e}")
        return errors

    def disable_user(self, username: str):
        """Fully disable a user: block console login and deactivate all active keys.

        Every step is attempted even if an earlier one fails; failures are
        collected and reported together.

        Raises:
            VolcengineAPIError: ``("DisableUser", "PartialFailure", details)``
                if any step failed.
        """
        errors = []

        try:
            if self._has_login_profile(username):
                self.update_login_allowed(username, False)
        except VolcengineAPIError as e:
            # A console failure must not prevent the keys from being deactivated.
            errors.append(f"停用控制台失败: {e}")

        errors.extend(
            self._set_access_keys_status(username, "active", "inactive", "停用密钥失败")
        )

        if errors:
            raise VolcengineAPIError("DisableUser", "PartialFailure", "; ".join(errors))

    def enable_user(self, username: str, restore_login: bool = True):
        """Re-enable a user: optionally restore console login and reactivate inactive keys.

        Every step is attempted even if an earlier one fails; failures are
        collected and reported together.

        Raises:
            VolcengineAPIError: ``("EnableUser", "PartialFailure", details)``
                if any step failed.
        """
        errors = []

        if restore_login:
            try:
                if self._has_login_profile(username):
                    self.update_login_allowed(username, True)
            except VolcengineAPIError as e:
                errors.append(f"恢复控制台失败: {e}")

        errors.extend(
            self._set_access_keys_status(username, "inactive", "active", "恢复密钥失败")
        )

        if errors:
            raise VolcengineAPIError("EnableUser", "PartialFailure", "; ".join(errors))
|
||
|
||
|
||
class ProjectService:
    """Wrapper around the Volcengine project-management API."""

    # Number of projects requested per ListProjects page.
    _PAGE_SIZE = 50

    def __init__(self, ak: str, sk: str):
        """Create the resource client from an access key / secret key pair."""
        self.client = get_resource_client(ak, sk)

    def list_projects(self) -> list:
        """Return all projects by paging through ``ListProjects``.

        Pages are requested with ``PageNumber`` / ``PageSize`` until one of:

        * a page comes back empty;
        * the response carries ``Total`` and that many items were collected;
        * the response has no ``Total`` and the page is shorter than the page
          size (a missing ``Total`` no longer ends paging after page one);
        * a page is identical to the previous one, which guards against a
          backend that ignores ``PageNumber``.
        """
        projects = []
        previous_page = None
        page = 1
        while True:
            resp = self.client.call("ListProjects", {
                "PageNumber": str(page),
                "PageSize": str(self._PAGE_SIZE),
            })
            result = resp.get("Result") or {}
            items = result.get("Projects") or []
            if not items or items == previous_page:
                break
            projects.extend(items)

            total = result.get("Total")
            if total is not None:
                if len(projects) >= total:
                    break
            elif len(items) < self._PAGE_SIZE:
                # Total unknown: a short page is the last page.
                break

            previous_page = items
            page += 1
        return projects
|