repair-agent 88b8f023f4
Some checks failed
Build and Deploy Backend / build-and-deploy (push) Failing after 1m36s
Fix app api
2026-02-09 15:35:33 +08:00

339 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
用户模块视图 - App端用户
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework_simplejwt.tokens import RefreshToken
from django.db.models import Count
from drf_spectacular.utils import extend_schema
from django.utils import timezone
from utils.response import success, error
from utils.exceptions import ErrorCode
from utils.sms import send_sms_code, verify_sms_code
from utils.oss import get_oss_client
from apps.admins.authentication import AppJWTAuthentication, AdminJWTAuthentication
from apps.admins.permissions import IsAdminUser
from .models import User
from .serializers import (
UserSerializer,
UserDetailSerializer,
PhoneLoginSerializer,
UpdateUserSerializer,
SendCodeSerializer,
CodeLoginSerializer,
)
def get_app_tokens(user):
"""
为App用户生成JWT Token
在token中添加 user_type='app' 以区分管理员
"""
refresh = RefreshToken.for_user(user)
# 添加自定义声明
refresh['user_type'] = 'app'
refresh['phone'] = user.phone
return {
'access': str(refresh.access_token),
'refresh': str(refresh),
}
@extend_schema(tags=['认证'])
class AuthViewSet(viewsets.ViewSet):
"""认证视图集 - App端"""
permission_classes = [AllowAny]
@action(detail=False, methods=['post'], url_path='phone-login')
def phone_login(self, request):
"""
手机号一键登录
POST /api/v1/auth/phone-login
"""
serializer = PhoneLoginSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
phone = serializer.validated_data['phone']
# 获取或创建用户
user, created = User.objects.get_or_create(
phone=phone,
defaults={'nickname': f'用户{phone[-4:]}'}
)
if not user.is_active:
return error(code=101, message='账号已被禁用')
# 生成JWT Token带user_type='app'标识)
tokens = get_app_tokens(user)
return success(data={
'user': UserSerializer(user).data,
'token': tokens,
'is_new_user': created
}, message='登录成功')
@action(detail=False, methods=['post'], url_path='send-code')
def send_code(self, request):
"""
发送验证码
POST /api/v1/auth/send-code/
"""
serializer = SendCodeSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
phone = serializer.validated_data['phone']
ok, err = send_sms_code(phone)
if not ok:
return error(code=ErrorCode.SMS_CODE_SEND_LIMIT, message=err)
return success(
data={'expire_in': 60},
message='验证码已发送'
)
@action(detail=False, methods=['post'], url_path='code-login')
def code_login(self, request):
"""
验证码登录
POST /api/v1/auth/code-login/
"""
serializer = CodeLoginSerializer(data=request.data)
if not serializer.is_valid():
return error(message=str(serializer.errors))
phone = serializer.validated_data['phone']
code = serializer.validated_data['code']
# 校验验证码
valid, err = verify_sms_code(phone, code)
if not valid:
return error(code=ErrorCode.SMS_CODE_INVALID, message=err)
# 获取或创建用户
user, created = User.objects.get_or_create(
phone=phone,
defaults={'nickname': f'用户{phone[-4:]}'}
)
if not user.is_active:
return error(code=ErrorCode.USER_DISABLED, message='账号已被禁用')
tokens = get_app_tokens(user)
return success(data={
'user': UserSerializer(user).data,
'token': tokens,
'is_new_user': created
}, message='登录成功')
@action(detail=False, methods=['post'], url_path='logout',
authentication_classes=[AppJWTAuthentication],
permission_classes=[IsAuthenticated])
def logout(self, request):
"""
退出登录
POST /api/v1/auth/logout/
"""
try:
refresh_token = request.data.get('refresh')
if refresh_token:
token = RefreshToken(refresh_token)
token.blacklist()
except Exception:
pass
return success(message='已退出登录')
@action(detail=False, methods=['delete'], url_path='account',
authentication_classes=[AppJWTAuthentication],
permission_classes=[IsAuthenticated])
def delete_account(self, request):
"""
账号注销
DELETE /api/v1/auth/account/
"""
user = request.user
user.is_pending_deletion = True
user.deletion_requested_at = timezone.now()
user.save(update_fields=['is_pending_deletion', 'deletion_requested_at'])
return success(message='账号注销申请已提交将在7个工作日内处理')
@action(detail=False, methods=['post'], url_path='refresh')
def refresh_token(self, request):
"""
刷新Token
POST /api/v1/auth/refresh
"""
refresh_token = request.data.get('refresh')
if not refresh_token:
return error(message='refresh token不能为空')
try:
refresh = RefreshToken(refresh_token)
# 验证是否为app token
user_type = refresh.get('user_type', 'app')
if user_type not in ['app', None]:
return error(code=103, message='无效的用户Token')
return success(data={
'access': str(refresh.access_token),
'refresh': str(refresh),
})
except Exception as e:
return error(code=103, message='Token已过期或无效')
@extend_schema(tags=['用户'])
class UserViewSet(viewsets.ViewSet):
"""用户视图集 - App端"""
authentication_classes = [AppJWTAuthentication]
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['get'])
def me(self, request):
"""
获取当前用户信息
GET /api/v1/users/me
"""
return success(data=UserSerializer(request.user).data)
@action(detail=False, methods=['put'])
def update_me(self, request):
"""
更新当前用户信息
PUT /api/v1/users/update_me
"""
serializer = UpdateUserSerializer(request.user, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return success(data=UserSerializer(request.user).data, message='更新成功')
return error(message=str(serializer.errors))
@action(detail=False, methods=['post'])
def avatar(self, request):
"""
上传头像
POST /api/v1/users/avatar/
"""
file = request.FILES.get('file')
if not file:
return error(message='请选择文件')
# 验证文件类型
allowed_types = ['image/jpeg', 'image/png', 'image/gif']
if file.content_type not in allowed_types:
return error(message='仅支持 JPG、PNG、GIF 格式')
# 验证文件大小 (5MB)
if file.size > 5 * 1024 * 1024:
return error(message='文件大小不能超过 5MB')
try:
oss_client = get_oss_client()
avatar_url = oss_client.upload_file(file, folder='avatars')
except Exception as e:
return error(message=f'上传失败: {str(e)}')
request.user.avatar = avatar_url
request.user.save(update_fields=['avatar'])
return success(data={'avatar_url': avatar_url})
@extend_schema(tags=['管理员-用户管理'])
class AdminUserManageViewSet(viewsets.ViewSet):
"""App用户管理视图集 - 管理端"""
authentication_classes = [AdminJWTAuthentication]
permission_classes = [IsAdminUser]
def list(self, request):
"""
用户列表
GET /api/admin/users
"""
queryset = User.objects.all().order_by('-created_at')
# 搜索条件
phone = request.query_params.get('phone')
nickname = request.query_params.get('nickname')
is_active = request.query_params.get('is_active')
if phone:
queryset = queryset.filter(phone__contains=phone)
if nickname:
queryset = queryset.filter(nickname__contains=nickname)
if is_active is not None:
queryset = queryset.filter(is_active=(is_active.lower() == 'true'))
# 标注统计数据
queryset = queryset.annotate(
spirit_count=Count('spirits', distinct=True),
device_count=Count('user_devices', distinct=True)
)
# 分页
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 10))
start = (page - 1) * page_size
end = start + page_size
total = queryset.count()
users = queryset[start:end]
# 手动构造带统计的数据
items = []
for user in users:
data = UserSerializer(user).data
data['spirit_count'] = getattr(user, 'spirit_count', 0)
data['device_count'] = getattr(user, 'device_count', 0)
items.append(data)
return success(data={
'total': total,
'items': items
})
def retrieve(self, request, pk=None):
"""
用户详情
GET /api/admin/users/{id}
"""
try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
return error(message='用户不存在')
data = UserDetailSerializer(user).data
data['spirit_count'] = user.spirits.count()
data['device_count'] = user.user_devices.count()
return success(data=data)
@action(detail=True, methods=['post'], url_path='toggle-status')
def toggle_status(self, request, pk=None):
"""
启用/禁用用户
POST /api/admin/users/{id}/toggle-status
"""
try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
return error(message='用户不存在')
user.is_active = not user.is_active
user.save()
return success(
data=UserSerializer(user).data,
message='已启用' if user.is_active else '已禁用'
)