AirGate/backend/utils/iam_service.py
seaislee1209 0f034b7b26 feat: auto-manage Deny policy for project isolation
- Add upsert_deny_policy / remove_deny_policy to IAMService
- Auto-update Deny policy when adding/removing projects
- Auto-create Deny policy on sub-account creation
- Deny policy lists all non-authorized projects explicitly
- Verified: cross-project ListAssetGroups and ListApiKeys are blocked
- Updated research report with cross-project API findings (2026-03-28)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 20:30:56 +08:00

264 lines
9.6 KiB
Python

"""IAM 子账号管理服务"""
import logging
from .volcengine_client import get_iam_client, get_resource_client, VolcengineAPIError
logger = logging.getLogger(__name__)
class IAMService:
    """Thin wrapper around the Volcengine IAM API for managing sub-accounts.

    Every public method forwards to ``self.client.call(action, params)``;
    unless stated otherwise the raw response dict is returned unchanged and
    any ``VolcengineAPIError`` raised by the client propagates to the caller.
    """

    def __init__(self, ak: str, sk: str):
        """Build the IAM client from an access key / secret key pair."""
        self.client = get_iam_client(ak, sk)

    def list_users(self, limit=100, offset=0) -> dict:
        """Call ``ListUsers`` for the given window (values are sent as strings)."""
        return self.client.call("ListUsers", {"Limit": str(limit), "Offset": str(offset)})

    def get_user(self, username: str) -> dict:
        """Call ``GetUser`` for *username*."""
        return self.client.call("GetUser", {"UserName": username})

    def create_user(self, username: str, display_name: str = "", email: str = "",
                    phone: str = "") -> dict:
        """Call ``CreateUser``; optional fields are sent only when non-empty."""
        params = {"UserName": username}
        if display_name:
            params["DisplayName"] = display_name
        if email:
            params["Email"] = email
        if phone:
            params["MobilePhone"] = phone
        return self.client.call("CreateUser", params)

    def create_login_profile(self, username: str, password: str,
                             login_allowed: bool = True, must_reset: bool = True) -> dict:
        """Call ``CreateLoginProfile``; booleans are sent as ``"true"``/``"false"``."""
        return self.client.call("CreateLoginProfile", {
            "UserName": username,
            "Password": password,
            "LoginAllowed": str(login_allowed).lower(),
            "PasswordResetRequired": str(must_reset).lower(),
        })

    def update_login_allowed(self, username: str, allowed: bool) -> dict:
        """Call ``UpdateLoginProfile`` to switch console login on or off."""
        return self.client.call("UpdateLoginProfile", {
            "UserName": username,
            "LoginAllowed": str(allowed).lower(),
        })

    def get_login_profile(self, username: str) -> dict:
        """Call ``GetLoginProfile`` for *username*."""
        return self.client.call("GetLoginProfile", {"UserName": username})

    def list_access_keys(self, username: str) -> list:
        """Return ``Result.AccessKeyMetadata`` from ``ListAccessKeys`` (``[]`` if absent)."""
        resp = self.client.call("ListAccessKeys", {"UserName": username})
        # ``or`` guards against an explicit null in the response as well as a missing key.
        result = resp.get("Result") or {}
        return result.get("AccessKeyMetadata") or []

    def update_access_key(self, ak_id: str, status: str, username: str = "") -> dict:
        """Call ``UpdateAccessKey``; ``UserName`` is sent only when non-empty."""
        params = {"AccessKeyId": ak_id, "Status": status}
        if username:
            params["UserName"] = username
        return self.client.call("UpdateAccessKey", params)

    def create_access_key(self, username: str) -> dict:
        """Call ``CreateAccessKey`` for *username*."""
        return self.client.call("CreateAccessKey", {"UserName": username})

    def attach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``AttachUserPolicy`` without a project scope."""
        return self.client.call("AttachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
        })

    def detach_user_policy(self, username: str, policy_name: str,
                           policy_type: str = "System") -> dict:
        """Call ``DetachUserPolicy`` without a project scope."""
        return self.client.call("DetachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
        })

    def update_user(self, username: str, display_name: str = None,
                    email: str = None, phone: str = None) -> dict:
        """Call ``UpdateUser``.

        A field is sent whenever it is not ``None``, so an empty string is
        forwarded to the API (unlike ``create_user``, which drops empty values).
        """
        params = {"UserName": username}
        if display_name is not None:
            params["NewDisplayName"] = display_name
        if email is not None:
            params["NewEmail"] = email
        if phone is not None:
            params["NewMobilePhone"] = phone
        return self.client.call("UpdateUser", params)

    def list_attached_user_policies(self, username: str) -> dict:
        """Call ``ListAttachedUserPolicies`` for *username*."""
        return self.client.call("ListAttachedUserPolicies", {"UserName": username})

    def attach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Attach a policy to the user within the scope of one project."""
        return self.client.call("AttachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
            "ProjectName": project_name,
        })

    def detach_policy_in_project(self, username: str, policy_name: str,
                                 project_name: str, policy_type: str = "System") -> dict:
        """Detach a policy from the user within the scope of one project."""
        return self.client.call("DetachUserPolicy", {
            "UserName": username,
            "PolicyName": policy_name,
            "PolicyType": policy_type,
            "ProjectName": project_name,
        })

    # === Deny Policy (project isolation) ===

    def _deny_policy_name(self, username: str) -> str:
        """Return the name of the per-user deny policy managed by this class."""
        return f"AirGate_Deny_{username}"

    @staticmethod
    def _build_deny_policy(all_projects, allowed_projects) -> dict:
        """Build the deny policy document (as a dict) for one user.

        * No allowed projects: deny ``ark:*`` on every resource.
        * Otherwise: deny ``ark:*`` on each project in *all_projects* that is
          not in *allowed_projects* (blank project names are skipped).
        * Nothing left to deny: a placeholder statement on an action name that
          is not expected to exist, so a policy document is still produced.
        """
        if not allowed_projects:
            action, resources = "ark:*", ["*"]
        else:
            allowed = set(allowed_projects)
            denied = [name for name in all_projects if name and name not in allowed]
            if denied:
                action = "ark:*"
                resources = [f"trn:iam::*:project/{name}" for name in denied]
            else:
                action, resources = "ark:ThisActionDoesNotExist", ["*"]
        return {
            "Statement": [{
                "Effect": "Deny",
                "Action": [action],
                "Resource": resources,
            }]
        }

    def _list_project_names(self) -> list:
        """Fetch project names via ``ListProjects`` on a resource client.

        The resource client is built from the ``ak``/``sk`` attributes read off
        ``self.client``. Only a single request with ``Limit=100`` is made; a
        warning is logged when the response reports a larger ``Total``.
        """
        res_client = get_resource_client(self.client.ak, self.client.sk)
        resp = res_client.call("ListProjects", {"Limit": "100"})
        result = resp.get("Result") or {}
        names = [p.get("ProjectName", "") for p in result.get("Projects") or []]
        total = result.get("Total")
        if isinstance(total, int) and total > len(names):
            # NOTE(review): projects beyond the first page are not denied
            # explicitly; paginate here once the API's paging parameters for
            # this client are confirmed.
            logger.warning(
                "ListProjects returned %d of %d projects; deny list will be incomplete",
                len(names), total,
            )
        return names

    def upsert_deny_policy(self, username: str, allowed_projects: list[str]):
        """Create or replace the user's deny policy so only *allowed_projects* stay reachable.

        Raises:
            VolcengineAPIError: if the project list cannot be fetched (only
                needed when *allowed_projects* is non-empty), if the policy
                cannot be created, or if attaching fails for a reason other
                than ``PolicyAttachConflict``.
        """
        import json

        policy_name = self._deny_policy_name(username)

        all_projects = []
        if allowed_projects:
            # The full project list is only needed to compute "everything
            # except the allowed ones"; the deny-all case skips the call.
            try:
                all_projects = self._list_project_names()
            except VolcengineAPIError:
                # Fail closed: falling back to an empty list would yield the
                # placeholder policy and silently drop project isolation, so
                # keep whatever policy is currently in place and surface the error.
                logger.exception(
                    "Cannot list projects; deny policy for %s left unchanged", username
                )
                raise

        policy_doc = json.dumps(self._build_deny_policy(all_projects, allowed_projects))

        # Replace rather than update: drop any previous version, then create.
        # NOTE(review): between the two calls the user has no deny policy; an
        # in-place policy update call, if the API offers one, would close that
        # window — confirm against the IAM API reference.
        try:
            self.client.call("DeletePolicy", {"PolicyName": policy_name})
        except VolcengineAPIError as e:
            # Expected when the policy does not exist yet.
            logger.debug("DeletePolicy %s skipped: %s", policy_name, e)
        self.client.call("CreatePolicy", {
            "PolicyName": policy_name,
            "PolicyDocument": policy_doc,
            "Description": f"AirGate 自动生成:限制 {username} 只能访问授权项目",
        })

        # Ensure it is attached; an "already attached" conflict is not an error.
        try:
            self.attach_user_policy(username, policy_name, "Custom")
        except VolcengineAPIError as e:
            if "PolicyAttachConflict" not in str(e):
                raise

    def remove_deny_policy(self, username: str):
        """Detach and delete the user's deny policy (best effort, never raises API errors)."""
        policy_name = self._deny_policy_name(username)
        try:
            self.detach_user_policy(username, policy_name, "Custom")
        except VolcengineAPIError as e:
            logger.debug("DetachUserPolicy %s from %s skipped: %s", policy_name, username, e)
        try:
            self.client.call("DeletePolicy", {"PolicyName": policy_name})
        except VolcengineAPIError as e:
            logger.debug("DeletePolicy %s skipped: %s", policy_name, e)

    def _set_user_active(self, username: str, enable: bool):
        """Shared implementation of ``disable_user`` / ``enable_user``.

        Every step is attempted even if an earlier one fails; all failures are
        collected and reported together in one ``VolcengineAPIError``.
        """
        verb = "恢复" if enable else "停用"
        action = "EnableUser" if enable else "DisableUser"
        from_status, to_status = ("inactive", "active") if enable else ("active", "inactive")

        errors = []
        try:
            self.update_login_allowed(username, enable)
        except VolcengineAPIError as e:
            errors.append(f"{verb}控制台失败: {e}")

        try:
            keys = self.list_access_keys(username)
        except VolcengineAPIError as e:
            errors.append(f"{verb}密钥失败: {e}")
            keys = []

        for key in keys:
            if key.get("Status") != from_status:
                continue
            # Handle each key separately so one failing key does not leave the
            # remaining keys in their old state.
            try:
                self.update_access_key(key["AccessKeyId"], to_status, username)
            except VolcengineAPIError as e:
                errors.append(f"{verb}密钥失败: {e}")

        if errors:
            raise VolcengineAPIError(action, "PartialFailure", "; ".join(errors))

    def disable_user(self, username: str):
        """Fully disable a user: block console login and deactivate every active access key.

        Raises:
            VolcengineAPIError: ``("DisableUser", "PartialFailure", details)``
                if any step failed; the other steps are still attempted.
        """
        self._set_user_active(username, False)

    def enable_user(self, username: str):
        """Re-enable a user: allow console login and activate every inactive access key.

        Note that every key whose status is ``"inactive"`` is activated,
        regardless of why it was inactive.

        Raises:
            VolcengineAPIError: ``("EnableUser", "PartialFailure", details)``
                if any step failed; the other steps are still attempted.
        """
        self._set_user_active(username, True)
class ProjectService:
    """Wrapper around the Volcengine project-management API."""

    def __init__(self, ak: str, sk: str):
        """Build the resource client from an access key / secret key pair."""
        self.client = get_resource_client(ak, sk)

    def list_projects(self) -> list:
        """Return all projects, requesting pages of 50 via ``ListProjects``.

        Paging stops at the first empty page, or as soon as the number of
        collected items reaches the ``Total`` reported by the response
        (treated as 0 when the field is missing).
        """
        collected = []
        page_number = 1
        while True:
            response = self.client.call(
                "ListProjects",
                {"PageNumber": str(page_number), "PageSize": "50"},
            )
            batch = response.get("Result", {}).get("Projects", [])
            if not batch:
                return collected
            collected.extend(batch)
            reported_total = response.get("Result", {}).get("Total", 0)
            if len(collected) >= reported_total:
                return collected
            page_number += 1