seaislee1209 fac5e1b541 feat: password management for admin and sub-accounts
- Admin: set sub-account AirGate login password via dropdown menu
- Admin: toggle sub-account login enabled/disabled
- Sub-account: change own password (sidebar "修改密码")
- Sub-account: auto-redirect to login page after password change

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 15:54:35 +08:00

409 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.contrib.auth import authenticate
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes, authentication_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from .models import AdminUser
from .serializers import (
LoginSerializer, UserInfoSerializer,
ChangePasswordSerializer, AdminUserCreateSerializer,
)
@api_view(['POST'])
@permission_classes([AllowAny])
def login_view(request):
serializer = LoginSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
user = authenticate(
username=serializer.validated_data['username'],
password=serializer.validated_data['password'],
)
if not user:
return Response(
{'error': 'invalid_credentials', 'message': '用户名或密码错误'},
status=status.HTTP_401_UNAUTHORIZED,
)
if not user.is_active:
return Response(
{'error': 'user_disabled', 'message': '账号已停用'},
status=status.HTTP_403_FORBIDDEN,
)
refresh = RefreshToken.for_user(user)
return Response({
'access': str(refresh.access_token),
'refresh': str(refresh),
'user': UserInfoSerializer(user).data,
})
@api_view(['POST'])
def refresh_view(request):
token = request.data.get('refresh')
if not token:
return Response(
{'error': 'missing_token', 'message': '缺少 refresh token'},
status=status.HTTP_400_BAD_REQUEST,
)
try:
refresh = RefreshToken(token)
return Response({'access': str(refresh.access_token)})
except Exception:
return Response(
{'error': 'invalid_token', 'message': 'token 无效或已过期'},
status=status.HTTP_401_UNAUTHORIZED,
)
@api_view(['GET'])
def me_view(request):
return Response(UserInfoSerializer(request.user).data)
@api_view(['POST'])
def change_password_view(request):
"""修改当前用户密码"""
serializer = ChangePasswordSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
if not request.user.check_password(serializer.validated_data['old_password']):
return Response({'error': 'wrong_password', 'message': '原密码错误'},
status=status.HTTP_400_BAD_REQUEST)
request.user.set_password(serializer.validated_data['new_password'])
request.user.save()
# Log operation
from apps.monitor.models import AlertRecord
AlertRecord.objects.create(
alert_type=AlertRecord.AlertType.MANUAL,
title=f"管理员 {request.user.username} 修改密码",
content=f"操作人: {request.user.username}",
)
return Response({'message': '密码修改成功,请重新登录'})
# ==================== Admin User Management ====================
@api_view(['GET'])
def admin_list_view(request):
"""列出所有管理员"""
if not request.user.is_superuser:
return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
status=status.HTTP_403_FORBIDDEN)
users = AdminUser.objects.all().order_by('id')
return Response(UserInfoSerializer(users, many=True).data)
@api_view(['POST'])
def admin_create_view(request):
"""创建管理员账号"""
if not request.user.is_superuser:
return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
status=status.HTTP_403_FORBIDDEN)
serializer = AdminUserCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
d = serializer.validated_data
if AdminUser.objects.filter(username=d['username']).exists():
return Response({'error': 'user_exists', 'message': f'用户名 {d["username"]} 已存在'},
status=status.HTTP_409_CONFLICT)
user = AdminUser.objects.create_user(
username=d['username'],
password=d['password'],
is_superuser=d.get('is_superuser', False),
is_staff=True,
)
from apps.monitor.models import AlertRecord
AlertRecord.objects.create(
alert_type=AlertRecord.AlertType.MANUAL,
title=f"创建管理员 {d['username']}",
content=f"操作人: {request.user.username},超级管理员: {'' if d.get('is_superuser') else ''}",
)
return Response({
'message': f'管理员 {d["username"]} 创建成功',
'user': UserInfoSerializer(user).data,
}, status=status.HTTP_201_CREATED)
@api_view(['POST'])
def admin_toggle_view(request, pk):
"""启用/停用管理员"""
if not request.user.is_superuser:
return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
status=status.HTTP_403_FORBIDDEN)
try:
user = AdminUser.objects.get(pk=pk)
except AdminUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
if user.pk == request.user.pk:
return Response({'error': 'self_toggle', 'message': '不能停用自己'},
status=status.HTTP_400_BAD_REQUEST)
user.is_active = not user.is_active
user.save(update_fields=['is_active'])
action = '启用' if user.is_active else '停用'
from apps.monitor.models import AlertRecord
AlertRecord.objects.create(
alert_type=AlertRecord.AlertType.MANUAL,
title=f"{action}管理员 {user.username}",
content=f"操作人: {request.user.username}",
)
return Response({'message': f'{action}管理员 {user.username}',
'user': UserInfoSerializer(user).data})
@api_view(['POST'])
def admin_reset_password_view(request, pk):
"""超管重置其他管理员密码"""
if not request.user.is_superuser:
return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
status=status.HTTP_403_FORBIDDEN)
try:
user = AdminUser.objects.get(pk=pk)
except AdminUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
new_password = request.data.get('new_password', '')
if len(new_password) < 6:
return Response({'error': 'weak_password', 'message': '密码至少6位'},
status=status.HTTP_400_BAD_REQUEST)
user.set_password(new_password)
user.save()
from apps.monitor.models import AlertRecord
AlertRecord.objects.create(
alert_type=AlertRecord.AlertType.MANUAL,
title=f"重置管理员 {user.username} 密码",
content=f"操作人: {request.user.username}",
)
return Response({'message': f'已重置 {user.username} 的密码'})
# ==================== Sub-account (IAM User) Login ====================
@api_view(['POST'])
@permission_classes([AllowAny])
def iam_login_view(request):
"""子账号登录 AirGate"""
username = request.data.get('username', '')
password = request.data.get('password', '')
if not username or not password:
return Response({'error': 'missing', 'message': '请输入用户名和密码'},
status=status.HTTP_400_BAD_REQUEST)
from apps.monitor.models import IAMUser
try:
iam_user = IAMUser.objects.get(username=username)
except IAMUser.DoesNotExist:
return Response({'error': 'invalid_credentials', 'message': '用户名或密码错误'},
status=status.HTTP_401_UNAUTHORIZED)
if not iam_user.login_enabled:
return Response({'error': 'login_disabled', 'message': '此账号未开通 AirGate 登录'},
status=status.HTTP_403_FORBIDDEN)
if iam_user.status != IAMUser.Status.ACTIVE:
return Response({'error': 'user_disabled', 'message': '账号已停用'},
status=status.HTTP_403_FORBIDDEN)
if not iam_user.check_login_password(password):
return Response({'error': 'invalid_credentials', 'message': '用户名或密码错误'},
status=status.HTTP_401_UNAUTHORIZED)
# Generate JWT token with iam_user info (use a dummy admin user for simplejwt)
import jwt
from django.conf import settings
from datetime import datetime, timedelta, timezone
payload = {
'token_type': 'access',
'iam_user_id': iam_user.id,
'username': iam_user.username,
'role': 'iam_user',
'exp': datetime.now(timezone.utc) + timedelta(hours=24),
'iat': datetime.now(timezone.utc),
}
token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')
return Response({
'access': token,
'user': {
'id': iam_user.id,
'username': iam_user.username,
'display_name': iam_user.display_name,
'role': 'iam_user',
}
})
@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_me_view(request):
"""子账号获取自身信息(通过 JWT token 中的 iam_user_id"""
import jwt
from django.conf import settings
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return Response({'error': 'unauthorized'}, status=status.HTTP_401_UNAUTHORIZED)
token = auth_header.split(' ', 1)[1]
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return Response({'error': 'token_expired'}, status=status.HTTP_401_UNAUTHORIZED)
except jwt.InvalidTokenError:
return Response({'error': 'invalid_token'}, status=status.HTTP_401_UNAUTHORIZED)
if payload.get('role') != 'iam_user':
return Response({'error': 'not_iam_user'}, status=status.HTTP_403_FORBIDDEN)
from apps.monitor.models import IAMUser
try:
iam_user = IAMUser.objects.get(pk=payload['iam_user_id'])
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
from apps.monitor.serializers import IAMUserSerializer
return Response({
'role': 'iam_user',
'user': IAMUserSerializer(iam_user).data,
})
@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_my_keys_view(request):
"""子账号查看自己的 API Key"""
import jwt
from django.conf import settings
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return Response({'error': 'unauthorized'}, status=status.HTTP_401_UNAUTHORIZED)
token = auth_header.split(' ', 1)[1]
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return Response({'error': 'invalid_token'}, status=status.HTTP_401_UNAUTHORIZED)
if payload.get('role') != 'iam_user':
return Response({'error': 'not_iam_user'}, status=status.HTTP_403_FORBIDDEN)
from apps.monitor.models import ArkApiKey
from apps.monitor.serializers import ArkApiKeySerializer
keys = ArkApiKey.objects.filter(iam_user_id=payload['iam_user_id'])
return Response(ArkApiKeySerializer(keys, many=True).data)
@api_view(['GET'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_my_key_reveal_view(request, pk):
"""子账号查看自己的 API Key 明文"""
import jwt
from django.conf import settings
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return Response({'error': 'unauthorized'}, status=status.HTTP_401_UNAUTHORIZED)
token = auth_header.split(' ', 1)[1]
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return Response({'error': 'invalid_token'}, status=status.HTTP_401_UNAUTHORIZED)
if payload.get('role') != 'iam_user':
return Response({'error': 'not_iam_user'}, status=status.HTTP_403_FORBIDDEN)
from apps.monitor.models import ArkApiKey
from utils.crypto import decrypt
try:
key = ArkApiKey.objects.get(pk=pk, iam_user_id=payload['iam_user_id'])
except ArkApiKey.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
return Response({
'api_key': decrypt(key.api_key_enc),
'key_name': key.key_name,
'project_name': key.project_name,
})
@api_view(['POST'])
@authentication_classes([])
@permission_classes([AllowAny])
def iam_change_password_view(request):
"""子账号修改自己的 AirGate 登录密码"""
import jwt
from django.conf import settings
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return Response({'error': 'unauthorized'}, status=status.HTTP_401_UNAUTHORIZED)
token = auth_header.split(' ', 1)[1]
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return Response({'error': 'invalid_token'}, status=status.HTTP_401_UNAUTHORIZED)
if payload.get('role') != 'iam_user':
return Response({'error': 'not_iam_user'}, status=status.HTTP_403_FORBIDDEN)
from apps.monitor.models import IAMUser
try:
iam_user = IAMUser.objects.get(pk=payload['iam_user_id'])
except IAMUser.DoesNotExist:
return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)
old_password = request.data.get('old_password', '')
new_password = request.data.get('new_password', '')
if not old_password or not new_password:
return Response({'error': 'missing', 'message': '请输入原密码和新密码'},
status=status.HTTP_400_BAD_REQUEST)
if not iam_user.check_login_password(old_password):
return Response({'error': 'wrong_password', 'message': '原密码错误'},
status=status.HTTP_400_BAD_REQUEST)
if len(new_password) < 6:
return Response({'error': 'weak_password', 'message': '密码至少6位'},
status=status.HTTP_400_BAD_REQUEST)
iam_user.set_login_password(new_password)
iam_user.save(update_fields=['login_password_hash'])
from apps.monitor.models import AlertRecord
AlertRecord.objects.create(
iam_user=iam_user,
alert_type=AlertRecord.AlertType.MANUAL,
title=f"子账号 {iam_user.username} 修改 AirGate 密码",
content=f"操作人: {iam_user.username}(自行修改)",
)
return Response({'message': '密码修改成功,请重新登录'})