seaislee1209 cbc19a6d9e feat: add admin management, change password, and operation log
- Change password: current user can change their own password
- Admin management: superuser can create/toggle/reset-password for admins
- Operation log: view all system operations with type filter
- All operations are recorded to AlertRecord for audit trail

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 18:20:14 +08:00

198 lines
6.7 KiB
Python

from django.contrib.auth import authenticate
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from .models import AdminUser
from .serializers import (
LoginSerializer, UserInfoSerializer,
ChangePasswordSerializer, AdminUserCreateSerializer,
)
@api_view(['POST'])
@permission_classes([AllowAny])
def login_view(request):
    """Authenticate by username/password and issue a JWT pair.

    The request body is validated with ``LoginSerializer``; an invalid
    payload raises the serializer's validation error.

    Responses:
        200: ``access`` and ``refresh`` tokens plus the serialized ``user``.
        401: ``invalid_credentials`` when ``authenticate`` yields no user.
        403: ``user_disabled`` when the authenticated user is not active.
    """
    form = LoginSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    credentials = form.validated_data

    user = authenticate(
        username=credentials['username'],
        password=credentials['password'],
    )
    if not user:
        body = {'error': 'invalid_credentials', 'message': '用户名或密码错误'}
        return Response(body, status=status.HTTP_401_UNAUTHORIZED)

    # NOTE(review): Django's default ModelBackend makes authenticate() return
    # None for inactive users, in which case this branch is never reached and
    # disabled accounts get the 401 above -- confirm AUTHENTICATION_BACKENDS.
    if not user.is_active:
        body = {'error': 'user_disabled', 'message': '账号已停用'}
        return Response(body, status=status.HTTP_403_FORBIDDEN)

    token_pair = RefreshToken.for_user(user)
    payload = {
        'access': str(token_pair.access_token),
        'refresh': str(token_pair),
        'user': UserInfoSerializer(user).data,
    }
    return Response(payload)
@api_view(['POST'])
@permission_classes([AllowAny])
def refresh_view(request):
    """Exchange a refresh token for a new access token.

    The endpoint is open to unauthenticated callers, like ``login_view``:
    the refresh token in the body is the credential being checked, and a
    client typically calls this precisely when its access token is no
    longer usable.

    Request body:
        refresh: the refresh token string.

    Responses:
        200: ``{'access': <new access token>}``.
        400: ``missing_token`` when ``refresh`` is absent or empty.
        401: ``invalid_token`` when the token fails validation.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of this module (RefreshToken comes from it).
    from rest_framework_simplejwt.exceptions import TokenError

    token = request.data.get('refresh')
    if not token:
        return Response(
            {'error': 'missing_token', 'message': '缺少 refresh token'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        access = str(RefreshToken(token).access_token)
    except TokenError:
        # Only token validation failures map to 401; any other exception is
        # a genuine server error and must not be reported as a bad token.
        return Response(
            {'error': 'invalid_token', 'message': 'token 无效或已过期'},
            status=status.HTTP_401_UNAUTHORIZED,
        )
    return Response({'access': access})
@api_view(['GET'])
def me_view(request):
    """Return the requesting user serialized with ``UserInfoSerializer``."""
    current_user = request.user
    profile = UserInfoSerializer(current_user)
    return Response(profile.data)
@api_view(['POST'])
def change_password_view(request):
    """Change the password of the requesting user.

    The body is validated with ``ChangePasswordSerializer`` and must supply
    ``old_password`` and ``new_password``. The old password is verified
    against the current user before the new one is stored, and the change
    is recorded as a MANUAL ``AlertRecord``.

    Responses:
        200: success message.
        400: ``wrong_password`` when ``old_password`` does not match.
    """
    form = ChangePasswordSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    fields = form.validated_data
    account = request.user

    old_password_ok = account.check_password(fields['old_password'])
    if not old_password_ok:
        return Response(
            {'error': 'wrong_password', 'message': '原密码错误'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    account.set_password(fields['new_password'])
    account.save()

    # Record the operation in the audit trail.
    from apps.monitor.models import AlertRecord
    operator = account.username
    AlertRecord.objects.create(
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"管理员 {operator} 修改密码",
        content=f"操作人: {operator}",
    )
    return Response({'message': '密码修改成功,请重新登录'})
# ==================== Admin User Management ====================
@api_view(['GET'])
def admin_list_view(request):
    """List all admin accounts ordered by ``id``.

    Responses:
        200: list of users serialized with ``UserInfoSerializer``.
        403: ``forbidden`` when the requester is not a superuser.
    """
    if request.user.is_superuser:
        admins = AdminUser.objects.all().order_by('id')
        listing = UserInfoSerializer(admins, many=True)
        return Response(listing.data)

    return Response(
        {'error': 'forbidden', 'message': '仅超级管理员可操作'},
        status=status.HTTP_403_FORBIDDEN,
    )
@api_view(['POST'])
def admin_create_view(request):
    """Create a new admin account.

    The body is validated with ``AdminUserCreateSerializer``. The account is
    created with ``is_staff=True``; ``is_superuser`` defaults to ``False``
    when the validated data does not include it. The creation is recorded
    as a MANUAL ``AlertRecord``.

    Responses:
        201: success message plus the serialized new user.
        403: ``forbidden`` when the requester is not a superuser.
        409: ``user_exists`` when the username is already taken.
    """
    # Local import keeps this block self-contained; Django is already a
    # dependency of this module.
    from django.db import IntegrityError, transaction

    if not request.user.is_superuser:
        return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
                        status=status.HTTP_403_FORBIDDEN)

    serializer = AdminUserCreateSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    d = serializer.validated_data
    username = d['username']
    conflict_body = {'error': 'user_exists',
                     'message': f'用户名 {username} 已存在'}

    if AdminUser.objects.filter(username=username).exists():
        return Response(conflict_body, status=status.HTTP_409_CONFLICT)

    try:
        # The savepoint keeps the connection usable for the re-check below
        # if the insert fails inside an outer transaction.
        with transaction.atomic():
            user = AdminUser.objects.create_user(
                username=username,
                password=d['password'],
                is_superuser=d.get('is_superuser', False),
                is_staff=True,
            )
    except IntegrityError:
        # A concurrent request may have created the same username between
        # the exists() check and the insert; report that as the same 409.
        # Any other integrity failure is re-raised unchanged.
        if AdminUser.objects.filter(username=username).exists():
            return Response(conflict_body, status=status.HTTP_409_CONFLICT)
        raise

    from apps.monitor.models import AlertRecord
    # Both branches of this flag were empty strings before, so the audit
    # record never stated whether the new account is a superuser.
    superuser_flag = '是' if d.get('is_superuser') else '否'
    AlertRecord.objects.create(
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"创建管理员 {username}",
        content=f"操作人: {request.user.username},超级管理员: {superuser_flag}",
    )
    return Response({
        'message': f'管理员 {username} 创建成功',
        'user': UserInfoSerializer(user).data,
    }, status=status.HTTP_201_CREATED)
@api_view(['POST'])
def admin_toggle_view(request, pk):
    """Flip the ``is_active`` flag of the admin identified by ``pk``.

    Only ``is_active`` is written back. The resulting action (enable or
    disable) is recorded as a MANUAL ``AlertRecord``.

    Responses:
        200: message naming the action plus the serialized user.
        400: ``self_toggle`` when the target is the requester.
        403: ``forbidden`` when the requester is not a superuser.
        404: ``not_found`` when no admin has this ``pk``.
    """
    operator = request.user
    if not operator.is_superuser:
        return Response(
            {'error': 'forbidden', 'message': '仅超级管理员可操作'},
            status=status.HTTP_403_FORBIDDEN,
        )

    try:
        target = AdminUser.objects.get(pk=pk)
    except AdminUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    if target.pk == operator.pk:
        return Response(
            {'error': 'self_toggle', 'message': '不能停用自己'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    target.is_active = not target.is_active
    target.save(update_fields=['is_active'])

    # Label the action by the state the account is in after the flip.
    if target.is_active:
        action = '启用'
    else:
        action = '停用'

    from apps.monitor.models import AlertRecord
    AlertRecord.objects.create(
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"{action}管理员 {target.username}",
        content=f"操作人: {operator.username}",
    )

    result = {
        'message': f'{action}管理员 {target.username}',
        'user': UserInfoSerializer(target).data,
    }
    return Response(result)
@api_view(['POST'])
def admin_reset_password_view(request, pk):
    """Reset the password of the admin identified by ``pk``.

    The new password is read from ``new_password`` in the request body and
    must be a string of at least 6 characters. The reset is recorded as a
    MANUAL ``AlertRecord``.

    Responses:
        200: success message.
        400: ``weak_password`` when ``new_password`` is missing, not a
            string, or shorter than 6 characters.
        403: ``forbidden`` when the requester is not a superuser.
        404: ``not_found`` when no admin has this ``pk``.
    """
    if not request.user.is_superuser:
        return Response({'error': 'forbidden', 'message': '仅超级管理员可操作'},
                        status=status.HTTP_403_FORBIDDEN)
    try:
        user = AdminUser.objects.get(pk=pk)
    except AdminUser.DoesNotExist:
        return Response({'error': 'not_found'}, status=status.HTTP_404_NOT_FOUND)

    new_password = request.data.get('new_password', '')
    # The value comes straight from the request body without a serializer,
    # so it may be any JSON type. Reject non-strings here instead of letting
    # len() or set_password() raise on a number, null, or list.
    if not isinstance(new_password, str) or len(new_password) < 6:
        return Response({'error': 'weak_password', 'message': '密码至少6位'},
                        status=status.HTTP_400_BAD_REQUEST)

    user.set_password(new_password)
    user.save()

    from apps.monitor.models import AlertRecord
    AlertRecord.objects.create(
        alert_type=AlertRecord.AlertType.MANUAL,
        title=f"重置管理员 {user.username} 密码",
        content=f"操作人: {request.user.username}",
    )
    return Response({'message': f'已重置 {user.username} 的密码'})