AirGate/backend/utils/iam_service.py
seaislee1209 d0d48ceb19 fix: add Scope=Project to project-level policy attach/detach
Without Scope parameter, AttachUserPolicy defaults to Global scope
even when ProjectName is provided. Adding Scope=Project ensures
policies are correctly limited to the specified project.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 22:39:24 +08:00

284 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""IAM 子账号管理服务"""
import logging
from .volcengine_client import get_iam_client, get_resource_client, VolcengineAPIError
logger = logging.getLogger(__name__)
class IAMService:
"""封装火山引擎 IAM API 操作"""
def __init__(self, ak: str, sk: str):
self.client = get_iam_client(ak, sk)
def list_users(self, limit=100, offset=0) -> dict:
return self.client.call("ListUsers", {"Limit": str(limit), "Offset": str(offset)})
def get_user(self, username: str) -> dict:
return self.client.call("GetUser", {"UserName": username})
def create_user(self, username: str, display_name: str = "", email: str = "",
phone: str = "") -> dict:
params = {"UserName": username}
if display_name:
params["DisplayName"] = display_name
if email:
params["Email"] = email
if phone:
params["MobilePhone"] = phone
return self.client.call("CreateUser", params)
def create_login_profile(self, username: str, password: str,
login_allowed: bool = True, must_reset: bool = True) -> dict:
return self.client.call("CreateLoginProfile", {
"UserName": username,
"Password": password,
"LoginAllowed": str(login_allowed).lower(),
"PasswordResetRequired": str(must_reset).lower(),
})
def update_login_allowed(self, username: str, allowed: bool) -> dict:
return self.client.call("UpdateLoginProfile", {
"UserName": username,
"LoginAllowed": str(allowed).lower(),
})
def get_login_profile(self, username: str) -> dict:
return self.client.call("GetLoginProfile", {"UserName": username})
def list_access_keys(self, username: str) -> list:
resp = self.client.call("ListAccessKeys", {"UserName": username})
return resp.get("Result", {}).get("AccessKeyMetadata", [])
def update_access_key(self, ak_id: str, status: str, username: str = "") -> dict:
params = {"AccessKeyId": ak_id, "Status": status}
if username:
params["UserName"] = username
return self.client.call("UpdateAccessKey", params)
def create_access_key(self, username: str) -> dict:
return self.client.call("CreateAccessKey", {"UserName": username})
def attach_user_policy(self, username: str, policy_name: str,
policy_type: str = "System") -> dict:
return self.client.call("AttachUserPolicy", {
"UserName": username,
"PolicyName": policy_name,
"PolicyType": policy_type,
})
def detach_user_policy(self, username: str, policy_name: str,
policy_type: str = "System") -> dict:
return self.client.call("DetachUserPolicy", {
"UserName": username,
"PolicyName": policy_name,
"PolicyType": policy_type,
})
def update_user(self, username: str, display_name: str = None,
email: str = None, phone: str = None) -> dict:
params = {"UserName": username}
if display_name is not None:
params["NewDisplayName"] = display_name
if email is not None:
params["NewEmail"] = email
if phone is not None:
params["NewMobilePhone"] = phone
return self.client.call("UpdateUser", params)
def list_attached_user_policies(self, username: str) -> dict:
return self.client.call("ListAttachedUserPolicies", {"UserName": username})
def attach_policy_in_project(self, username: str, policy_name: str,
project_name: str, policy_type: str = "System") -> dict:
"""在项目范围内授权(限定到指定项目)"""
return self.client.call("AttachUserPolicy", {
"UserName": username,
"PolicyName": policy_name,
"PolicyType": policy_type,
"ProjectName": project_name,
"Scope": "Project",
})
def detach_policy_in_project(self, username: str, policy_name: str,
project_name: str, policy_type: str = "System") -> dict:
"""在项目范围内回收权限"""
return self.client.call("DetachUserPolicy", {
"UserName": username,
"PolicyName": policy_name,
"PolicyType": policy_type,
"ProjectName": project_name,
"Scope": "Project",
})
# === Deny Policy (project isolation) ===
def _deny_policy_name(self, username: str) -> str:
return f"AirGate_Deny_{username}"
def upsert_deny_policy(self, username: str, allowed_projects: list[str]):
"""创建或更新子账号的 Deny 策略,只允许访问指定项目"""
import json
policy_name = self._deny_policy_name(username)
# Get all projects to build explicit deny list
from .volcengine_client import get_resource_client
res_client = get_resource_client(
self.client.ak, self.client.sk
)
resp = res_client.call("ListProjects", {"Limit": "100"})
all_projects = [
p.get("ProjectName", "") for p in
resp.get("Result", {}).get("Projects", [])
]
if not all_projects:
logger.warning(f"无法获取项目列表,跳过 Deny 策略更新 ({username})")
return
if not allowed_projects:
# No projects, deny everything
policy_doc = json.dumps({
"Statement": [{
"Effect": "Deny",
"Action": ["ark:*"],
"Resource": ["*"],
}]
})
else:
# Build explicit deny list: all projects minus allowed ones
deny_projects = [p for p in all_projects if p not in allowed_projects]
if deny_projects:
policy_doc = json.dumps({
"Statement": [{
"Effect": "Deny",
"Action": ["ark:*"],
"Resource": [f"trn:iam::*:project/{p}" for p in deny_projects],
}]
})
else:
# All projects are allowed, no deny needed
# Create a no-op policy
policy_doc = json.dumps({
"Statement": [{
"Effect": "Deny",
"Action": ["ark:ThisActionDoesNotExist"],
"Resource": ["*"],
}]
})
# Delete old policy (must detach first), then recreate
try:
self.detach_user_policy(username, policy_name, "Custom")
except VolcengineAPIError:
pass # Not attached or doesn't exist
try:
self.client.call("DeletePolicy", {"PolicyName": policy_name})
except VolcengineAPIError:
pass # Policy doesn't exist yet
self.client.call("CreatePolicy", {
"PolicyName": policy_name,
"PolicyDocument": policy_doc,
"Description": f"AirGate 自动生成:限制 {username} 只能访问授权项目",
})
self.attach_user_policy(username, policy_name, "Custom")
def remove_deny_policy(self, username: str):
"""移除子账号的 Deny 策略"""
policy_name = self._deny_policy_name(username)
try:
self.detach_user_policy(username, policy_name, "Custom")
except VolcengineAPIError:
pass
try:
self.client.call("DeletePolicy", {"PolicyName": policy_name})
except VolcengineAPIError:
pass
def _has_login_profile(self, username: str) -> bool:
"""检查用户是否有真实的 LoginProfile火山可能返回空壳"""
try:
resp = self.get_login_profile(username)
profile = resp.get("Result", {}).get("LoginProfile", {})
# Empty shell has CreateDate=19700101 and Password=""
create_date = profile.get("CreateDate", "")
if create_date.startswith("1970") or create_date.startswith("0001"):
return False
return True
except VolcengineAPIError as e:
if "LoginProfileNotExist" in str(e) or "RecordNotFound" in str(e):
return False
raise
def disable_user(self, username: str):
"""完全停用用户:停控制台 + 停所有 AccessKey"""
errors = []
if self._has_login_profile(username):
try:
self.update_login_allowed(username, False)
except VolcengineAPIError as e:
errors.append(f"停用控制台失败: {e}")
try:
keys = self.list_access_keys(username)
for key in keys:
if key.get("Status") == "active":
self.update_access_key(key["AccessKeyId"], "inactive", username)
except VolcengineAPIError as e:
errors.append(f"停用密钥失败: {e}")
if errors:
raise VolcengineAPIError("DisableUser", "PartialFailure", "; ".join(errors))
def enable_user(self, username: str, restore_login: bool = True):
"""恢复用户:恢复控制台(可选) + 恢复所有 AccessKey"""
errors = []
if restore_login and self._has_login_profile(username):
try:
self.update_login_allowed(username, True)
except VolcengineAPIError as e:
errors.append(f"恢复控制台失败: {e}")
try:
keys = self.list_access_keys(username)
for key in keys:
if key.get("Status") == "inactive":
self.update_access_key(key["AccessKeyId"], "active", username)
except VolcengineAPIError as e:
errors.append(f"恢复密钥失败: {e}")
if errors:
raise VolcengineAPIError("EnableUser", "PartialFailure", "; ".join(errors))
class ProjectService:
"""封装火山引擎项目管理 API"""
def __init__(self, ak: str, sk: str):
self.client = get_resource_client(ak, sk)
def list_projects(self) -> list:
"""获取所有项目列表"""
projects = []
page = 1
while True:
resp = self.client.call("ListProjects", {
"PageNumber": str(page),
"PageSize": "50",
})
items = resp.get("Result", {}).get("Projects", [])
if not items:
break
projects.extend(items)
total = resp.get("Result", {}).get("Total", 0)
if len(projects) >= total:
break
page += 1
return projects