"""Authentication views for admin users and IAM sub-accounts.

Admin endpoints rely on ``rest_framework_simplejwt`` refresh/access tokens.
IAM sub-account endpoints use a separate, hand-built PyJWT token that carries
``role == 'iam_user'`` and an ``iam_user_id`` claim; those views disable DRF
authentication and validate the bearer token themselves.
"""
from datetime import datetime, timedelta, timezone

import jwt
from django.conf import settings
from django.contrib.auth import authenticate
from django.db import IntegrityError, transaction
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes, authentication_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import RefreshToken

from .models import AdminUser
from .serializers import (
    LoginSerializer,
    UserInfoSerializer,
    ChangePasswordSerializer,
    AdminUserCreateSerializer,
)

# Minimum length accepted for passwords set through the reset / IAM endpoints.
_MIN_PASSWORD_LENGTH = 6
# Claims and signing parameters of the hand-built IAM sub-account token.
_IAM_ROLE = 'iam_user'
_IAM_TOKEN_LIFETIME = timedelta(hours=24)
_IAM_JWT_ALGORITHM = 'HS256'


# ==================== Internal helpers ====================

def _error(code, message=None, *, http_status):
    """Build an error ``Response`` of the form ``{'error': code[, 'message': message]}``."""
    body = {'error': code}
    if message is not None:
        body['message'] = message
    return Response(body, status=http_status)


def _superuser_required(request):
    """Return a 403 ``Response`` unless ``request.user`` is a superuser, else ``None``."""
    if request.user.is_superuser:
        return None
    return _error('forbidden', '仅超级管理员可操作', http_status=status.HTTP_403_FORBIDDEN)


def _get_admin_or_error(pk):
    """Look up an ``AdminUser`` by primary key.

    Returns ``(user, None)`` on success or ``(None, 404 Response)`` when no
    such user exists.
    """
    try:
        return AdminUser.objects.get(pk=pk), None
    except AdminUser.DoesNotExist:
        return None, _error('not_found', http_status=status.HTTP_404_NOT_FOUND)


def _is_acceptable_password(value):
    """Return True when ``value`` is a string of at least the minimum length.

    The type check matters: request bodies are untrusted JSON, so the value
    may be ``None``, a number or a list rather than a string.
    """
    return isinstance(value, str) and len(value) >= _MIN_PASSWORD_LENGTH


def _record_manual_alert(title, content, **extra):
    """Create an ``AlertRecord`` of type MANUAL, used here as an audit trail.

    ``extra`` is forwarded to ``AlertRecord.objects.create`` (e.g. ``iam_user``).
    """
    # Imported lazily, as the original code did — presumably to avoid a
    # circular import with apps.monitor; confirm before hoisting.
    from apps.monitor.models import AlertRecord

    AlertRecord.objects.create(
        alert_type=AlertRecord.AlertType.MANUAL,
        title=title,
        content=content,
        **extra,
    )


def _authenticate_iam_request(request, *, expired_error='invalid_token'):
    """Resolve the IAM sub-account identified by the request's bearer token.

    Args:
        request: the incoming DRF request.
        expired_error: error code reported for an expired token; lets a view
            distinguish expiry from other token failures.

    Returns:
        ``(iam_user, None)`` on success, or ``(None, error_response)`` where
        the response is 401 (missing/invalid/expired token), 403 (token is
        not an IAM token, or the account is disabled / not login-enabled) or
        404 (the account no longer exists).
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return None, _error('unauthorized', http_status=status.HTTP_401_UNAUTHORIZED)

    token = auth_header.split(' ', 1)[1]
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[_IAM_JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        return None, _error(expired_error, http_status=status.HTTP_401_UNAUTHORIZED)
    except jwt.InvalidTokenError:
        return None, _error('invalid_token', http_status=status.HTTP_401_UNAUTHORIZED)

    if payload.get('role') != _IAM_ROLE:
        return None, _error('not_iam_user', http_status=status.HTTP_403_FORBIDDEN)

    iam_user_id = payload.get('iam_user_id')
    if iam_user_id is None:
        # A correctly signed token without the claim is malformed, not a 500.
        return None, _error('invalid_token', http_status=status.HTTP_401_UNAUTHORIZED)

    from apps.monitor.models import IAMUser

    try:
        iam_user = IAMUser.objects.get(pk=iam_user_id)
    except IAMUser.DoesNotExist:
        return None, _error('not_found', http_status=status.HTTP_404_NOT_FOUND)

    # Tokens stay valid until they expire, so the account state must be
    # re-checked on every request; otherwise a sub-account that was disabled
    # after logging in could keep reading (and decrypting) its API keys.
    if not iam_user.login_enabled:
        return None, _error(
            'login_disabled', '此账号未开通 AirGate 登录',
            http_status=status.HTTP_403_FORBIDDEN,
        )
    if iam_user.status != IAMUser.Status.ACTIVE:
        return None, _error('user_disabled', '账号已停用', http_status=status.HTTP_403_FORBIDDEN)

    return iam_user, None


# ==================== Admin Login / Session ====================

@api_view(['POST'])
@permission_classes([AllowAny])
def login_view(request):
    """Authenticate an admin by username/password and issue a JWT pair.

    Responds 401 ``invalid_credentials`` when ``authenticate`` returns no
    user and 403 ``user_disabled`` when the user is inactive; otherwise
    returns the access token, refresh token and serialized user.
    """
    serializer = LoginSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    credentials = serializer.validated_data

    user = authenticate(
        username=credentials['username'],
        password=credentials['password'],
    )
    if not user:
        return _error(
            'invalid_credentials', '用户名或密码错误',
            http_status=status.HTTP_401_UNAUTHORIZED,
        )
    if not user.is_active:
        return _error('user_disabled', '账号已停用', http_status=status.HTTP_403_FORBIDDEN)

    refresh = RefreshToken.for_user(user)
    return Response({
        'access': str(refresh.access_token),
        'refresh': str(refresh),
        'user': UserInfoSerializer(user).data,
    })


@api_view(['POST'])
def refresh_view(request):
    """Exchange a refresh token for a new access token.

    Responds 400 ``missing_token`` when no token is supplied and 401
    ``invalid_token`` when the token fails validation.
    """
    # NOTE(review): this view declares no permission_classes, so it is governed
    # by the project's DRF defaults — confirm clients can still reach it once
    # their access token has expired.
    token = request.data.get('refresh')
    if not token:
        return _error('missing_token', '缺少 refresh token', http_status=status.HTTP_400_BAD_REQUEST)

    invalid = _error('invalid_token', 'token 无效或已过期', http_status=status.HTTP_401_UNAUTHORIZED)
    if not isinstance(token, str):
        return invalid
    try:
        refresh = RefreshToken(token)
    except TokenError:
        # Only token validation failures map to 401; unrelated errors (e.g. a
        # database outage) must surface instead of masquerading as a bad token.
        return invalid
    return Response({'access': str(refresh.access_token)})


@api_view(['GET'])
def me_view(request):
    """Return the serialized representation of ``request.user``."""
    return Response(UserInfoSerializer(request.user).data)


@api_view(['POST'])
def change_password_view(request):
    """Change the current user's password.

    Responds 400 ``wrong_password`` when the supplied old password does not
    match; on success saves the new password and records a manual alert.
    """
    serializer = ChangePasswordSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    data = serializer.validated_data
    user = request.user

    if not user.check_password(data['old_password']):
        return _error('wrong_password', '原密码错误', http_status=status.HTTP_400_BAD_REQUEST)

    user.set_password(data['new_password'])
    user.save()

    # Log operation
    _record_manual_alert(
        title=f"管理员 {user.username} 修改密码",
        content=f"操作人: {user.username}",
    )
    return Response({'message': '密码修改成功,请重新登录'})


# ==================== Admin User Management ====================

@api_view(['GET'])
def admin_list_view(request):
    """List all admin users ordered by id (superusers only)."""
    denied = _superuser_required(request)
    if denied is not None:
        return denied

    users = AdminUser.objects.all().order_by('id')
    return Response(UserInfoSerializer(users, many=True).data)


@api_view(['POST'])
def admin_create_view(request):
    """Create an admin account (superusers only).

    Responds 409 ``user_exists`` when the username is already taken,
    including when a concurrent request wins the race to create it; on
    success records a manual alert and returns 201 with the new user.
    """
    denied = _superuser_required(request)
    if denied is not None:
        return denied

    serializer = AdminUserCreateSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    d = serializer.validated_data
    username = d['username']

    conflict = _error(
        'user_exists', f'用户名 {username} 已存在',
        http_status=status.HTTP_409_CONFLICT,
    )
    if AdminUser.objects.filter(username=username).exists():
        return conflict

    try:
        # atomic() gives the insert its own savepoint, so losing a race with a
        # concurrent creation can be handled without poisoning an enclosing
        # transaction.
        with transaction.atomic():
            user = AdminUser.objects.create_user(
                username=username,
                password=d['password'],
                is_superuser=d.get('is_superuser', False),
                is_staff=True,
            )
    except IntegrityError:
        # Report a conflict only if the username really exists now; any other
        # integrity failure is unexpected and must propagate.
        if AdminUser.objects.filter(username=username).exists():
            return conflict
        raise

    superuser_label = '是' if d.get('is_superuser') else '否'
    _record_manual_alert(
        title=f"创建管理员 {username}",
        content=f"操作人: {request.user.username},超级管理员: {superuser_label}",
    )
    return Response({
        'message': f'管理员 {username} 创建成功',
        'user': UserInfoSerializer(user).data,
    }, status=status.HTTP_201_CREATED)


@api_view(['POST'])
def admin_toggle_view(request, pk):
    """Enable or disable an admin account by flipping ``is_active`` (superusers only).

    Responds 404 ``not_found`` for an unknown ``pk`` and 400 ``self_toggle``
    when the caller targets their own account.
    """
    denied = _superuser_required(request)
    if denied is not None:
        return denied

    user, not_found = _get_admin_or_error(pk)
    if not_found is not None:
        return not_found

    if user.pk == request.user.pk:
        return _error('self_toggle', '不能停用自己', http_status=status.HTTP_400_BAD_REQUEST)

    user.is_active = not user.is_active
    user.save(update_fields=['is_active'])

    action = '启用' if user.is_active else '停用'
    _record_manual_alert(
        title=f"{action}管理员 {user.username}",
        content=f"操作人: {request.user.username}",
    )
    return Response({
        'message': f'已{action}管理员 {user.username}',
        'user': UserInfoSerializer(user).data,
    })


@api_view(['POST'])
def admin_reset_password_view(request, pk):
    """Reset another admin's password (superusers only).

    Responds 404 ``not_found`` for an unknown ``pk`` and 400
    ``weak_password`` when ``new_password`` is not a string of at least six
    characters.
    """
    denied = _superuser_required(request)
    if denied is not None:
        return denied

    user, not_found = _get_admin_or_error(pk)
    if not_found is not None:
        return not_found

    new_password = request.data.get('new_password', '')
    if not _is_acceptable_password(new_password):
        return _error('weak_password', '密码至少6位', http_status=status.HTTP_400_BAD_REQUEST)

    user.set_password(new_password)
    user.save()

    _record_manual_alert(
        title=f"重置管理员 {user.username} 密码",
        content=f"操作人: {request.user.username}",
    )
    return Response({'message': f'已重置 {user.username} 的密码'})


# ==================== Sub-account (IAM User) Login ====================

@api_view(['POST'])
@permission_classes([AllowAny])
def iam_login_view(request):
    """Log an IAM sub-account in to AirGate and issue a 24-hour access token.

    Responds 400 ``missing`` when username or password is absent (or not a
    string), 401 ``invalid_credentials`` for an unknown user or wrong
    password, and 403 when login is not enabled or the account is not active.
    """
    username = request.data.get('username', '')
    password = request.data.get('password', '')
    # Untrusted JSON: reject non-string values before they reach the ORM or
    # the password check.
    if not (isinstance(username, str) and isinstance(password, str) and username and password):
        return _error('missing', '请输入用户名和密码', http_status=status.HTTP_400_BAD_REQUEST)

    from apps.monitor.models import IAMUser

    bad_credentials = _error(
        'invalid_credentials', '用户名或密码错误',
        http_status=status.HTTP_401_UNAUTHORIZED,
    )
    try:
        iam_user = IAMUser.objects.get(username=username)
    except IAMUser.DoesNotExist:
        return bad_credentials

    if not iam_user.login_enabled:
        return _error(
            'login_disabled', '此账号未开通 AirGate 登录',
            http_status=status.HTTP_403_FORBIDDEN,
        )
    if iam_user.status != IAMUser.Status.ACTIVE:
        return _error('user_disabled', '账号已停用', http_status=status.HTTP_403_FORBIDDEN)
    if not iam_user.check_login_password(password):
        return bad_credentials

    # IAM sub-accounts are not AdminUser rows, so the token is built directly
    # with PyJWT (signed with SECRET_KEY) rather than through simplejwt.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'token_type': 'access',
        'iam_user_id': iam_user.id,
        'username': iam_user.username,
        'role': _IAM_ROLE,
        'exp': issued_at + _IAM_TOKEN_LIFETIME,
        'iat': issued_at,
    }
    token = jwt.encode(payload, settings.SECRET_KEY, algorithm=_IAM_JWT_ALGORITHM)

    return Response({
        'access': token,
        'user': {
            'id': iam_user.id,
            'username': iam_user.username,
            'display_name': iam_user.display_name,
            'role': _IAM_ROLE,
        }
    })


@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_me_view(request):
    """Return the calling sub-account's own profile.

    The caller is identified by the ``iam_user_id`` claim of its bearer
    token; an expired token is reported as ``token_expired``.
    """
    iam_user, failure = _authenticate_iam_request(request, expired_error='token_expired')
    if failure is not None:
        return failure

    from apps.monitor.serializers import IAMUserSerializer

    return Response({
        'role': _IAM_ROLE,
        'user': IAMUserSerializer(iam_user).data,
    })


@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_my_keys_view(request):
    """List the API keys that belong to the calling sub-account."""
    iam_user, failure = _authenticate_iam_request(request)
    if failure is not None:
        return failure

    from apps.monitor.models import ArkApiKey
    from apps.monitor.serializers import ArkApiKeySerializer

    keys = ArkApiKey.objects.filter(iam_user_id=iam_user.pk)
    return Response(ArkApiKeySerializer(keys, many=True).data)


@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_my_key_reveal_view(request, pk):
    """Return the decrypted value of one API key owned by the calling sub-account.

    Responds 404 ``not_found`` when no key with this ``pk`` belongs to the
    caller.
    """
    iam_user, failure = _authenticate_iam_request(request)
    if failure is not None:
        return failure

    from apps.monitor.models import ArkApiKey
    from utils.crypto import decrypt

    try:
        # Filtering on the owner as well as pk prevents reading another
        # sub-account's key by guessing ids.
        key = ArkApiKey.objects.get(pk=pk, iam_user_id=iam_user.pk)
    except ArkApiKey.DoesNotExist:
        return _error('not_found', http_status=status.HTTP_404_NOT_FOUND)

    return Response({
        'api_key': decrypt(key.api_key_enc),
        'key_name': key.key_name,
        'project_name': key.project_name,
    })


@api_view(['POST'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_change_password_view(request):
    """Change the calling sub-account's own AirGate login password.

    Responds 400 ``missing`` when either password is absent (or not a
    string), 400 ``wrong_password`` when the old password does not match and
    400 ``weak_password`` when the new one is shorter than six characters.
    """
    iam_user, failure = _authenticate_iam_request(request)
    if failure is not None:
        return failure

    old_password = request.data.get('old_password', '')
    new_password = request.data.get('new_password', '')
    both_present = (
        isinstance(old_password, str) and isinstance(new_password, str)
        and old_password and new_password
    )
    if not both_present:
        return _error('missing', '请输入原密码和新密码', http_status=status.HTTP_400_BAD_REQUEST)

    if not iam_user.check_login_password(old_password):
        return _error('wrong_password', '原密码错误', http_status=status.HTTP_400_BAD_REQUEST)

    if not _is_acceptable_password(new_password):
        return _error('weak_password', '密码至少6位', http_status=status.HTTP_400_BAD_REQUEST)

    iam_user.set_login_password(new_password)
    iam_user.save(update_fields=['login_password_hash'])

    # NOTE(review): tokens issued before the change are not revoked here and
    # remain usable until they expire — confirm whether that is acceptable.
    _record_manual_alert(
        title=f"子账号 {iam_user.username} 修改 AirGate 密码",
        content=f"操作人: {iam_user.username}(自行修改)",
        iam_user=iam_user,
    )
    return Response({'message': '密码修改成功,请重新登录'})