seaislee1209 a7e030dc57 feat: auto-authorize policies when adding projects to sub-accounts
- Disable now removes all policies (saved to DB) + Enable restores them
- Project add: policies are now user-selected (checkbox), not auto-attached
- Fix serializer allow_blank for optional fields (email/phone/password)
- Better error reporting for policy detach/attach failures
- Handle duplicate user creation with clear error message

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 15:01:18 +08:00

864 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""AirGate 核心 API 视图"""
import logging
from datetime import datetime
from decimal import Decimal
from django.db.models import Sum
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from utils.crypto import encrypt, decrypt, make_hint
from utils.iam_service import IAMService, ProjectService
from utils.billing_service import BillingService
from utils.volcengine_client import VolcengineAPIError
from .models import VolcAccount, IAMUser, IAMUserProject, GlobalConfig, AlertRecord, SpendingRecord, QuotaAllocation
from .serializers import (
VolcAccountSerializer, VolcAccountCreateSerializer,
IAMUserSerializer, IAMUserCreateSerializer, IAMUserImportSerializer,
IAMUserConfigSerializer,
IAMUserProjectSerializer, IAMUserProjectAddSerializer, IAMUserProjectUpdateSerializer,
QuotaAllocateSerializer, QuotaAllocationSerializer,
GlobalConfigSerializer,
AlertRecordSerializer,
DashboardSerializer,
)
logger = logging.getLogger(__name__)
def _get_volc_account(volc_id=None):
"""获取主账号,解密密钥"""
if volc_id:
account = VolcAccount.objects.get(pk=volc_id)
else:
account = VolcAccount.objects.filter(is_active=True).first()
if not account:
return None, '', ''
ak = decrypt(account.access_key_enc)
sk = decrypt(account.secret_key_enc)
return account, ak, sk
# ==================== Dashboard ====================
@api_view(['GET'])
def dashboard_view(request):
total = IAMUser.objects.count()
active = IAMUser.objects.filter(status=IAMUser.Status.ACTIVE).count()
disabled = IAMUser.objects.filter(status=IAMUser.Status.DISABLED).count()
monitored = IAMUser.objects.filter(monitor_enabled=True).count()
total_spending = IAMUser.objects.aggregate(
total=Sum('consumed_total'))['total'] or Decimal('0')
recent_alerts = AlertRecord.objects.all()[:10]
data = {
'total_users': total,
'active_users': active,
'disabled_users': disabled,
'monitored_users': monitored,
'total_spending': total_spending,
'recent_alerts': AlertRecordSerializer(recent_alerts, many=True).data,
}
return Response(data)
# ==================== Volcengine Account ====================
@api_view(['GET', 'POST'])
def volc_account_view(request):
if request.method == 'GET':
accounts = VolcAccount.objects.all()
return Response(VolcAccountSerializer(accounts, many=True).data)
serializer = VolcAccountCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
d = serializer.validated_data
account = VolcAccount.objects.create(
name=d['name'],
access_key_enc=encrypt(d['access_key']),
secret_key_enc=encrypt(d['secret_key']),
access_key_hint=make_hint(d['access_key']),
)
return Response(VolcAccountSerializer(account).data, status=status.HTTP_201_CREATED)
@api_view(['PUT', 'DELETE'])
def volc_account_detail_view(request, pk):
try:
account = VolcAccount.objects.get(pk=pk)
except VolcAccount.DoesNotExist:
return Response({'error': 'not_found', 'message': '主账号不存在'},
status=status.HTTP_404_NOT_FOUND)
if request.method == 'DELETE':
account.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# PUT: update
name = request.data.get('name')
if name:
account.name = name
ak = request.data.get('access_key')
sk = request.data.get('secret_key')
if ak:
account.access_key_enc = encrypt(ak)
account.access_key_hint = make_hint(ak)
if sk:
account.secret_key_enc = encrypt(sk)
account.save()
return Response(VolcAccountSerializer(account).data)
@api_view(['POST'])
def volc_account_test_view(request, pk):
"""测试主账号密钥是否有效"""
try:
account = VolcAccount.objects.get(pk=pk)
except VolcAccount.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
ak = decrypt(account.access_key_enc)
sk = decrypt(account.secret_key_enc)
try:
svc = IAMService(ak, sk)
svc.list_users(limit=1)
return Response({'status': 'ok', 'message': '密钥验证成功'})
except VolcengineAPIError as e:
return Response({'status': 'error', 'message': str(e)},
status=status.HTTP_400_BAD_REQUEST)
# ==================== IAM Users ====================
@api_view(['GET'])
def iam_user_list_view(request):
users = IAMUser.objects.select_related('volc_account').all()
status_filter = request.query_params.get('status')
if status_filter:
users = users.filter(status=status_filter)
return Response(IAMUserSerializer(users, many=True).data)
@api_view(['POST'])
def iam_user_sync_view(request):
"""从火山引擎同步所有已有 IAM 用户"""
account, ak, sk = _get_volc_account()
if not account:
return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
imported = []
offset = 0
while True:
try:
resp = svc.list_users(limit=100, offset=offset)
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
users = resp.get("Result", {}).get("UserMetadata", [])
if not users:
break
for u in users:
username = u.get("UserName", "")
obj, created = IAMUser.objects.update_or_create(
volc_account=account,
username=username,
defaults={
'display_name': u.get("DisplayName", ""),
'user_id': u.get("UserId", ""),
'email': u.get("Email", ""),
'phone': u.get("MobilePhone", ""),
},
)
if created:
imported.append(username)
# Sync access keys
try:
keys = svc.list_access_keys(username)
obj.access_key_ids = [k["AccessKeyId"] for k in keys]
except Exception:
pass
# Sync login status
try:
profile = svc.get_login_profile(username)
login_allowed = profile.get("Result", {}).get("LoginProfile", {}).get("LoginAllowed", True)
obj.status = IAMUser.Status.ACTIVE if login_allowed else IAMUser.Status.DISABLED
except Exception:
obj.status = IAMUser.Status.UNKNOWN
obj.save()
offset += 100
total = resp.get("Result", {}).get("Total", 0)
if offset >= total:
break
total_count = IAMUser.objects.filter(volc_account=account).count()
return Response({
'message': f'同步完成,共 {total_count} 个用户,新导入 {len(imported)}',
'imported': imported,
'total': total_count,
})
@api_view(['POST'])
def iam_user_create_view(request):
"""在火山引擎创建新的 IAM 子账号并纳入管理"""
serializer = IAMUserCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
d = serializer.validated_data
account, ak, sk = _get_volc_account()
if not account:
return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
# 1. Create user on Volcengine
try:
resp = svc.create_user(
username=d['username'],
display_name=d.get('display_name', ''),
email=d.get('email', ''),
phone=d.get('phone', ''),
)
except VolcengineAPIError as e:
if 'UserAlreadyExists' in e.code or 'EntityAlreadyExists' in e.code:
return Response({'error': 'user_exists',
'message': f"火山引擎上已存在用户 {d['username']},请使用「同步已有用户」导入"},
status=status.HTTP_409_CONFLICT)
return Response({'error': 'create_failed', 'message': f'创建用户失败: {e}'},
status=status.HTTP_502_BAD_GATEWAY)
volc_user = resp.get("Result", {}).get("User", {})
result_info = {'username': d['username']}
# 2. Create login profile if password provided
password = d.get('password', '')
if password:
try:
svc.create_login_profile(d['username'], password)
result_info['login_enabled'] = True
except VolcengineAPIError as e:
result_info['login_error'] = str(e)
# 3. Create access key
try:
ak_resp = svc.create_access_key(d['username'])
ak_data = ak_resp.get("Result", {}).get("AccessKey", {})
result_info['access_key_id'] = ak_data.get("AccessKeyId", "")
result_info['secret_access_key'] = ak_data.get("SecretAccessKey", "")
result_info['secret_key_warning'] = "SecretAccessKey 仅此一次显示,请立即保存!"
except VolcengineAPIError as e:
result_info['access_key_error'] = str(e)
# 4. Attach basic policies
for policy in ['AccessKeySelfManageAccess']:
try:
svc.attach_user_policy(d['username'], policy, 'System')
except VolcengineAPIError:
pass
# 5. Save to local DB
obj = IAMUser.objects.create(
volc_account=account,
username=d['username'],
display_name=d.get('display_name', ''),
user_id=volc_user.get("UserId", ""),
email=d.get('email', ''),
phone=d.get('phone', ''),
status=IAMUser.Status.ACTIVE,
access_key_ids=[result_info.get('access_key_id', '')] if result_info.get('access_key_id') else [],
)
# 6. Auto-add project if specified
project_name = d.get('project_name', '')
if project_name:
IAMUserProject.objects.create(
iam_user=obj,
project_name=project_name,
monitor_enabled=True,
)
AlertRecord.objects.create(
iam_user=obj,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"创建子账号 {d['username']}",
content=f"操作人: {request.user.username}",
)
return Response({
'message': f"子账号 {d['username']} 创建成功",
'user': IAMUserSerializer(obj).data,
'volcengine': result_info,
}, status=status.HTTP_201_CREATED)
@api_view(['POST'])
def iam_user_import_view(request):
"""导入指定的已有 IAM 用户"""
serializer = IAMUserImportSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
username = serializer.validated_data['username']
account, ak, sk = _get_volc_account()
if not account:
return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
resp = svc.get_user(username)
except VolcengineAPIError as e:
return Response({'error': 'user_not_found', 'message': f'火山引擎未找到用户: {e}'},
status=status.HTTP_404_NOT_FOUND)
u = resp.get("Result", {}).get("User", {})
obj, created = IAMUser.objects.update_or_create(
volc_account=account,
username=username,
defaults={
'display_name': u.get("DisplayName", ""),
'user_id': u.get("UserId", ""),
'email': u.get("Email", ""),
'phone': u.get("MobilePhone", ""),
},
)
return Response({
'message': '导入成功' if created else '用户已存在,已更新信息',
'user': IAMUserSerializer(obj).data,
})
@api_view(['GET'])
def iam_user_detail_view(request, pk):
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
return Response(IAMUserSerializer(user).data)
@api_view(['PUT'])
def iam_user_update_view(request, pk):
"""更新子账号的本地配置(阈值、开关等)"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
serializer = IAMUserConfigSerializer(data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
for field, value in serializer.validated_data.items():
setattr(user, field, value)
user.save()
return Response(IAMUserSerializer(user).data)
@api_view(['POST'])
def iam_user_disable_view(request, pk):
"""停用子账号"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
account, ak, sk = _get_volc_account(user.volc_account_id)
if not ak:
return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
# 1. 停用控制台 + API 密钥
svc.disable_user(user.username)
# 2. 移除所有权限策略并保存快照(恢复时加回)
saved_policies = []
detach_errors = []
try:
resp = svc.list_attached_user_policies(user.username)
policies = resp.get("Result", {}).get("AttachedPolicyMetadata", [])
for p in policies:
pname = p.get("PolicyName", "")
ptype = p.get("PolicyType", "")
try:
svc.detach_user_policy(user.username, pname, ptype)
saved_policies.append({"name": pname, "type": ptype})
except VolcengineAPIError as detach_err:
detach_errors.append(f"{pname}: {detach_err}")
except VolcengineAPIError:
pass
user.status = IAMUser.Status.DISABLED
user.saved_policies_on_disable = saved_policies
user.save(update_fields=['status', 'saved_policies_on_disable'])
policy_count = len(saved_policies)
error_info = f",移除失败: {detach_errors}" if detach_errors else ""
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"手动停用子账号 {user.username}",
content=f"操作人: {request.user.username},已移除 {policy_count} 个权限策略{error_info}",
)
msg = f'用户 {user.username} 已停用,{policy_count} 个权限策略已移除'
result = {'message': msg}
if detach_errors:
result['warnings'] = detach_errors
return Response(result)
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
@api_view(['POST'])
def iam_user_enable_view(request, pk):
"""恢复子账号"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
account, ak, sk = _get_volc_account(user.volc_account_id)
if not ak:
return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
# 1. 恢复控制台 + API 密钥
svc.enable_user(user.username)
# 2. 重新附加停用时保存的策略
restored_count = 0
restore_errors = []
saved_policies = user.saved_policies_on_disable or []
for p in saved_policies:
try:
svc.attach_user_policy(user.username, p["name"], p["type"])
restored_count += 1
except VolcengineAPIError as restore_err:
restore_errors.append(f"{p['name']}: {restore_err}")
user.status = IAMUser.Status.ACTIVE
user.saved_policies_on_disable = []
user.save(update_fields=['status', 'saved_policies_on_disable'])
error_info = f",恢复失败: {restore_errors}" if restore_errors else ""
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"手动恢复子账号 {user.username}",
content=f"操作人: {request.user.username},已恢复 {restored_count} 个权限策略{error_info}",
)
msg = f'用户 {user.username} 已恢复,{restored_count} 个权限策略已恢复'
result = {'message': msg}
if restore_errors:
result['warnings'] = restore_errors
return Response(result)
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
@api_view(['GET'])
def iam_user_policies_view(request, pk):
"""查看子账号的权限策略"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
account, ak, sk = _get_volc_account(user.volc_account_id)
if not ak:
return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
resp = svc.list_attached_user_policies(user.username)
policies = resp.get("Result", {}).get("AttachedPolicyMetadata", [])
return Response({'policies': policies})
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
@api_view(['POST'])
def iam_user_attach_policy_view(request, pk):
"""给子账号附加权限策略"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
policy_name = request.data.get('policy_name', '')
policy_type = request.data.get('policy_type', 'System')
if not policy_name:
return Response({'error': 'missing_policy_name', 'message': '请指定策略名'},
status=status.HTTP_400_BAD_REQUEST)
account, ak, sk = _get_volc_account(user.volc_account_id)
if not ak:
return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
svc.attach_user_policy(user.username, policy_name, policy_type)
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"附加策略 {policy_name}{user.username}",
content=f"操作人: {request.user.username},策略类型: {policy_type}",
)
return Response({'message': f'已附加策略 {policy_name}'})
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
@api_view(['POST'])
def iam_user_detach_policy_view(request, pk):
"""从子账号移除权限策略"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
policy_name = request.data.get('policy_name', '')
policy_type = request.data.get('policy_type', 'System')
if not policy_name:
return Response({'error': 'missing_policy_name', 'message': '请指定策略名'},
status=status.HTTP_400_BAD_REQUEST)
account, ak, sk = _get_volc_account(user.volc_account_id)
if not ak:
return Response({'error': 'no_credentials'}, status=status.HTTP_400_BAD_REQUEST)
svc = IAMService(ak, sk)
try:
svc.detach_user_policy(user.username, policy_name, policy_type)
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"移除策略 {policy_name}{user.username}",
content=f"操作人: {request.user.username},策略类型: {policy_type}",
)
return Response({'message': f'已移除策略 {policy_name}'})
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
# ==================== IAM User Projects ====================
@api_view(['GET'])
def iam_user_project_list_view(request, pk):
"""查看子账号关联的项目列表"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
projects = user.projects.all()
return Response(IAMUserProjectSerializer(projects, many=True).data)
@api_view(['POST'])
def iam_user_project_add_view(request, pk):
"""给子账号添加关联项目:加入监测 + 自动在项目范围内授权"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
serializer = IAMUserProjectAddSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
d = serializer.validated_data
obj, created = IAMUserProject.objects.get_or_create(
iam_user=user,
project_name=d['project_name'],
defaults={
'display_name': d.get('display_name', ''),
'monitor_enabled': d.get('monitor_enabled', True),
},
)
if not created:
return Response({'error': 'duplicate', 'message': f'项目 {d["project_name"]} 已关联'},
status=status.HTTP_409_CONFLICT)
# 在项目范围内授权前端指定的策略(如果传入了)
account, ak, sk = _get_volc_account(user.volc_account_id)
attached = []
auth_errors = []
policies_to_attach = d.get('policies', [])
if ak and policies_to_attach:
svc = IAMService(ak, sk)
for policy_name in policies_to_attach:
try:
svc.attach_policy_in_project(user.username, policy_name,
d['project_name'])
attached.append(policy_name)
except VolcengineAPIError as e:
auth_errors.append(f"{policy_name}: {e}")
obj.attached_policies = attached
obj.save(update_fields=['attached_policies'])
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"添加项目 {d['project_name']}{user.username}",
content=f"操作人: {request.user.username},已授权策略: {attached}"
+ (f",授权失败: {auth_errors}" if auth_errors else ""),
)
result = {
'message': f'已关联项目 {d["project_name"]}',
'project': IAMUserProjectSerializer(obj).data,
'attached_policies': attached,
}
if auth_errors:
result['auth_errors'] = auth_errors
return Response(result, status=status.HTTP_201_CREATED)
@api_view(['PUT'])
def iam_user_project_update_view(request, pk, pid):
"""更新项目监测开关"""
try:
project = IAMUserProject.objects.get(pk=pid, iam_user_id=pk)
except IAMUserProject.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
serializer = IAMUserProjectUpdateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
project.monitor_enabled = serializer.validated_data['monitor_enabled']
project.save(update_fields=['monitor_enabled'])
return Response(IAMUserProjectSerializer(project).data)
@api_view(['DELETE'])
def iam_user_project_delete_view(request, pk, pid):
"""移除关联项目:回收权限 + 移出监测"""
try:
project = IAMUserProject.objects.get(pk=pid, iam_user_id=pk)
user = project.iam_user
except IAMUserProject.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
name = project.project_name
detached = []
detach_errors = []
# 回收之前自动授权的策略
account, ak, sk = _get_volc_account(user.volc_account_id)
if ak and project.attached_policies:
svc = IAMService(ak, sk)
for policy_name in project.attached_policies:
try:
svc.detach_policy_in_project(user.username, policy_name, name)
detached.append(policy_name)
except VolcengineAPIError as e:
detach_errors.append(f"{policy_name}: {e}")
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"移除项目 {name}{user.username}",
content=f"操作人: {request.user.username},已回收策略: {detached}"
+ (f",回收失败: {detach_errors}" if detach_errors else ""),
)
project.delete()
result = {'message': f'已移除项目 {name},已回收权限: {detached}'}
if detach_errors:
result['detach_errors'] = detach_errors
return Response(result)
@api_view(['POST'])
def iam_user_project_toggle_all_view(request, pk):
"""批量切换所有项目的监测开关"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
enable = request.data.get('monitor_enabled', True)
count = user.projects.update(monitor_enabled=enable)
label = '开启' if enable else '关闭'
return Response({'message': f'{label}全部 {count} 个项目的监测'})
# ==================== Quota Allocation ====================
@api_view(['POST'])
def quota_allocate_view(request, pk):
"""额度变更:正数=追加,负数=扣减"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
serializer = QuotaAllocateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
amount = serializer.validated_data['amount']
note = serializer.validated_data.get('note', '')
new_total = user.allocated_quota + amount
# 扣减保护:总额度不能低于已消费金额
if new_total < user.consumed_total:
return Response({
'error': 'quota_underflow',
'message': (
f'扣减后总额度 ¥{new_total:.2f} 低于已消费 ¥{user.consumed_total:.2f}'
f'最多可扣减 ¥{user.allocated_quota - user.consumed_total:.2f}'
),
}, status=status.HTTP_400_BAD_REQUEST)
user.allocated_quota = new_total
user.triggered_alerts = [] # 额度变更时重置告警状态
user.save(update_fields=['allocated_quota', 'triggered_alerts'])
is_deduct = amount < 0
action_label = f"扣减额度 ¥{abs(amount)}" if is_deduct else f"追加额度 ¥{amount}"
allocation = QuotaAllocation.objects.create(
iam_user=user,
amount=amount,
total_after=user.allocated_quota,
note=note,
created_by=request.user.username,
)
AlertRecord.objects.create(
iam_user=user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"{action_label}{user.username}",
content=f"操作人: {request.user.username},变更后总额度: ¥{user.allocated_quota},备注: {note}",
)
return Response({
'message': f'{action_label},当前总额度 ¥{user.allocated_quota}',
'user': IAMUserSerializer(user).data,
'allocation': QuotaAllocationSerializer(allocation).data,
})
@api_view(['GET'])
def quota_history_view(request, pk):
"""查看子账号的额度划拨历史"""
try:
user = IAMUser.objects.get(pk=pk)
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
allocations = QuotaAllocation.objects.filter(iam_user=user)
return Response(QuotaAllocationSerializer(allocations, many=True).data)
# ==================== Billing ====================
@api_view(['GET'])
def spending_overview_view(request):
"""消费总览"""
bill_period = request.query_params.get('period', datetime.now().strftime("%Y-%m"))
users = IAMUser.objects.all().order_by('-consumed_total')
return Response({
'period': bill_period,
'users': IAMUserSerializer(users, many=True).data,
})
@api_view(['POST'])
def spending_refresh_view(request):
"""手动刷新消费数据"""
from utils.scheduler import check_spending
try:
check_spending()
return Response({'message': '消费数据刷新完成'})
except Exception as e:
return Response({'error': 'refresh_failed', 'message': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET'])
def balance_view(request):
"""查询主账号余额"""
account, ak, sk = _get_volc_account()
if not ak:
return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
status=status.HTTP_400_BAD_REQUEST)
try:
billing = BillingService(ak, sk)
result = billing.get_balance()
return Response(result)
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)
# ==================== Global Config ====================
@api_view(['GET', 'PUT'])
def global_config_view(request):
config = GlobalConfig.get_solo()
if request.method == 'GET':
return Response(GlobalConfigSerializer(config).data)
serializer = GlobalConfigSerializer(config, data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data)
# ==================== Alerts ====================
@api_view(['GET'])
def alert_list_view(request):
alerts = AlertRecord.objects.select_related('iam_user').all()
alert_type = request.query_params.get('type')
if alert_type:
alerts = alerts.filter(alert_type=alert_type)
limit = int(request.query_params.get('limit', 50))
return Response(AlertRecordSerializer(alerts[:limit], many=True).data)
# ==================== Projects ====================
@api_view(['GET'])
def project_list_view(request):
"""从火山引擎拉取项目列表"""
account, ak, sk = _get_volc_account()
if not ak:
return Response({'error': 'no_account', 'message': '请先配置火山主账号'},
status=status.HTTP_400_BAD_REQUEST)
try:
svc = ProjectService(ak, sk)
projects = svc.list_projects()
result = [
{
'name': p.get('ProjectName', ''),
'display_name': p.get('DisplayName', p.get('ProjectName', '')),
'description': p.get('Description', ''),
}
for p in projects
]
return Response(result)
except VolcengineAPIError as e:
return Response({'error': 'api_error', 'message': str(e)},
status=status.HTTP_502_BAD_GATEWAY)