seaislee1209 9e81717e08 feat: switch feishu alerts from Webhook to App (private message)
- Replace Webhook with App ID + App Secret + mobile number
- Reuse AirDrama's feishu app (send private card messages)
- Add test button in system settings
- Add test-feishu API endpoint
- Default monitor interval changed to 60 seconds
- Token caching for feishu tenant_access_token

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 21:56:01 +08:00

1453 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""AirGate 核心 API 视图"""
import logging
from datetime import datetime
from decimal import Decimal
from django.db.models import Sum
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from utils.crypto import encrypt, decrypt, make_hint
from utils.iam_service import IAMService, ProjectService
from utils.billing_service import BillingService
from utils.ark_service import ArkService
from utils.volcengine_client import VolcengineAPIError
from .models import VolcAccount, IAMUser, IAMUserProject, GlobalConfig, AlertRecord, SpendingRecord, QuotaAllocation, ArkApiKey
from .serializers import (
VolcAccountSerializer, VolcAccountCreateSerializer,
IAMUserSerializer, IAMUserCreateSerializer, IAMUserImportSerializer,
IAMUserConfigSerializer,
IAMUserProjectSerializer, IAMUserProjectAddSerializer, IAMUserProjectUpdateSerializer,
QuotaAllocateSerializer, QuotaAllocationSerializer,
GlobalConfigSerializer,
AlertRecordSerializer,
ArkApiKeySerializer, ArkApiKeyCreateSerializer,
DashboardSerializer,
)
logger = logging.getLogger(__name__)
def _get_volc_account(volc_id=None):
    """Load a Volcengine main account and decrypt its credentials.

    Args:
        volc_id: Primary key of the account to load. When falsy, the first
            account with ``is_active=True`` is used instead.

    Returns:
        A ``(account, access_key, secret_key)`` tuple. When no matching
        account exists the tuple is ``(None, '', '')`` so callers can guard
        with ``if not account`` / ``if not ak``.
    """
    if volc_id:
        # filter().first() rather than get(): an unknown pk now takes the
        # same "no account" path as the default lookup instead of raising
        # VolcAccount.DoesNotExist past the callers' guards.
        account = VolcAccount.objects.filter(pk=volc_id).first()
    else:
        account = VolcAccount.objects.filter(is_active=True).first()
    if not account:
        return None, '', ''
    ak = decrypt(account.access_key_enc)
    sk = decrypt(account.secret_key_enc)
    return account, ak, sk
def _update_deny_policy(user):
    """Bring a sub-account's Deny policy in line with its linked projects.

    Users flagged ``deny_policy_exempt`` have the policy removed; every other
    user gets it upserted with the names of their linked projects. The call
    is best-effort in both branches: remote failures are logged, not raised,
    so one failing user cannot abort a bulk refresh.

    Args:
        user: The ``IAMUser`` whose policy should be refreshed.
    """
    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        # No usable credentials: nothing can be done remotely.
        return
    svc = IAMService(ak, sk)
    if user.deny_policy_exempt:
        # Exempt accounts must not keep a previously created Deny policy.
        try:
            svc.remove_deny_policy(user.username)
        except Exception as e:
            logger.error(f"移除 Deny 策略失败 ({user.username}): {e}")
        return
    allowed_projects = list(
        user.projects.values_list('project_name', flat=True)
    )
    try:
        svc.upsert_deny_policy(user.username, allowed_projects)
    except Exception as e:
        logger.error(f"更新 Deny 策略失败 ({user.username}): {e}")
def _refresh_all_deny_policies():
    """Re-apply the Deny policy of every active sub-account that has projects.

    Active users without any linked project are skipped.
    """
    active_users = IAMUser.objects.filter(status=IAMUser.Status.ACTIVE)
    for iam_user in active_users:
        if not iam_user.projects.exists():
            continue
        _update_deny_policy(iam_user)
# ==================== Dashboard ====================
@api_view(['GET'])
def dashboard_view(request):
    """Return user counters, summed spending and the first ten alert records."""
    payload = {
        'total_users': IAMUser.objects.count(),
        'active_users': IAMUser.objects.filter(status=IAMUser.Status.ACTIVE).count(),
        'disabled_users': IAMUser.objects.filter(status=IAMUser.Status.DISABLED).count(),
        'monitored_users': IAMUser.objects.filter(monitor_enabled=True).count(),
    }

    # The aggregate yields None when there is nothing to sum; report zero then.
    spending_sum = IAMUser.objects.aggregate(total=Sum('consumed_total'))['total']
    payload['total_spending'] = spending_sum or Decimal('0')

    first_alerts = AlertRecord.objects.all()[:10]
    payload['recent_alerts'] = AlertRecordSerializer(first_alerts, many=True).data
    return Response(payload)
# ==================== Volcengine Account ====================
@api_view(['GET', 'POST'])
def volc_account_view(request):
    """List main accounts (GET) or create one from validated input (POST).

    On POST the access key and secret key are stored encrypted, together
    with a hint derived from the access key.
    """
    if request.method == 'GET':
        listing = VolcAccountSerializer(VolcAccount.objects.all(), many=True)
        return Response(listing.data)

    form = VolcAccountCreateSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    fields = form.validated_data
    access_key = fields['access_key']
    new_account = VolcAccount.objects.create(
        name=fields['name'],
        access_key_enc=encrypt(access_key),
        secret_key_enc=encrypt(fields['secret_key']),
        access_key_hint=make_hint(access_key),
    )
    body = VolcAccountSerializer(new_account).data
    return Response(body, status=status.HTTP_201_CREATED)
@api_view(['PUT', 'DELETE'])
def volc_account_detail_view(request, pk):
    """Update (PUT) or delete (DELETE) a single main account.

    PUT only touches the fields that arrive with a truthy value; a new
    access key also refreshes the stored hint.
    """
    try:
        account = VolcAccount.objects.get(pk=pk)
    except VolcAccount.DoesNotExist:
        not_found = {'error': 'not_found', 'message': '主账号不存在'}
        return Response(not_found, status=status.HTTP_404_NOT_FOUND)

    if request.method == 'DELETE':
        account.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    # PUT: apply whichever of name / access_key / secret_key were supplied.
    payload = request.data
    new_name = payload.get('name')
    if new_name:
        account.name = new_name

    new_ak = payload.get('access_key')
    new_sk = payload.get('secret_key')
    if new_ak:
        account.access_key_enc = encrypt(new_ak)
        account.access_key_hint = make_hint(new_ak)
    if new_sk:
        account.secret_key_enc = encrypt(new_sk)

    account.save()
    return Response(VolcAccountSerializer(account).data)
@api_view(['POST'])
def volc_account_test_view(request, pk):
    """Check a main account's stored keys by listing one IAM user with them."""
    try:
        account = VolcAccount.objects.get(pk=pk)
    except VolcAccount.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    access_key = decrypt(account.access_key_enc)
    secret_key = decrypt(account.secret_key_enc)
    try:
        IAMService(access_key, secret_key).list_users(limit=1)
    except VolcengineAPIError as exc:
        failure = {'status': 'error', 'message': str(exc)}
        return Response(failure, status=status.HTTP_400_BAD_REQUEST)
    return Response({'status': 'ok', 'message': '密钥验证成功'})
# ==================== IAM Users ====================
@api_view(['GET'])
def iam_user_list_view(request):
    """List IAM users, optionally narrowed by the ``status`` query parameter."""
    queryset = IAMUser.objects.select_related('volc_account').all()
    wanted_status = request.query_params.get('status')
    if wanted_status:
        queryset = queryset.filter(status=wanted_status)
    serialized = IAMUserSerializer(queryset, many=True)
    return Response(serialized.data)
@api_view(['POST'])
def iam_user_sync_view(request):
    """Synchronise every IAM user of the default main account from Volcengine.

    Pages through ``list_users`` 100 at a time and upserts one local
    ``IAMUser`` per remote user (profile fields, access-key ids, status and
    console-login flag). Local users of this account that Volcengine no
    longer reports are deleted afterwards.

    Returns:
        200 with ``message`` / ``imported`` / ``removed`` / ``total``;
        400 when no main account is configured;
        502 when listing users fails.
    """
    account, ak, sk = _get_volc_account()
    if not account:
        return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
                        status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)
    page_size = 100
    imported = []
    volc_usernames = set()
    offset = 0
    while True:
        try:
            resp = svc.list_users(limit=page_size, offset=offset)
        except VolcengineAPIError as e:
            return Response({'error': 'api_error', 'message': str(e)},
                            status=status.HTTP_502_BAD_GATEWAY)
        users = resp.get("Result", {}).get("UserMetadata", [])
        if not users:
            break
        for u in users:
            username = u.get("UserName", "")
            volc_usernames.add(username)
            obj, created = IAMUser.objects.update_or_create(
                volc_account=account,
                username=username,
                defaults={
                    'display_name': u.get("DisplayName", ""),
                    'user_id': u.get("UserId", ""),
                    'email': u.get("Email", ""),
                    'phone': u.get("MobilePhone", ""),
                },
            )
            if created:
                imported.append(username)

            # Fetch the access keys once; both the id list and the
            # "all keys inactive" check read from this single response.
            all_keys_inactive = False
            try:
                keys = svc.list_access_keys(username)
                all_keys_inactive = bool(keys) and all(
                    k.get("Status") == "inactive" for k in keys
                )
                obj.access_key_ids = [k["AccessKeyId"] for k in keys]
            except Exception as e:
                # Best-effort: keep whatever was stored before.
                logger.warning(f"同步 AccessKey 失败 ({username}): {e}")

            # A non-active remote user is disabled locally. An active user
            # whose keys are all inactive is also treated as disabled.
            if u.get("Status", "active") != "active":
                obj.status = IAMUser.Status.DISABLED
            elif all_keys_inactive:
                obj.status = IAMUser.Status.DISABLED
            else:
                obj.status = IAMUser.Status.ACTIVE

            # Console login flag is synced separately from the status.
            try:
                profile = svc.get_login_profile(username)
                lp = profile.get("Result", {}).get("LoginProfile", {})
                create_date = lp.get("CreateDate", "")
                # A CreateDate starting with 1970/0001 is treated as
                # "login not allowed" regardless of LoginAllowed.
                if create_date.startswith(("1970", "0001")):
                    obj.volc_login_allowed = False
                else:
                    obj.volc_login_allowed = lp.get("LoginAllowed", False)
            except Exception:
                obj.volc_login_allowed = False
            obj.save()

        offset += page_size
        total = resp.get("Result", {}).get("Total", 0)
        if offset >= total:
            break

    # Drop local users that Volcengine did not report for this account.
    removed = []
    local_users = IAMUser.objects.filter(volc_account=account)
    for local_user in local_users:
        if local_user.username not in volc_usernames:
            removed.append(local_user.username)
            local_user.delete()

    total_count = IAMUser.objects.filter(volc_account=account).count()
    msg = f'同步完成,共 {total_count} 个用户,新导入 {len(imported)}'
    if removed:
        # Closing parenthesis added; the original message left it unbalanced.
        msg += f',清理 {len(removed)} 个已删除用户({", ".join(removed)})'
    return Response({
        'message': msg,
        'imported': imported,
        'removed': removed,
        'total': total_count,
    })
@api_view(['POST'])
def iam_user_create_view(request):
    """Create an IAM sub-account on Volcengine and register it locally.

    Steps: create the remote user, optionally create a console login profile,
    create an access key, attach the basic system policy, store the local
    ``IAMUser``, optionally link a project, refresh Deny policies and write
    an audit record.

    Returns:
        201 with the local user plus a ``volcengine`` result dict;
        400 for a missing main account or a rejected console password;
        409 when the remote user already exists; 502 for other create errors.
    """
    form = IAMUserCreateSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    fields = form.validated_data

    account, ak, sk = _get_volc_account()
    if not account:
        return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
                        status=status.HTTP_400_BAD_REQUEST)

    username = fields['username']
    display_name = fields.get('display_name', '')
    email = fields.get('email', '')
    phone = fields.get('phone', '')
    svc = IAMService(ak, sk)

    # Step 1: create the user remotely.
    try:
        created_resp = svc.create_user(
            username=username,
            display_name=display_name,
            email=email,
            phone=phone,
        )
    except VolcengineAPIError as exc:
        duplicate_codes = ('UserAlreadyExists', 'EntityAlreadyExists')
        if any(code in exc.code for code in duplicate_codes):
            return Response(
                {'error': 'user_exists',
                 'message': f"火山引擎上已存在用户 {username},请使用「同步已有用户」导入"},
                status=status.HTTP_409_CONFLICT,
            )
        return Response({'error': 'create_failed', 'message': f'创建用户失败: {exc}'},
                        status=status.HTTP_502_BAD_GATEWAY)

    remote_user = created_resp.get("Result", {}).get("User", {})
    outcome = {'username': username}

    # Step 2: console login profile, only when a password was supplied.
    password = fields.get('password', '')
    if password:
        try:
            svc.create_login_profile(username, password)
            outcome['login_enabled'] = True
            outcome['volc_login_allowed'] = True
        except VolcengineAPIError as exc:
            reason = str(exc)
            if 'InvalidPassword' in reason:
                # Roll back the user that was just created; failures of the
                # rollback itself are ignored.
                try:
                    svc.client.call("DeleteUser", {"UserName": username})
                except Exception:
                    pass
                return Response({
                    'message': '火山控制台密码不符合要求需包含大小写字母、数字和特殊字符至少8位',
                    'detail': reason,
                }, status=status.HTTP_400_BAD_REQUEST)
            outcome['login_error'] = reason

    # Step 3: access key. The secret is only returned by this one call.
    try:
        key_resp = svc.create_access_key(username)
        key_data = key_resp.get("Result", {}).get("AccessKey", {})
        outcome['access_key_id'] = key_data.get("AccessKeyId", "")
        outcome['secret_access_key'] = key_data.get("SecretAccessKey", "")
        outcome['secret_key_warning'] = "SecretAccessKey 仅此一次显示,请立即保存!"
    except VolcengineAPIError as exc:
        outcome['access_key_error'] = str(exc)

    # Step 4: basic system policies (attach failures are ignored).
    for policy_name in ('AccessKeySelfManageAccess',):
        try:
            svc.attach_user_policy(username, policy_name, 'System')
        except VolcengineAPIError:
            pass

    # Step 5: local record.
    new_key_id = outcome.get('access_key_id')
    local_user = IAMUser.objects.create(
        volc_account=account,
        username=username,
        display_name=display_name,
        user_id=remote_user.get("UserId", ""),
        email=email,
        phone=phone,
        status=IAMUser.Status.ACTIVE,
        access_key_ids=[new_key_id] if new_key_id else [],
        volc_login_allowed=outcome.get('volc_login_allowed', False),
    )

    # Step 6: link the requested project, if any.
    project_name = fields.get('project_name', '')
    if project_name:
        IAMUserProject.objects.create(
            iam_user=local_user,
            project_name=project_name,
            monitor_enabled=True,
        )

    # Step 7: refresh the Deny policies of all users.
    _refresh_all_deny_policies()

    AlertRecord.objects.create(
        iam_user=local_user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"创建子账号 {username}",
        content=f"操作人: {request.user.username}",
    )
    return Response({
        'message': f"子账号 {username} 创建成功",
        'user': IAMUserSerializer(local_user).data,
        'volcengine': outcome,
    }, status=status.HTTP_201_CREATED)
@api_view(['POST'])
def iam_user_import_view(request):
    """Import one named IAM user that already exists on Volcengine.

    The remote profile is fetched with ``get_user`` and upserted locally;
    any ``VolcengineAPIError`` from that lookup is answered with 404.
    """
    form = IAMUserImportSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    username = form.validated_data['username']

    account, ak, sk = _get_volc_account()
    if not account:
        return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
                        status=status.HTTP_400_BAD_REQUEST)

    try:
        remote = IAMService(ak, sk).get_user(username)
    except VolcengineAPIError as exc:
        return Response({'error': 'user_not_found', 'message': f'火山引擎未找到用户: {exc}'},
                        status=status.HTTP_404_NOT_FOUND)

    profile = remote.get("Result", {}).get("User", {})
    local_fields = {
        'display_name': profile.get("DisplayName", ""),
        'user_id': profile.get("UserId", ""),
        'email': profile.get("Email", ""),
        'phone': profile.get("MobilePhone", ""),
    }
    local_user, was_created = IAMUser.objects.update_or_create(
        volc_account=account,
        username=username,
        defaults=local_fields,
    )
    if was_created:
        message = '导入成功'
    else:
        message = '用户已存在,已更新信息'
    return Response({
        'message': message,
        'user': IAMUserSerializer(local_user).data,
    })
@api_view(['GET'])
def iam_user_detail_view(request, pk):
    """Return the serialized IAM user with primary key ``pk``, or 404."""
    try:
        iam_user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
    serialized = IAMUserSerializer(iam_user)
    return Response(serialized.data)
@api_view(['PUT'])
def iam_user_update_view(request, pk):
    """Apply a partial update of a sub-account's local configuration.

    When the update flips ``deny_policy_exempt`` the Deny policy is refreshed
    after the user has been saved.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    form = IAMUserConfigSerializer(data=request.data, partial=True)
    form.is_valid(raise_exception=True)
    changes = form.validated_data

    # Compare against the stored value before it is overwritten below.
    deny_changed = (
        'deny_policy_exempt' in changes
        and changes['deny_policy_exempt'] != user.deny_policy_exempt
    )

    for attr_name, new_value in changes.items():
        setattr(user, attr_name, new_value)
    user.save()

    if deny_changed:
        _update_deny_policy(user)
    return Response(IAMUserSerializer(user).data)
@api_view(['POST'])
def iam_user_toggle_volc_login_view(request, pk):
    """Flip a sub-account's Volcengine console-login permission.

    The current ``LoginAllowed`` flag is read from the remote login profile,
    inverted, written back, mirrored on the local user and recorded in an
    audit entry.

    Returns:
        200 with ``message`` and the new ``volc_login_allowed`` value;
        404 for an unknown user;
        400 when the user has no login profile or the update call fails;
        502 when reading the login profile fails for any other reason.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
    account = user.volc_account
    ak = decrypt(account.access_key_enc)
    sk = decrypt(account.secret_key_enc)
    iam = IAMService(ak, sk)

    # Read the current remote state.
    try:
        resp = iam.get_login_profile(user.username)
        current = resp.get('Result', {}).get('LoginProfile', {}).get('LoginAllowed', False)
    except VolcengineAPIError as e:
        if 'LoginProfileNotExist' in str(e):
            return Response({'message': '该子账号未设置火山控制台密码,无法切换登录状态'},
                            status=status.HTTP_400_BAD_REQUEST)
        # Other upstream failures previously re-raised into an unhandled
        # 500; answer with the structured 502 the sibling views use.
        return Response({'error': 'api_error', 'message': str(e)},
                        status=status.HTTP_502_BAD_GATEWAY)

    new_status = not current
    try:
        iam.update_login_allowed(user.username, new_status)
    except VolcengineAPIError as e:
        return Response({'message': f'操作失败: {e}'}, status=status.HTTP_400_BAD_REQUEST)

    user.volc_login_allowed = new_status
    user.save(update_fields=['volc_login_allowed'])
    action = '开启' if new_status else '关闭'
    AlertRecord.objects.create(
        iam_user=user, alert_type='manual',
        title=f'{action}火山控制台登录 {user.username}',
        content=f'操作人: {request.user.username}',
    )
    return Response({'message': f'{action} {user.username} 的火山控制台登录',
                     'volc_login_allowed': new_status})
@api_view(['POST'])
def iam_user_edit_profile_view(request, pk):
    """Edit display name, e-mail and phone, pushing the change to Volcengine.

    The remote update runs first; the local record is only touched when it
    succeeds. Remotely, falsy values are sent as ``None``; locally, every
    field present in the request (even an empty string) is stored.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    payload = request.data
    display_name = payload.get('display_name')
    email = payload.get('email')
    phone = payload.get('phone')

    owner = user.volc_account
    iam = IAMService(decrypt(owner.access_key_enc), decrypt(owner.secret_key_enc))

    # Empty strings are rejected by Volcengine, so collapse them to None.
    try:
        iam.update_user(
            user.username,
            display_name=display_name or None,
            email=email or None,
            phone=phone or None,
        )
    except VolcengineAPIError as exc:
        return Response({'message': f'火山 API 更新失败: {exc}'},
                        status=status.HTTP_400_BAD_REQUEST)

    # Mirror the submitted fields on the local record.
    submitted = (('display_name', display_name), ('email', email), ('phone', phone))
    for attr_name, value in submitted:
        if value is not None:
            setattr(user, attr_name, value)
    user.save()

    AlertRecord.objects.create(
        iam_user=user, alert_type='manual',
        title=f'编辑子账号信息 {user.username}',
        content=f'操作人: {request.user.username}',
    )
    return Response({'message': '已更新', 'user': IAMUserSerializer(user).data})
@api_view(['POST'])
def iam_user_set_login_view(request, pk):
    """Set a sub-account's AirGate login password and/or login switch.

    A supplied password must have at least six characters and turns login
    on; an explicit ``login_enabled`` value is applied afterwards and wins.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    new_password = request.data.get('password', '')
    requested_flag = request.data.get('login_enabled')

    if new_password:
        if len(new_password) < 6:
            return Response({'error': 'weak_password', 'message': '密码至少6位'},
                            status=status.HTTP_400_BAD_REQUEST)
        user.set_login_password(new_password)
        user.login_enabled = True
    if requested_flag is not None:
        user.login_enabled = requested_flag
    user.save(update_fields=['login_password_hash', 'login_enabled'])

    # Same label is used in the audit record and in the response message.
    state_label = '开启' if user.login_enabled else '关闭'
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"设置子账号 {user.username} 的 AirGate 登录",
        content=f"操作人: {request.user.username},登录: {state_label}",
    )
    return Response({
        'message': f'{state_label}子账号 {user.username} 的 AirGate 登录',
        'login_enabled': user.login_enabled,
    })
@api_view(['POST'])
def iam_user_disable_view(request, pk):
    """Disable a sub-account and detach its permission policies.

    After ``disable_user`` succeeds, global and per-project policies are
    detached one by one. Every policy that was detached is stored in
    ``saved_policies_on_disable`` together with a marker holding the
    pre-disable console-login flag, so the enable view can restore them.

    Returns:
        200 with ``message`` (and ``warnings`` when some detaches failed);
        404 for an unknown user; 400 without credentials;
        502 when ``disable_user`` itself fails.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)

    # 1. Disable console login + API keys. This is the only remote call
    #    whose failure aborts the request; everything below is best-effort.
    try:
        svc.disable_user(user.username)
    except VolcengineAPIError as e:
        return Response({'error': 'api_error', 'message': str(e)},
                        status=status.HTTP_502_BAD_GATEWAY)

    # 2. Detach all policies, remembering each one for a later restore.
    saved_policies = []
    detach_errors = []

    # 2a. Global policies.
    try:
        resp = svc.list_attached_user_policies(user.username)
        policies = resp.get("Result", {}).get("AttachedPolicyMetadata", [])
        for p in policies:
            pname = p.get("PolicyName", "")
            ptype = p.get("PolicyType", "")
            try:
                svc.detach_user_policy(user.username, pname, ptype)
                saved_policies.append({"name": pname, "type": ptype, "scope": "global"})
            except VolcengineAPIError as detach_err:
                detach_errors.append(f"{pname}(global): {detach_err}")
    except VolcengineAPIError:
        # Listing failed: nothing to detach at this level.
        pass

    # 2b. Project-level policies, one listing per linked project.
    for proj in user.projects.all():
        try:
            resp = svc.client.call('ListAttachedUserPolicies', {
                'UserName': user.username,
                'ProjectName': proj.project_name,
            })
            proj_policies = resp.get("Result", {}).get("AttachedPolicyMetadata", [])
            for p in proj_policies:
                pname = p.get("PolicyName", "")
                ptype = p.get("PolicyType", "")
                try:
                    svc.detach_policy_in_project(user.username, pname, proj.project_name, ptype)
                    saved_policies.append({
                        "name": pname, "type": ptype,
                        "scope": "project", "project": proj.project_name,
                    })
                except VolcengineAPIError as detach_err:
                    detach_errors.append(f"{pname}({proj.project_name}): {detach_err}")
        except VolcengineAPIError:
            pass

    # Count the policies BEFORE the login marker is appended; counting
    # afterwards reported one policy more than was actually removed.
    policy_count = len(saved_policies)

    user.status = IAMUser.Status.DISABLED
    # Remember the pre-disable console-login flag inside the snapshot.
    saved_policies.append({"_volc_login_was": user.volc_login_allowed})
    user.saved_policies_on_disable = saved_policies
    user.volc_login_allowed = False
    user.save(update_fields=['status', 'saved_policies_on_disable', 'volc_login_allowed'])

    error_info = f",移除失败: {detach_errors}" if detach_errors else ""
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"手动停用子账号 {user.username}",
        content=f"操作人: {request.user.username},已移除 {policy_count} 个权限策略{error_info}",
    )
    msg = f'用户 {user.username} 已停用,{policy_count} 个权限策略已移除'
    result = {'message': msg}
    if detach_errors:
        result['warnings'] = detach_errors
    return Response(result)
@api_view(['POST'])
def iam_user_enable_view(request, pk):
    """Re-enable a sub-account and re-attach the policies saved on disable.

    The snapshot in ``saved_policies_on_disable`` holds policy entries plus
    a ``_volc_login_was`` marker; the marker decides whether console login
    is restored. A ``VolcengineAPIError`` escaping the body yields 502.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)

    try:
        # Split the snapshot into the login marker and the real policies.
        snapshot = user.saved_policies_on_disable or []
        restore_login = False
        policies_to_restore = []
        for entry in snapshot:
            if "_volc_login_was" not in entry:
                policies_to_restore.append(entry)
                continue
            restore_login = entry["_volc_login_was"]

        # 1. Re-enable API keys and, per the marker, console login.
        svc.enable_user(user.username, restore_login=restore_login)

        # 2. Put each policy back where it came from (project or global).
        restored_count = 0
        restore_errors = []
        for entry in policies_to_restore:
            in_project = entry.get("scope") == "project" and entry.get("project")
            try:
                if in_project:
                    svc.attach_policy_in_project(
                        user.username, entry["name"], entry["project"], entry["type"])
                else:
                    svc.attach_user_policy(user.username, entry["name"], entry["type"])
                restored_count += 1
            except VolcengineAPIError as restore_err:
                restore_errors.append(f"{entry['name']}: {restore_err}")

        # 3. Rebuild the Deny policy.
        _update_deny_policy(user)

        user.status = IAMUser.Status.ACTIVE
        user.saved_policies_on_disable = []
        user.volc_login_allowed = restore_login
        user.save(update_fields=['status', 'saved_policies_on_disable', 'volc_login_allowed'])

        error_info = ""
        if restore_errors:
            error_info = f",恢复失败: {restore_errors}"
        AlertRecord.objects.create(
            iam_user=user,
            alert_type=AlertRecord.AlertType.MANUAL,
            title=f"手动恢复子账号 {user.username}",
            content=f"操作人: {request.user.username},已恢复 {restored_count} 个权限策略{error_info}",
        )

        result = {'message': f'用户 {user.username} 已恢复,{restored_count} 个权限策略已恢复'}
        if restore_errors:
            result['warnings'] = restore_errors
        return Response(result)
    except VolcengineAPIError as exc:
        return Response({'error': 'api_error', 'message': str(exc)},
                        status=status.HTTP_502_BAD_GATEWAY)
@api_view(['GET'])
def iam_user_policies_overview_view(request, pk):
    """Return a sub-account's global policies plus policies per linked project.

    A failure of the global listing yields 502. A failure while listing one
    project's policies is tolerated: that project is reported with an empty
    ``policies`` list.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)

    def _summary(policy):
        """Reduce a policy record to the three fields exposed by this view."""
        return {
            'name': policy.get('PolicyName', ''),
            'type': policy.get('PolicyType', ''),
            'description': policy.get('Description', ''),
        }

    def _is_global(policy):
        """A policy with no scopes, or with any Global scope, counts as global."""
        scopes = policy.get('PolicyScope', [])
        return not scopes or any(s.get('PolicyScopeType') == 'Global' for s in scopes)

    def _in_project(policy, project_name):
        """True when the policy carries a Project scope for ``project_name``."""
        return any(
            s.get('PolicyScopeType') == 'Project' and s.get('ProjectName') == project_name
            for s in policy.get('PolicyScope', [])
        )

    try:
        listing = svc.list_attached_user_policies(user.username)
    except VolcengineAPIError as exc:
        return Response({'error': 'api_error', 'message': str(exc)},
                        status=status.HTTP_502_BAD_GATEWAY)

    attached = listing.get("Result", {}).get("AttachedPolicyMetadata", [])
    global_policies = [_summary(p) for p in attached if _is_global(p)]

    project_policies = []
    for proj in user.projects.all():
        try:
            scoped = svc.client.call('ListAttachedUserPolicies', {
                'UserName': user.username,
                'ProjectName': proj.project_name,
            })
            proj_items = [
                _summary(p)
                for p in scoped.get('Result', {}).get('AttachedPolicyMetadata', [])
                if _in_project(p, proj.project_name)
            ]
        except Exception:
            proj_items = []
        project_policies.append({
            'project_name': proj.project_name,
            'display_name': proj.display_name,
            'project_id': proj.id,
            'monitor_enabled': proj.monitor_enabled,
            'current_spending': str(proj.current_spending),
            'policies': proj_items,
        })

    return Response({
        'username': user.username,
        'display_name': user.display_name,
        'global_policies': global_policies,
        'project_policies': project_policies,
    })
# The decorator was missing: without it this plain function returns a DRF
# Response that has not been through DRF's request/renderer setup.
@api_view(['GET'])
def iam_user_policies_view(request, pk):
    """Return the global permission policies attached to a sub-account.

    Policies are listed via ``list_attached_user_policies``; only those with
    no ``PolicyScope`` or with a ``Global`` scope entry are returned, as the
    raw records received from the service.

    Returns:
        200 with ``{'policies': [...]}``; 404 for an unknown user;
        400 without credentials; 502 when the listing call fails.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)
    try:
        resp = svc.list_attached_user_policies(user.username)
    except VolcengineAPIError as e:
        return Response({'error': 'api_error', 'message': str(e)},
                        status=status.HTTP_502_BAD_GATEWAY)
    all_policies = resp.get("Result", {}).get("AttachedPolicyMetadata", [])
    # Keep global policies only; project-scoped ones are filtered out.
    global_policies = []
    for p in all_policies:
        scopes = p.get('PolicyScope', [])
        is_global = not scopes or any(s.get('PolicyScopeType') == 'Global' for s in scopes)
        if is_global:
            global_policies.append(p)
    return Response({'policies': global_policies})
@api_view(['POST'])
def iam_user_attach_policy_view(request, pk):
    """Attach one policy to a sub-account and write an audit record.

    ``policy_name`` is required; ``policy_type`` defaults to ``'System'``.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    payload = request.data
    policy_name = payload.get('policy_name', '')
    policy_type = payload.get('policy_type', 'System')
    if not policy_name:
        return Response({'error': 'missing_policy_name', 'message': '请指定策略名'},
                        status=status.HTTP_400_BAD_REQUEST)

    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)

    try:
        IAMService(ak, sk).attach_user_policy(user.username, policy_name, policy_type)
    except VolcengineAPIError as exc:
        return Response({'error': 'api_error', 'message': str(exc)},
                        status=status.HTTP_502_BAD_GATEWAY)

    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"附加策略 {policy_name}{user.username}",
        content=f"操作人: {request.user.username},策略类型: {policy_type}",
    )
    return Response({'message': f'已附加策略 {policy_name}'})
@api_view(['POST'])
def iam_user_detach_policy_view(request, pk):
    """Detach one policy from a sub-account and write an audit record.

    ``policy_name`` is required; ``policy_type`` defaults to ``'System'``.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    payload = request.data
    policy_name = payload.get('policy_name', '')
    policy_type = payload.get('policy_type', 'System')
    if not policy_name:
        return Response({'error': 'missing_policy_name', 'message': '请指定策略名'},
                        status=status.HTTP_400_BAD_REQUEST)

    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)

    try:
        IAMService(ak, sk).detach_user_policy(user.username, policy_name, policy_type)
    except VolcengineAPIError as exc:
        return Response({'error': 'api_error', 'message': str(exc)},
                        status=status.HTTP_502_BAD_GATEWAY)

    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"移除策略 {policy_name}{user.username}",
        content=f"操作人: {request.user.username},策略类型: {policy_type}",
    )
    return Response({'message': f'已移除策略 {policy_name}'})
# ==================== IAM User Projects ====================
@api_view(['GET'])
def iam_user_project_list_view(request, pk):
    """List a sub-account's linked projects, refreshing their policy names.

    When credentials are available, each project's project-scoped policies
    are read from Volcengine and written to ``attached_policies`` if the set
    of names differs. Per-project failures are ignored.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    projects = user.projects.all()
    account, ak, sk = _get_volc_account(user.volc_account_id)
    if ak:
        svc = IAMService(ak, sk)
        for proj in projects:
            try:
                listing = svc.client.call('ListAttachedUserPolicies', {
                    'UserName': user.username,
                    'ProjectName': proj.project_name,
                })
                records = listing.get('Result', {}).get('AttachedPolicyMetadata', [])
                # Keep policies scoped to exactly this project; global ones
                # returned by the same call are dropped.
                remote_names = [
                    rec.get('PolicyName', '')
                    for rec in records
                    if any(
                        scope.get('PolicyScopeType') == 'Project'
                        and scope.get('ProjectName') == proj.project_name
                        for scope in rec.get('PolicyScope', [])
                    )
                ]
                stored_names = proj.attached_policies or []
                if set(remote_names) != set(stored_names):
                    proj.attached_policies = remote_names
                    proj.save(update_fields=['attached_policies'])
            except Exception:
                pass
    return Response(IAMUserProjectSerializer(projects, many=True).data)
@api_view(['POST'])
def iam_user_project_add_view(request, pk):
    """Link a project to a sub-account and attach the requested policies.

    A project that is already linked yields 409. Policies are attached one
    at a time; failures are collected and returned as ``auth_errors``.
    Deny policies of all users are refreshed afterwards.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    form = IAMUserProjectAddSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    fields = form.validated_data
    project_name = fields['project_name']

    link, was_created = IAMUserProject.objects.get_or_create(
        iam_user=user,
        project_name=project_name,
        defaults={
            'display_name': fields.get('display_name', ''),
            'monitor_enabled': fields.get('monitor_enabled', True),
        },
    )
    if not was_created:
        return Response({'error': 'duplicate', 'message': f'项目 {project_name} 已关联'},
                        status=status.HTTP_409_CONFLICT)

    # Attach the policies named by the caller, scoped to this project.
    account, ak, sk = _get_volc_account(user.volc_account_id)
    attached = []
    auth_errors = []
    requested = fields.get('policies', [])
    if ak and requested:
        svc = IAMService(ak, sk)
        for policy_name in requested:
            try:
                svc.attach_policy_in_project(user.username, policy_name, project_name)
            except VolcengineAPIError as exc:
                auth_errors.append(f"{policy_name}: {exc}")
            else:
                attached.append(policy_name)
        link.attached_policies = attached
        link.save(update_fields=['attached_policies'])

    # Refresh the Deny policies of all users now that a project was added.
    _refresh_all_deny_policies()

    audit_text = f"操作人: {request.user.username},已授权策略: {attached}"
    if auth_errors:
        audit_text += f",授权失败: {auth_errors}"
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"添加项目 {project_name}{user.username}",
        content=audit_text,
    )

    result = {
        'message': f'已关联项目 {project_name}',
        'project': IAMUserProjectSerializer(link).data,
        'attached_policies': attached,
    }
    if auth_errors:
        result['auth_errors'] = auth_errors
    return Response(result, status=status.HTTP_201_CREATED)
@api_view(['PUT'])
def iam_user_project_update_view(request, pk, pid):
    """Set the monitoring switch of one project linked to user ``pk``."""
    try:
        link = IAMUserProject.objects.get(pk=pid, iam_user_id=pk)
    except IAMUserProject.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    form = IAMUserProjectUpdateSerializer(data=request.data)
    form.is_valid(raise_exception=True)

    link.monitor_enabled = form.validated_data['monitor_enabled']
    link.save(update_fields=['monitor_enabled'])
    return Response(IAMUserProjectSerializer(link).data)
@api_view(['PUT'])
def iam_user_project_policies_view(request, pk, pid):
    """Replace a project's policy set by diffing against the remote state.

    The currently attached project-scoped policies are read from Volcengine
    (falling back to the locally stored list if that read fails). Policies
    missing from the request are detached and new ones are attached. The
    list stored locally afterwards is what is actually in effect, so a
    failed attach or detach does not leave a misleading local record.

    Returns:
        200 with ``message`` / ``project`` (and ``warnings`` on partial
        failure); 404 for an unknown project link; 400 without credentials
        or when ``policies`` is not a list.
    """
    try:
        project = IAMUserProject.objects.get(pk=pid, iam_user_id=pk)
        user = project.iam_user
    except IAMUserProject.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
    new_policies = request.data.get('policies', [])
    if not isinstance(new_policies, list):
        # A bare string would otherwise be iterated character by character
        # and matched by substring in the diffs below.
        return Response({'error': 'invalid_policies', 'message': 'policies 必须是列表'},
                        status=status.HTTP_400_BAD_REQUEST)
    account, ak, sk = _get_volc_account(user.volc_account_id)
    if not ak:
        return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
    svc = IAMService(ak, sk)

    # Read the real current state from Volcengine rather than the local DB.
    actual_old = []
    try:
        resp = svc.client.call('ListAttachedUserPolicies', {
            'UserName': user.username,
            'ProjectName': project.project_name,
        })
        for p in resp.get('Result', {}).get('AttachedPolicyMetadata', []):
            scopes = p.get('PolicyScope', [])
            for s in scopes:
                if s.get('PolicyScopeType') == 'Project' and s.get('ProjectName') == project.project_name:
                    actual_old.append(p.get('PolicyName', ''))
                    break
    except Exception:
        actual_old = project.attached_policies or []

    attached = []
    detached = []
    errors = []

    # Detach what is no longer requested.
    to_remove = [p for p in actual_old if p not in new_policies]
    for policy_name in to_remove:
        try:
            svc.detach_policy_in_project(user.username, policy_name, project.project_name)
            detached.append(policy_name)
        except VolcengineAPIError as e:
            errors.append(f"移除 {policy_name}: {e}")

    # Attach what is newly requested.
    to_add = [p for p in new_policies if p not in actual_old]
    for policy_name in to_add:
        try:
            svc.attach_policy_in_project(user.username, policy_name, project.project_name)
            attached.append(policy_name)
        except VolcengineAPIError as e:
            if 'PolicyAttachConflict' in str(e):
                # Conflict is treated as "already attached".
                attached.append(policy_name)
            else:
                errors.append(f"添加 {policy_name}: {e}")

    # Persist the effective state: requested policies that are in place,
    # plus policies whose removal failed. Without failures this equals
    # ``new_policies`` exactly.
    effective = [p for p in new_policies if p in actual_old or p in attached]
    effective += [p for p in to_remove if p not in detached]
    project.attached_policies = effective
    project.save(update_fields=['attached_policies'])

    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"更新项目 {project.project_name} 授权策略",
        content=f"操作人: {request.user.username},添加: {attached},移除: {detached}"
        + (f",失败: {errors}" if errors else ""),
    )
    result = {'message': f'已更新,添加 {len(attached)} 个、移除 {len(detached)} 个策略',
              'project': IAMUserProjectSerializer(project).data}
    if errors:
        result['warnings'] = errors
    return Response(result)
@api_view(['DELETE'])
def iam_user_project_delete_view(request, pk, pid):
    """Unlink a project from an IAM user: revoke its policies, then drop the row.

    Looks up the ``IAMUserProject`` row ``pid`` that belongs to IAM user ``pk``
    and answers 404 when there is none. When account credentials are available,
    every policy listed in ``attached_policies`` is detached through
    ``IAMService``. The operation is recorded as a manual ``AlertRecord``, the
    row is deleted and ``_refresh_all_deny_policies()`` is invoked.

    Detach failures do not abort the removal; they are collected and returned
    under ``detach_errors``.
    """
    try:
        project = IAMUserProject.objects.get(pk=pid, iam_user_id=pk)
    except IAMUserProject.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    user = project.iam_user
    name = project.project_name
    detached, detach_errors = [], []

    # Revoke the policies that were previously granted for this project.
    _account, ak, sk = _get_volc_account(user.volc_account_id)
    policies = project.attached_policies
    if ak and policies:
        iam = IAMService(ak, sk)
        for policy in policies:
            try:
                iam.detach_policy_in_project(user.username, policy, name)
            except VolcengineAPIError as exc:
                detach_errors.append(f"{policy}: {exc}")
            else:
                detached.append(policy)

    # Audit entry is written while the project row still exists.
    audit_content = f"操作人: {request.user.username},已回收策略: {detached}"
    if detach_errors:
        audit_content += f",回收失败: {detach_errors}"
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"移除项目 {name}{user.username}",
        content=audit_content,
    )

    project.delete()

    # Bring the Deny policies of all sub-accounts back in sync.
    _refresh_all_deny_policies()

    payload = {'message': f'已移除项目 {name},已回收权限: {detached}'}
    if detach_errors:
        payload['detach_errors'] = detach_errors
    return Response(payload)
@api_view(['POST'])
def iam_user_project_toggle_all_view(request, pk):
    """Switch monitoring on or off for every project of one IAM user.

    Request body: ``monitor_enabled`` (optional, defaults to ``True``).
    Booleans, the integers 0/1 and common textual spellings ("true", "False",
    "1", "0", "t", "f", "yes", "no", "on", "off") are accepted, so that
    form-encoded clients get the same result as JSON clients.

    Returns 404 when the IAM user does not exist, 400 when ``monitor_enabled``
    cannot be read as a boolean, otherwise a message containing the number of
    project rows that were updated.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    raw = request.data.get('monitor_enabled', True)
    if isinstance(raw, str):
        raw = raw.strip().lower()

    # Parse explicitly: a non-empty string such as "False" is truthy in Python,
    # which would make the reported label disagree with the stored value.
    if raw in (True, 'true', 't', '1', 'yes', 'on'):
        enable = True
    elif raw in (False, 'false', 'f', '0', 'no', 'off'):
        enable = False
    else:
        return Response(
            {'error': 'invalid_param', 'message': 'monitor_enabled 必须为布尔值'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    count = user.projects.update(monitor_enabled=enable)
    label = '开启' if enable else '关闭'
    return Response({'message': f'{label}全部 {count} 个项目的监测'})
# ==================== Quota Allocation ====================
@api_view(['POST'])
def quota_allocate_view(request, pk):
    """Change an IAM user's allocated quota: positive adds, negative deducts.

    The payload is validated by ``QuotaAllocateSerializer`` (``amount`` plus an
    optional ``note``). A deduction is refused with 400 ``quota_underflow``
    when it would push the total quota below the amount already consumed.
    Top-ups are never refused by that guard, so an over-quota user can be
    topped up in several steps.

    On success the new total is saved, ``triggered_alerts`` is reset, and both
    a ``QuotaAllocation`` row and a manual ``AlertRecord`` are created.
    Returns 404 when the IAM user does not exist.

    NOTE(review): the three writes are not wrapped in a transaction and the
    quota is updated read-modify-write; confirm whether concurrent changes
    need to be serialised.
    """
    try:
        user = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    serializer = QuotaAllocateSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    amount = serializer.validated_data['amount']
    note = serializer.validated_data.get('note', '')

    is_deduct = amount < 0
    new_total = user.allocated_quota + amount

    # Deduction guard only: the total may not be lowered below what has been
    # consumed. It must not fire for top-ups, otherwise a user who is already
    # over quota could not receive a partial increase.
    if is_deduct and new_total < user.consumed_total:
        # Never report a negative "maximum deductible" amount.
        max_deduct = max(user.allocated_quota - user.consumed_total, 0)
        return Response({
            'error': 'quota_underflow',
            'message': (
                f'扣减后总额度 ¥{new_total:.2f} 低于已消费 ¥{user.consumed_total:.2f}'
                f'最多可扣减 ¥{max_deduct:.2f}'
            ),
        }, status=status.HTTP_400_BAD_REQUEST)

    user.allocated_quota = new_total
    user.triggered_alerts = []  # a quota change resets the alert state
    user.save(update_fields=['allocated_quota', 'triggered_alerts'])

    action_label = f"扣减额度 ¥{abs(amount)}" if is_deduct else f"追加额度 ¥{amount}"

    allocation = QuotaAllocation.objects.create(
        iam_user=user,
        amount=amount,
        total_after=user.allocated_quota,
        note=note,
        created_by=request.user.username,
    )
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"{action_label}{user.username}",
        content=f"操作人: {request.user.username},变更后总额度: ¥{user.allocated_quota},备注: {note}",
    )

    return Response({
        'message': f'{action_label},当前总额度 ¥{user.allocated_quota}',
        'user': IAMUserSerializer(user).data,
        'allocation': QuotaAllocationSerializer(allocation).data,
    })
@api_view(['GET'])
def quota_history_view(request, pk):
    """Return the quota allocation history of one IAM user.

    Responds with 404 when the IAM user does not exist, otherwise with the
    serialized ``QuotaAllocation`` rows linked to that user.
    """
    try:
        owner = IAMUser.objects.get(pk=pk)
    except IAMUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    history = QuotaAllocation.objects.filter(iam_user=owner)
    payload = QuotaAllocationSerializer(history, many=True).data
    return Response(payload)
# ==================== Billing ====================
@api_view(['GET'])
def spending_overview_view(request):
    """Return a spending overview: all IAM users, highest consumption first.

    The ``period`` query parameter (default: the current month as ``YYYY-MM``)
    is echoed back in the response; this function does not use it to filter
    the user list.
    """
    current_month = datetime.now().strftime("%Y-%m")
    bill_period = request.query_params.get('period', current_month)

    ranked_users = IAMUser.objects.all().order_by('-consumed_total')
    serialized = IAMUserSerializer(ranked_users, many=True).data

    return Response({'period': bill_period, 'users': serialized})
@api_view(['POST'])
def spending_refresh_view(request):
    """Trigger a manual refresh of the spending data.

    Runs ``utils.scheduler.check_spending`` synchronously. Any exception is
    logged with its traceback and reported to the caller as a 500
    ``refresh_failed`` response carrying the exception text.
    """
    # Imported lazily, as in the original code.
    from utils.scheduler import check_spending

    try:
        check_spending()
    except Exception as e:
        # Top-level boundary: keep the traceback in the log instead of losing
        # it, then return the same error payload as before.
        logger.exception("Manual spending refresh failed")
        return Response({'error': 'refresh_failed', 'message': str(e)},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    return Response({'message': '消费数据刷新完成'})
@api_view(['GET'])
def balance_view(request):
    """Return the balance of the main account.

    Uses the credentials resolved by ``_get_volc_account()``. Responds with
    400 ``no_account`` when no access key is available and with 502
    ``api_error`` when the billing call raises ``VolcengineAPIError``.
    """
    _account, ak, sk = _get_volc_account()
    if not ak:
        return Response(
            {'error': 'no_account', 'message': '请先配置火山主账号'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        balance = BillingService(ak, sk).get_balance()
        return Response(balance)
    except VolcengineAPIError as exc:
        return Response(
            {'error': 'api_error', 'message': str(exc)},
            status=status.HTTP_502_BAD_GATEWAY,
        )
# ==================== Global Config ====================
@api_view(['GET', 'PUT'])
def global_config_view(request):
    """Read or partially update the singleton ``GlobalConfig``.

    GET returns the serialized configuration. Any other allowed method applies
    the request body as a partial update and returns the saved data;
    validation errors are raised by the serializer.
    """
    settings_obj = GlobalConfig.get_solo()

    if request.method == 'GET':
        current = GlobalConfigSerializer(settings_obj)
        return Response(current.data)

    updater = GlobalConfigSerializer(settings_obj, data=request.data, partial=True)
    updater.is_valid(raise_exception=True)
    updater.save()
    return Response(updater.data)
@api_view(['POST'])
def test_feishu_view(request):
    """Send a Feishu test notification using the configured app credentials.

    The recipient is the ``mobile`` field of the request body when it is
    non-blank; otherwise the first non-empty entry of the comma-separated
    ``feishu_alert_mobiles`` setting is used.

    Returns 400 when the app credentials or the recipient are missing, or when
    ``send_feishu_test`` reports failure; otherwise 200 with its message.
    """
    config = GlobalConfig.get_solo()
    app_id = config.feishu_app_id
    app_secret = config.feishu_app_secret

    # Strip surrounding whitespace so that a blank value falls back to the
    # configured recipients instead of being sent to Feishu as-is.
    mobile = str(request.data.get('mobile') or '').strip()
    if not mobile:
        configured = (config.feishu_alert_mobiles or '').split(',')
        # Skip empty entries (e.g. a leading comma) rather than only looking
        # at the first element.
        mobile = next((m.strip() for m in configured if m.strip()), '')

    if not app_id or not app_secret:
        return Response({'message': '请先配置飞书 App ID 和 App Secret'},
                        status=status.HTTP_400_BAD_REQUEST)
    if not mobile:
        return Response({'message': '请填写接收人手机号'},
                        status=status.HTTP_400_BAD_REQUEST)

    # Imported lazily, as in the original code.
    from utils.feishu import send_feishu_test

    success, msg = send_feishu_test(app_id, app_secret, mobile)
    if success:
        return Response({'message': msg})
    return Response({'message': msg}, status=status.HTTP_400_BAD_REQUEST)
# ==================== Alerts ====================
@api_view(['GET'])
def alert_list_view(request):
    """List alert records, optionally filtered by type and limited in count.

    Query parameters:
        type:  when given, only records with this ``alert_type`` are returned.
        limit: maximum number of records (default 50); must be a non-negative
               integer.

    Returns 400 ``invalid_limit`` when ``limit`` is not a non-negative integer.
    """
    # Validate the untrusted value up front: int() raises on non-numeric text
    # and Django querysets do not support negative slicing, both of which
    # previously surfaced as a 500.
    try:
        limit = int(request.query_params.get('limit', 50))
    except (TypeError, ValueError):
        limit = -1
    if limit < 0:
        return Response(
            {'error': 'invalid_limit', 'message': 'limit 必须为非负整数'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    alerts = AlertRecord.objects.select_related('iam_user').all()
    alert_type = request.query_params.get('type')
    if alert_type:
        alerts = alerts.filter(alert_type=alert_type)

    return Response(AlertRecordSerializer(alerts[:limit], many=True).data)
# ==================== Projects ====================
@api_view(['GET'])
def project_list_view(request):
    """Fetch the project list from Volcengine and return it in a compact form.

    Each entry carries ``name``, ``display_name`` (falling back to the project
    name) and ``description``. Responds with 400 ``no_account`` when no access
    key is available and with 502 ``api_error`` on ``VolcengineAPIError``.
    """
    _account, ak, sk = _get_volc_account()
    if not ak:
        return Response(
            {'error': 'no_account', 'message': '请先配置火山主账号'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        rows = []
        for item in ProjectService(ak, sk).list_projects():
            project_name = item.get('ProjectName', '')
            rows.append({
                'name': project_name,
                'display_name': item.get('DisplayName', project_name),
                'description': item.get('Description', ''),
            })
        return Response(rows)
    except VolcengineAPIError as exc:
        return Response(
            {'error': 'api_error', 'message': str(exc)},
            status=status.HTTP_502_BAD_GATEWAY,
        )
# ==================== Ark API Key Management (手动录入模式) ====================
@api_view(['GET'])
def ark_key_list_view(request):
    """List stored API keys, optionally narrowed by query parameters.

    Query parameters:
        iam_user_id:  only keys that belong to this IAM user.
        project_name: only keys recorded for this project.

    NOTE(review): the original docstring mentions admins seeing all keys and
    sub-accounts seeing only their own; this function applies no per-caller
    restriction, so confirm that it is enforced elsewhere.
    """
    keys = ArkApiKey.objects.select_related('iam_user').all()

    # Each supported query parameter maps to the model field of the same name.
    for field in ('iam_user_id', 'project_name'):
        wanted = request.query_params.get(field)
        if wanted:
            keys = keys.filter(**{field: wanted})

    return Response(ArkApiKeySerializer(keys, many=True).data)
@api_view(['POST'])
def ark_key_create_view(request):
    """Record a manually entered API key for an IAM user.

    The payload is validated by ``ArkApiKeyCreateSerializer``. The raw key is
    stored encrypted together with a hint produced by ``make_hint``, and a
    manual ``AlertRecord`` is written for the operation.

    Returns 404 when the referenced IAM user does not exist, otherwise 201
    with the serialized key.
    """
    serializer = ArkApiKeyCreateSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    d = serializer.validated_data

    try:
        iam_user = IAMUser.objects.get(pk=d['iam_user_id'])
    except IAMUser.DoesNotExist:
        return Response(
            {'error': 'not_found', 'message': '子账号不存在'},
            status=status.HTTP_404_NOT_FOUND,
        )

    operator = request.user.username
    plaintext = d['api_key']

    # Only the encrypted value and a hint are persisted.
    key_fields = {
        'iam_user': iam_user,
        'project_name': d['project_name'],
        'key_name': d['key_name'],
        'api_key_enc': encrypt(plaintext),
        'api_key_hint': make_hint(plaintext),
        'remark': d.get('remark', ''),
        'created_by': operator,
    }
    obj = ArkApiKey.objects.create(**key_fields)

    AlertRecord.objects.create(
        iam_user=iam_user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"录入 API Key: {d['key_name']}",
        content=f"操作人: {operator},项目: {d['project_name']}",
    )

    body = {
        'message': f'API Key "{d["key_name"]}" 录入成功',
        'key': ArkApiKeySerializer(obj).data,
    }
    return Response(body, status=status.HTTP_201_CREATED)
@api_view(['PUT'])
def ark_key_update_view(request, pk):
    """Update an API key record: enable/disable it and/or change its remark.

    Request body (both optional):
        status: ``'active'`` or ``'disabled'``; any other value is ignored.
        remark: new remark text; applied whenever the field is present and
                not null.

    A status change is recorded as a manual ``AlertRecord``. Returns 404 when
    the key does not exist, otherwise the serialized key after saving.
    """
    try:
        obj = ArkApiKey.objects.get(pk=pk)
    except ArkApiKey.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    new_status = request.data.get('status')
    # Both accepted values are non-empty, so the membership test alone also
    # rules out a missing or empty status.
    if new_status in ('active', 'disabled'):
        obj.status = new_status
        action = '启用' if new_status == 'active' else '停用'
        AlertRecord.objects.create(
            iam_user=obj.iam_user,
            alert_type=AlertRecord.AlertType.MANUAL,
            title=f"{action} API Key: {obj.key_name}",
            content=f"操作人: {request.user.username}",
        )

    remark = request.data.get('remark')
    if remark is not None:
        obj.remark = remark

    obj.save()
    return Response(ArkApiKeySerializer(obj).data)
@api_view(['DELETE'])
def ark_key_delete_view(request, pk):
    """Delete a stored API key and record the deletion.

    Returns 404 when the key does not exist. The manual ``AlertRecord`` is
    written before the row is removed.
    """
    try:
        key = ArkApiKey.objects.get(pk=pk)
    except ArkApiKey.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    audit = {
        'iam_user': key.iam_user,
        'alert_type': AlertRecord.AlertType.MANUAL,
        'title': f"删除 API Key: {key.key_name}",
        'content': f"操作人: {request.user.username},项目: {key.project_name}",
    }
    AlertRecord.objects.create(**audit)

    key.delete()
    return Response({'message': 'API Key 已删除'})
@api_view(['GET'])
def ark_key_reveal_view(request, pk):
    """Return the decrypted API key together with its name and project.

    Returns 404 when the key does not exist. Every reveal is recorded as a
    manual ``AlertRecord`` before the key is decrypted.
    """
    try:
        key = ArkApiKey.objects.get(pk=pk)
    except ArkApiKey.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    # Record who looked at the plaintext before producing it.
    AlertRecord.objects.create(
        iam_user=key.iam_user,
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"查看 API Key 明文: {key.key_name}",
        content=f"操作人: {request.user.username}",
    )

    plaintext = decrypt(key.api_key_enc)
    return Response({
        'api_key': plaintext,
        'key_name': key.key_name,
        'project_name': key.project_name,
    })