2026-01-29 10:41:26 +08:00

213 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
用户模块视图 - App端用户
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework_simplejwt.tokens import RefreshToken
from django.db.models import Count
from utils.response import success, error
from apps.admins.authentication import AppJWTAuthentication, AdminJWTAuthentication
from apps.admins.permissions import IsAdminUser
from .models import User
from .serializers import (
UserSerializer,
UserDetailSerializer,
PhoneLoginSerializer,
UpdateUserSerializer
)
def get_app_tokens(user):
"""
为App用户生成JWT Token
在token中添加 user_type='app' 以区分管理员
"""
refresh = RefreshToken.for_user(user)
# 添加自定义声明
refresh['user_type'] = 'app'
refresh['phone'] = user.phone
return {
'access': str(refresh.access_token),
'refresh': str(refresh),
}
class AuthViewSet(viewsets.ViewSet):
"""认证视图集 - App端"""
permission_classes = [AllowAny]
@action(detail=False, methods=['post'], url_path='phone-login')
def phone_login(self, request):
"""
手机号一键登录
POST /api/v1/auth/phone-login
"""
serializer = PhoneLoginSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
phone = serializer.validated_data['phone']
# 获取或创建用户
user, created = User.objects.get_or_create(
phone=phone,
defaults={'nickname': f'用户{phone[-4:]}'}
)
if not user.is_active:
return error(code=101, message='账号已被禁用')
# 生成JWT Token带user_type='app'标识)
tokens = get_app_tokens(user)
return success(data={
'user': UserSerializer(user).data,
'token': tokens,
'is_new_user': created
}, message='登录成功')
@action(detail=False, methods=['post'], url_path='refresh')
def refresh_token(self, request):
"""
刷新Token
POST /api/v1/auth/refresh
"""
refresh_token = request.data.get('refresh')
if not refresh_token:
return error(message='refresh token不能为空')
try:
refresh = RefreshToken(refresh_token)
# 验证是否为app token
user_type = refresh.get('user_type', 'app')
if user_type not in ['app', None]:
return error(code=103, message='无效的用户Token')
return success(data={
'access': str(refresh.access_token),
'refresh': str(refresh),
})
except Exception as e:
return error(code=103, message='Token已过期或无效')
class UserViewSet(viewsets.ViewSet):
"""用户视图集 - App端"""
authentication_classes = [AppJWTAuthentication]
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['get'])
def me(self, request):
"""
获取当前用户信息
GET /api/v1/users/me
"""
return success(data=UserSerializer(request.user).data)
@action(detail=False, methods=['put'])
def update_me(self, request):
"""
更新当前用户信息
PUT /api/v1/users/update_me
"""
serializer = UpdateUserSerializer(request.user, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return success(data=UserSerializer(request.user).data, message='更新成功')
return error(message=str(serializer.errors))
class AdminUserManageViewSet(viewsets.ViewSet):
"""App用户管理视图集 - 管理端"""
authentication_classes = [AdminJWTAuthentication]
permission_classes = [IsAdminUser]
def list(self, request):
"""
用户列表
GET /api/admin/users
"""
queryset = User.objects.all().order_by('-created_at')
# 搜索条件
phone = request.query_params.get('phone')
nickname = request.query_params.get('nickname')
is_active = request.query_params.get('is_active')
if phone:
queryset = queryset.filter(phone__contains=phone)
if nickname:
queryset = queryset.filter(nickname__contains=nickname)
if is_active is not None:
queryset = queryset.filter(is_active=(is_active.lower() == 'true'))
# 标注统计数据
queryset = queryset.annotate(
spirit_count=Count('spirits', distinct=True),
device_count=Count('user_devices', distinct=True)
)
# 分页
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
start = (page - 1) * page_size
end = start + page_size
total = queryset.count()
users = queryset[start:end]
# 手动构造带统计的数据
items = []
for user in users:
data = UserSerializer(user).data
data['spirit_count'] = getattr(user, 'spirit_count', 0)
data['device_count'] = getattr(user, 'device_count', 0)
items.append(data)
return success(data={
'total': total,
'items': items
})
def retrieve(self, request, pk=None):
"""
用户详情
GET /api/admin/users/{id}
"""
try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
return error(message='用户不存在')
data = UserDetailSerializer(user).data
data['spirit_count'] = user.spirits.count()
data['device_count'] = user.user_devices.count()
return success(data=data)
@action(detail=True, methods=['post'], url_path='toggle-status')
def toggle_status(self, request, pk=None):
"""
启用/禁用用户
POST /api/admin/users/{id}/toggle-status
"""
try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
return error(message='用户不存在')
user.is_active = not user.is_active
user.save()
return success(
data=UserSerializer(user).data,
message='已启用' if user.is_active else '已禁用'
)