- 步骤1: MacAddressLoginView 增强,返回 code=4010(未绑定)/4011(不存在),支持自动激活 - 步骤2: 新增 bind_status 接口,设备端轮询查询绑定状态(无需认证) - 步骤3: 新增 register 设备自注册接口,首次开机自动注册(无需认证) - 步骤4: UserDeviceSerializer 增加 mac_address 字段 - 步骤5: WebSocket 新增 device_info 消息类型,支持设备状态上报和广播 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
823 lines
32 KiB
Python
823 lines
32 KiB
Python
from dj_rest_auth.registration.views import RegisterView
|
||
from rest_framework import viewsets
|
||
from .models import ParadiseUser, AffinityRule, AffinityLevel
|
||
from device_interaction.models import Device, UserDevice
|
||
from .serializers import ParadiseUserSerializer, CustomRegisterSerializer, UserInfoSerializer, ProfileUpdateSerializer
|
||
from rest_framework import viewsets, status
|
||
from django.contrib.auth.models import Group, Permission
|
||
from rest_framework.response import Response
|
||
from rest_framework.decorators import action, permission_classes, authentication_classes
|
||
from rest_framework.permissions import AllowAny, IsAuthenticated
|
||
from rest_framework.decorators import action
|
||
from django.conf import settings
|
||
from django.contrib.auth import authenticate
|
||
import random
|
||
from .utils import send_sms, generate_token, get_user_id_from_token
|
||
from django.core.cache import cache
|
||
from .authentication import RedisTokenAuthentication
|
||
from rest_framework.views import APIView
|
||
|
||
from django.conf import settings
|
||
from django.utils import timezone
|
||
from django.http import HttpResponseRedirect
|
||
from django.utils import translation
|
||
|
||
# 添加drf-yasg相关导入
|
||
from drf_yasg.utils import swagger_auto_schema
|
||
from drf_yasg import openapi
|
||
from rest_framework import serializers
|
||
|
||
# 引入标准化响应工具
|
||
from common.responses import success_response, error_response, created_response, api_response
|
||
from common.swagger_utils import get_standardized_response_schema, StandardizedResponseSchema
|
||
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 定义Swagger的请求和响应Schema
|
||
class RegisterRequestSchema(serializers.Serializer):
|
||
username = serializers.CharField(required=True, help_text="用户名")
|
||
password1 = serializers.CharField(required=True, help_text="密码")
|
||
password2 = serializers.CharField(required=True, help_text="确认密码")
|
||
email = serializers.EmailField(required=False, help_text="邮箱(可选)")
|
||
phone_number = serializers.CharField(required=False, help_text="手机号码(可选)")
|
||
|
||
class RegisterResponseSchema(serializers.Serializer):
|
||
status = serializers.CharField(help_text="操作状态")
|
||
code = serializers.IntegerField(help_text="状态码")
|
||
message = serializers.CharField(help_text="操作结果信息")
|
||
|
||
class PhoneLoginRequestSchema(serializers.Serializer):
|
||
phone_number = serializers.CharField(required=True, help_text="手机号码")
|
||
code = serializers.CharField(required=True, help_text="验证码")
|
||
|
||
class LoginResponseSchema(serializers.Serializer):
|
||
token = serializers.CharField(help_text="认证令牌")
|
||
|
||
class SendVerifyCodeRequestSchema(serializers.Serializer):
|
||
phone_number = serializers.CharField(required=True, help_text="手机号码")
|
||
|
||
class SendVerifyCodeResponseSchema(serializers.Serializer):
|
||
message = serializers.CharField(help_text="操作结果信息")
|
||
|
||
class UsernameLoginRequestSchema(serializers.Serializer):
|
||
username = serializers.CharField(required=True, help_text="用户名")
|
||
password = serializers.CharField(required=True, help_text="密码")
|
||
|
||
class EmailLoginRequestSchema(serializers.Serializer):
|
||
email = serializers.EmailField(required=True, help_text="邮箱")
|
||
password = serializers.CharField(required=True, help_text="密码")
|
||
|
||
class ErrorResponseSchema(serializers.Serializer):
|
||
error = serializers.CharField(help_text="错误信息")
|
||
|
||
class Meta:
|
||
ref_name = "UserErrorResponse"
|
||
|
||
class MacAddressLoginRequestSchema(serializers.Serializer):
|
||
mac_address = serializers.CharField(required=True, help_text="设备MAC地址")
|
||
|
||
class MacAddressLoginView(APIView):
|
||
"""
|
||
设备MAC地址登录接口
|
||
|
||
使用设备MAC地址进行登录。
|
||
---
|
||
请求参数:
|
||
- mac_address: 设备MAC地址
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=MacAddressLoginRequestSchema,
|
||
responses={
|
||
200: openapi.Response('登录成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema()),
|
||
404: openapi.Response('设备不存在或未绑定', get_standardized_response_schema())
|
||
},
|
||
operation_description="使用设备MAC地址进行登录,返回认证令牌"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
设备MAC地址登录
|
||
|
||
使用设备MAC地址进行登录,返回认证令牌。
|
||
"""
|
||
mac_address = request.data.get('mac_address')
|
||
|
||
if not mac_address:
|
||
logger.warning("Attempt to login without MAC address")
|
||
return error_response(message="MAC address is required")
|
||
|
||
logger.info(f"Attempting MAC address login for device: {mac_address}")
|
||
|
||
try:
|
||
device = Device.objects.get(mac_address=mac_address)
|
||
|
||
# 检查设备是否已绑定给用户
|
||
user_device = UserDevice.objects.filter(device=device).first()
|
||
if not user_device:
|
||
logger.warning(f"Device not bound to any user: {mac_address}")
|
||
# 返回特定 code=4010,让设备端可以识别"未绑定"状态
|
||
return api_response(
|
||
success=False,
|
||
message="Device is not bound to any user",
|
||
code=4010,
|
||
data={
|
||
'device_code': device.device_code,
|
||
'mac_address': mac_address,
|
||
'bound': False
|
||
}
|
||
)
|
||
|
||
# 如果设备未激活,在登录时自动激活
|
||
if not device.is_active:
|
||
device.is_active = True
|
||
device.activated_at = timezone.now()
|
||
device.save()
|
||
logger.info(f"Device auto-activated on login: {mac_address}")
|
||
|
||
# 生成绑定用户的 token
|
||
token = generate_token(user_device.user.id)
|
||
logger.info(f"Successfully logged in device with MAC: {mac_address}, bound to user: {user_device.user.id}")
|
||
|
||
return success_response(
|
||
data={
|
||
'token': token,
|
||
'user_id': user_device.user.id,
|
||
'device_code': device.device_code,
|
||
'mac_address': mac_address,
|
||
'bound': True
|
||
},
|
||
message="登录成功"
|
||
)
|
||
|
||
except Device.DoesNotExist:
|
||
logger.warning(f"Device not found: {mac_address}")
|
||
return api_response(
|
||
success=False,
|
||
message="Device not found",
|
||
code=4011,
|
||
data={
|
||
'mac_address': mac_address,
|
||
'registered': False
|
||
}
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"MAC address login failed: {str(e)}")
|
||
return error_response(message=f"Login failed: {str(e)}")
|
||
|
||
class CustomRegisterView(RegisterView):
|
||
"""
|
||
用户注册接口
|
||
|
||
使用用户名和密码创建新用户账号。
|
||
---
|
||
请求参数:
|
||
- username: 用户名
|
||
- password1: 密码
|
||
- password2: 确认密码
|
||
- email: 邮箱(可选)
|
||
- phone_number: 手机号码(可选)
|
||
"""
|
||
serializer_class = CustomRegisterSerializer
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=RegisterRequestSchema,
|
||
responses={
|
||
201: openapi.Response('注册成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema())
|
||
},
|
||
operation_description="通过用户名和密码创建新用户账号"
|
||
)
|
||
def create(self, request, *args, **kwargs):
|
||
"""
|
||
创建新用户
|
||
|
||
通过用户名和密码创建新用户账号。
|
||
"""
|
||
logger.info(f"Attempting to register new user with username: {request.data.get('username')}")
|
||
serializer = self.get_serializer(data=request.data)
|
||
|
||
if not serializer.is_valid():
|
||
logger.warning(f"Registration validation failed: {serializer.errors}")
|
||
return error_response(message="Registration validation failed", code=400)
|
||
|
||
headers = self.get_success_headers(serializer.data)
|
||
user = serializer.save(self.request)
|
||
logger.info(f"Successfully registered new user with ID: {user.id}")
|
||
|
||
return created_response(
|
||
message="User registered successfully",
|
||
code=201,
|
||
headers=headers
|
||
)
|
||
|
||
|
||
# 新增专门的手机登录视图类
|
||
class PhoneLoginView(APIView):
|
||
"""
|
||
手机验证码登录接口
|
||
|
||
使用手机号和验证码进行登录。
|
||
---
|
||
请求参数:
|
||
- phone_number: 手机号码
|
||
- code: 验证码
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=PhoneLoginRequestSchema,
|
||
responses={
|
||
200: openapi.Response('登录成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema())
|
||
},
|
||
operation_description="使用手机号和验证码进行登录,返回认证令牌"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
手机验证码登录
|
||
|
||
使用手机号和验证码进行登录,返回认证令牌。
|
||
"""
|
||
phone_number = request.data.get('phone_number')
|
||
code = request.data.get('code')
|
||
|
||
if not phone_number or not code:
|
||
logger.warning("Attempt to verify code login without phone number or code")
|
||
return error_response(message="Phone number and code are required")
|
||
|
||
logger.info(f"Attempting verification code login for phone number: {phone_number}")
|
||
cached_code = cache.get(phone_number)
|
||
|
||
if cached_code and cached_code == code:
|
||
cache.delete(phone_number) # 验证成功后删除验证码
|
||
user, created = ParadiseUser.objects.get_or_create(
|
||
phone_number=phone_number,
|
||
defaults={'username': phone_number}
|
||
)
|
||
if created:
|
||
logger.info(f"Created new user with phone number: {phone_number}")
|
||
user.set_unusable_password() # 防止未设置密码的用户被黑客利用
|
||
user.save()
|
||
token = generate_token(user.id)
|
||
logger.info(f"Successfully logged in user with phone number: {phone_number}")
|
||
return success_response(data={'token': token}, message="登录成功")
|
||
|
||
logger.warning(f"Failed verification code login attempt for phone number: {phone_number}")
|
||
return error_response(message="Invalid or expired code")
|
||
|
||
|
||
# 新增发送验证码视图类
|
||
class SendVerifyCodeView(APIView):
|
||
"""
|
||
发送手机验证码接口
|
||
|
||
向指定手机号发送验证码。
|
||
---
|
||
请求参数:
|
||
- phone_number: 手机号码
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=SendVerifyCodeRequestSchema,
|
||
responses={
|
||
200: openapi.Response('发送成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema())
|
||
},
|
||
operation_description="向指定手机号发送6位数验证码,有效期10分钟"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
发送手机验证码
|
||
|
||
向指定手机号发送6位数验证码,有效期10分钟。
|
||
"""
|
||
phone_number = request.data.get('phone_number')
|
||
if not phone_number:
|
||
logger.warning("Attempt to send verification code without phone number")
|
||
return error_response(message="Phone number is required")
|
||
|
||
logger.info(f"Generating verification code for phone number: {phone_number}")
|
||
code = str(random.randint(100000, 999999))
|
||
response = send_sms(phone_number, code)
|
||
|
||
if response and 'Code' in response.decode('utf-8') and 'OK' in response.decode('utf-8'):
|
||
cache.set(phone_number, code, timeout=600) # 存储验证码,超时时间为10分钟
|
||
logger.info(f"Successfully sent verification code to {phone_number}")
|
||
return success_response(message="Verification code sent")
|
||
|
||
logger.error(f"Failed to send verification code to {phone_number}")
|
||
return error_response(message="Failed to send verification code")
|
||
|
||
|
||
# 新增邮箱登录视图类
|
||
class EmailLoginView(APIView):
|
||
"""
|
||
邮箱登录接口
|
||
|
||
使用邮箱和密码进行登录。
|
||
---
|
||
请求参数:
|
||
- email: 邮箱
|
||
- password: 密码
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=EmailLoginRequestSchema,
|
||
responses={
|
||
200: openapi.Response('登录成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema())
|
||
},
|
||
operation_description="使用邮箱和密码进行登录,返回认证令牌"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
邮箱登录
|
||
|
||
使用邮箱和密码进行登录,返回认证令牌。
|
||
"""
|
||
email = request.data.get('email')
|
||
password = request.data.get('password')
|
||
|
||
if not email or not password:
|
||
logger.warning("Attempt to login without email or password")
|
||
return error_response(message="Email and password are required")
|
||
|
||
# 检查缓存中是否有该用户邮箱的认证结果
|
||
cache_key = f"auth:email:{email}"
|
||
cached_auth = cache.get(cache_key)
|
||
|
||
if cached_auth:
|
||
if cached_auth == password:
|
||
try:
|
||
user = ParadiseUser.objects.get(email=email)
|
||
token = generate_token(user.id)
|
||
logger.info(f"User logged in via cached email credentials: {email}")
|
||
return success_response(data={'token': token}, message="登录成功")
|
||
except ParadiseUser.DoesNotExist:
|
||
logger.warning(f"Login attempt with cached credentials but user not found: {email}")
|
||
return error_response(message="Invalid email or password")
|
||
else:
|
||
return error_response(message="Invalid email or password")
|
||
|
||
try:
|
||
user = ParadiseUser.objects.get(email=email)
|
||
if user.check_password(password):
|
||
cache.set(cache_key, password, timeout=3600) # 缓存认证结果,有效期1小时
|
||
token = generate_token(user.id)
|
||
logger.info(f"User logged in with email: {email}")
|
||
return success_response(data={'token': token}, message="登录成功")
|
||
except ParadiseUser.DoesNotExist:
|
||
pass
|
||
|
||
logger.warning(f"Failed login attempt with email: {email}")
|
||
return error_response(message="Invalid email or password")
|
||
|
||
|
||
# 新增用户名登录视图类
|
||
class UsernameLoginView(APIView):
|
||
"""
|
||
用户名登录接口
|
||
|
||
使用用户名和密码进行登录。
|
||
---
|
||
请求参数:
|
||
- username: 用户名
|
||
- password: 密码
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['用户认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=UsernameLoginRequestSchema,
|
||
responses={
|
||
200: openapi.Response('登录成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema())
|
||
},
|
||
operation_description="使用用户名和密码进行登录,返回认证令牌"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
用户名登录
|
||
|
||
使用用户名和密码进行登录,返回认证令牌。
|
||
"""
|
||
username = request.data.get('username')
|
||
password = request.data.get('password')
|
||
|
||
if not username or not password:
|
||
return error_response(message="Username and password are required")
|
||
|
||
# 检查缓存中是否有该用户的认证结果
|
||
cache_key = f"auth:{username}"
|
||
cached_auth = cache.get(cache_key)
|
||
|
||
if cached_auth:
|
||
if cached_auth == password:
|
||
user = ParadiseUser.objects.get(username=username)
|
||
token = generate_token(user.id)
|
||
return success_response(data={'token': token}, message="登录成功")
|
||
else:
|
||
return error_response(message="Invalid username or password")
|
||
|
||
user = authenticate(request, username=username, password=password)
|
||
|
||
if user is not None:
|
||
# 缓存认证结果,有效期1小时
|
||
cache.set(cache_key, password, timeout=3600)
|
||
token = generate_token(user.id)
|
||
return success_response(data={'token': token}, message="登录成功")
|
||
else:
|
||
return error_response(message="Invalid username or password")
|
||
|
||
|
||
class UserInfoResponseSchema(serializers.Serializer):
|
||
status = serializers.CharField(help_text="操作状态")
|
||
code = serializers.IntegerField(help_text="状态码")
|
||
data = UserInfoSerializer(help_text="用户信息")
|
||
|
||
class ProfileUpdateRequestSchema(serializers.Serializer):
|
||
gender = serializers.ChoiceField(choices=['M', 'F', 'O'], required=False, help_text="性别")
|
||
resident_city = serializers.CharField(required=False, help_text="常驻城市")
|
||
birthday = serializers.DateField(required=False, help_text="生日")
|
||
zodiac_sign = serializers.CharField(required=False, help_text="星座")
|
||
mbti = serializers.ChoiceField(
|
||
choices=[
|
||
'INTJ', 'INTP', 'ENTJ', 'ENTP',
|
||
'INFJ', 'INFP', 'ENFJ', 'ENFP',
|
||
'ISTJ', 'ISFJ', 'ESTJ', 'ESFJ',
|
||
'ISTP', 'ISFP', 'ESTP', 'ESFP'
|
||
],
|
||
required=False,
|
||
help_text="MBTI性格类型"
|
||
)
|
||
interests = serializers.CharField(required=False, help_text="兴趣爱好")
|
||
social_identity = serializers.CharField(required=False, help_text="社会身份")
|
||
|
||
class BindPhoneRequestSchema(serializers.Serializer):
|
||
phone_number = serializers.CharField(required=True, help_text="手机号码")
|
||
code = serializers.CharField(required=True, help_text="验证码")
|
||
|
||
|
||
class GroupSerializer(serializers.ModelSerializer):
|
||
user_count = serializers.SerializerMethodField()
|
||
permissions = serializers.SerializerMethodField()
|
||
|
||
class Meta:
|
||
model = Group
|
||
fields = ['id', 'name', 'user_count', 'permissions']
|
||
|
||
def get_user_count(self, obj):
|
||
return obj.paradiseuser_set.count()
|
||
|
||
def get_permissions(self, obj):
|
||
return list(obj.permissions.values_list('codename', flat=True))
|
||
|
||
|
||
class GroupViewSet(viewsets.ModelViewSet):
|
||
"""
|
||
角色(用户组)管理接口
|
||
仅管理员可访问
|
||
"""
|
||
queryset = Group.objects.all()
|
||
serializer_class = GroupSerializer
|
||
permission_classes = [IsAuthenticated]
|
||
authentication_classes = [RedisTokenAuthentication]
|
||
|
||
def get_permissions(self):
|
||
if self.action in ['list', 'retrieve']:
|
||
return [IsAuthenticated()]
|
||
return [IsAuthenticated()]
|
||
|
||
def get_queryset(self):
|
||
if not self.request.user.is_staff:
|
||
return Group.objects.none()
|
||
return Group.objects.all()
|
||
|
||
|
||
class AffinityRuleSerializer(serializers.ModelSerializer):
|
||
class Meta:
|
||
model = AffinityRule
|
||
fields = ['id', 'name', 'description', 'points', 'daily_limit', 'is_active', 'created_at', 'updated_at']
|
||
|
||
|
||
class AffinityLevelSerializer(serializers.ModelSerializer):
|
||
class Meta:
|
||
model = AffinityLevel
|
||
fields = ['id', 'level', 'name', 'description', 'required_points', 'rewards', 'created_at', 'updated_at']
|
||
|
||
|
||
class AffinityRuleViewSet(viewsets.ModelViewSet):
|
||
"""
|
||
好感度规则管理接口
|
||
仅管理员可写,认证用户可读
|
||
"""
|
||
queryset = AffinityRule.objects.all()
|
||
serializer_class = AffinityRuleSerializer
|
||
permission_classes = [IsAuthenticated]
|
||
authentication_classes = [RedisTokenAuthentication]
|
||
|
||
def get_permissions(self):
|
||
if self.action in ['list', 'retrieve']:
|
||
return [IsAuthenticated()]
|
||
if not self.request.user.is_staff:
|
||
from rest_framework.exceptions import PermissionDenied
|
||
raise PermissionDenied()
|
||
return [IsAuthenticated()]
|
||
|
||
|
||
class AffinityLevelViewSet(viewsets.ModelViewSet):
|
||
"""
|
||
好感度等级管理接口
|
||
仅管理员可写,认证用户可读
|
||
"""
|
||
queryset = AffinityLevel.objects.all()
|
||
serializer_class = AffinityLevelSerializer
|
||
permission_classes = [IsAuthenticated]
|
||
authentication_classes = [RedisTokenAuthentication]
|
||
|
||
def get_permissions(self):
|
||
if self.action in ['list', 'retrieve']:
|
||
return [IsAuthenticated()]
|
||
if not self.request.user.is_staff:
|
||
from rest_framework.exceptions import PermissionDenied
|
||
raise PermissionDenied()
|
||
return [IsAuthenticated()]
|
||
|
||
|
||
class ParadiseUserViewSet(viewsets.ModelViewSet):
|
||
"""
|
||
用户管理接口
|
||
|
||
提供用户信息的管理功能,包括获取用户详情等。
|
||
"""
|
||
queryset = ParadiseUser.objects.all()
|
||
serializer_class = ParadiseUserSerializer
|
||
authentication_classes = [RedisTokenAuthentication]
|
||
permission_classes = [IsAuthenticated]
|
||
tags = ['用户管理']
|
||
|
||
def get_permissions(self):
|
||
"""根据不同的action设置不同的权限"""
|
||
if self.action == 'create':
|
||
return [AllowAny()]
|
||
return [IsAuthenticated()]
|
||
|
||
@swagger_auto_schema(
|
||
responses={
|
||
200: openapi.Response('获取成功', get_standardized_response_schema()),
|
||
401: openapi.Response('认证失败', get_standardized_response_schema())
|
||
},
|
||
operation_description="返回当前登录用户的详细信息,不包含敏感字段如密码",
|
||
security=[{'Bearer': []}]
|
||
)
|
||
@action(detail=False, methods=['get'])
|
||
def info(self, request):
|
||
"""
|
||
获取当前用户信息
|
||
|
||
返回当前登录用户的详细信息,不包含敏感字段如密码。
|
||
"""
|
||
user = request.user
|
||
serializer = UserInfoSerializer(user)
|
||
logger.info(f"User {user.id} retrieved their profile info")
|
||
return success_response(data=serializer.data)
|
||
|
||
@swagger_auto_schema(
|
||
methods=['put', 'patch'],
|
||
request_body=ProfileUpdateRequestSchema,
|
||
responses={
|
||
200: openapi.Response('更新成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema()),
|
||
401: openapi.Response('认证失败', get_standardized_response_schema())
|
||
},
|
||
operation_description="更新当前登录用户的个人资料信息",
|
||
security=[{'Bearer': []}]
|
||
)
|
||
@action(detail=False, methods=['put', 'patch'])
|
||
def update_profile(self, request):
|
||
"""
|
||
更新用户个人资料
|
||
|
||
允许用户更新自己的个人资料信息,包括性别、常驻城市、生日等。
|
||
"""
|
||
user = request.user
|
||
serializer = ProfileUpdateSerializer(user, data=request.data, partial=True)
|
||
|
||
if serializer.is_valid():
|
||
serializer.save()
|
||
logger.info(f"User {user.id} updated their profile")
|
||
return success_response(
|
||
data=serializer.data,
|
||
message="个人资料已更新"
|
||
)
|
||
else:
|
||
logger.warning(f"User {user.id} profile update failed: {serializer.errors}")
|
||
return error_response(
|
||
message="个人资料更新失败",
|
||
data=serializer.errors,
|
||
code=400
|
||
)
|
||
|
||
@swagger_auto_schema(
|
||
methods=['post'],
|
||
request_body=BindPhoneRequestSchema,
|
||
responses={
|
||
200: openapi.Response('绑定成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema()),
|
||
401: openapi.Response('认证失败', get_standardized_response_schema()),
|
||
409: openapi.Response('手机号已被使用', get_standardized_response_schema())
|
||
},
|
||
operation_description="为当前用户绑定或更新手机号,需要通过验证码验证",
|
||
security=[{'Bearer': []}]
|
||
)
|
||
@action(detail=False, methods=['post'])
|
||
def bind_phone(self, request):
|
||
"""
|
||
绑定或修改手机号
|
||
|
||
允许用户绑定新手机号或修改已有手机号,需通过验证码验证。
|
||
"""
|
||
user = request.user
|
||
phone_number = request.data.get('phone_number')
|
||
code = request.data.get('code')
|
||
|
||
if not phone_number or not code:
|
||
logger.warning(f"User {user.id} attempted to bind phone without providing phone number or code")
|
||
return error_response(message="手机号和验证码不能为空", code=400)
|
||
|
||
# 验证手机号格式(简单验证)
|
||
if not phone_number.isdigit() or len(phone_number) < 5 or len(phone_number) > 20:
|
||
logger.warning(f"User {user.id} attempted to bind invalid phone number format: {phone_number}")
|
||
return error_response(message="手机号格式不正确", code=400)
|
||
|
||
# 检查手机号是否被其他用户使用
|
||
if ParadiseUser.objects.filter(phone_number=phone_number).exclude(id=user.id).exists():
|
||
logger.warning(f"User {user.id} attempted to bind phone number already in use: {phone_number}")
|
||
return error_response(message="该手机号已被其他账号使用", code=409)
|
||
|
||
# 验证验证码
|
||
cached_code = cache.get(phone_number)
|
||
if not cached_code or cached_code != code:
|
||
logger.warning(f"User {user.id} provided invalid verification code for phone: {phone_number}")
|
||
return error_response(message="验证码无效或已过期", code=400)
|
||
|
||
# 验证通过,更新手机号
|
||
old_phone = user.phone_number
|
||
user.phone_number = phone_number
|
||
user.save()
|
||
|
||
# 删除缓存中的验证码
|
||
cache.delete(phone_number)
|
||
|
||
if old_phone:
|
||
logger.info(f"User {user.id} changed phone number from {old_phone} to {phone_number}")
|
||
return success_response(message="手机号已更新")
|
||
else:
|
||
logger.info(f"User {user.id} bound new phone number: {phone_number}")
|
||
return success_response(message="手机号已绑定")
|
||
|
||
def custom_set_language(request):
|
||
lang_code = request.GET.get('language', None)
|
||
next_url = request.GET.get('next', '/')
|
||
|
||
if lang_code and lang_code in dict(settings.LANGUAGES):
|
||
logger.info(f"Setting language to: {lang_code}")
|
||
# 设置语言
|
||
translation.activate(lang_code)
|
||
response = HttpResponseRedirect(next_url)
|
||
response.set_cookie(settings.LANGUAGE_COOKIE_NAME, lang_code)
|
||
return response
|
||
|
||
logger.warning(f"Attempt to set invalid language code: {lang_code}")
|
||
return HttpResponseRedirect(next_url)
|
||
|
||
class AdminEmailLoginRequestSchema(serializers.Serializer):
|
||
email = serializers.EmailField(required=True, help_text="管理员邮箱")
|
||
password = serializers.CharField(required=True, help_text="密码")
|
||
|
||
class AdminEmailLoginView(APIView):
|
||
"""
|
||
管理员邮箱登录接口
|
||
|
||
专用于管理员通过邮箱和密码进行登录,拒绝普通用户登录。
|
||
---
|
||
请求参数:
|
||
- email: 管理员邮箱
|
||
- password: 密码
|
||
"""
|
||
permission_classes = [AllowAny]
|
||
tags = ['管理员认证']
|
||
|
||
@swagger_auto_schema(
|
||
request_body=AdminEmailLoginRequestSchema,
|
||
responses={
|
||
200: openapi.Response('登录成功', get_standardized_response_schema()),
|
||
400: openapi.Response('请求参数错误', get_standardized_response_schema()),
|
||
403: openapi.Response('权限不足', get_standardized_response_schema())
|
||
},
|
||
operation_description="专用于管理员通过邮箱和密码登录,拒绝普通用户登录,返回认证令牌"
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
管理员邮箱登录
|
||
|
||
使用邮箱和密码进行管理员登录,返回认证令牌。普通用户无法通过此接口登录。
|
||
"""
|
||
email = request.data.get('email')
|
||
password = request.data.get('password')
|
||
|
||
if not email or not password:
|
||
logger.warning("Attempt to admin login without email or password")
|
||
return error_response(message="Email and password are required")
|
||
|
||
try:
|
||
user = ParadiseUser.objects.get(email=email)
|
||
|
||
# 验证用户是否是管理员
|
||
if not user.is_staff:
|
||
logger.warning(f"Non-admin user attempted to login via admin endpoint: {email}")
|
||
return error_response(
|
||
message="Access denied. Admin privileges required.",
|
||
code=403,
|
||
status_code=status.HTTP_403_FORBIDDEN
|
||
)
|
||
|
||
if user.check_password(password):
|
||
# 使用is_admin=True生成管理员专用token
|
||
token = generate_token(user.id, is_admin=True)
|
||
logger.info(f"Admin logged in with email: {email}")
|
||
|
||
# 获取用户角色名称(取第一个分组名称)
|
||
role_name = "超级管理员" if user.is_superuser else None
|
||
if not role_name:
|
||
group = user.groups.first()
|
||
role_name = group.name if group else "管理员"
|
||
|
||
return success_response(data={
|
||
'token': token,
|
||
'is_superuser': user.is_superuser,
|
||
'role': role_name,
|
||
}, message="管理员登录成功")
|
||
except ParadiseUser.DoesNotExist:
|
||
pass
|
||
|
||
logger.warning(f"Failed admin login attempt with email: {email}")
|
||
return error_response(message="Invalid email or password")
|
||
|
||
class AdminLogoutView(APIView):
|
||
"""
|
||
管理员登出接口
|
||
|
||
使管理员的认证令牌失效。
|
||
"""
|
||
authentication_classes = [RedisTokenAuthentication]
|
||
permission_classes = [IsAuthenticated]
|
||
tags = ['管理员认证']
|
||
|
||
@swagger_auto_schema(
|
||
responses={
|
||
200: openapi.Response('登出成功', get_standardized_response_schema()),
|
||
401: openapi.Response('认证失败', get_standardized_response_schema()),
|
||
403: openapi.Response('权限不足', get_standardized_response_schema())
|
||
},
|
||
operation_description="使当前管理员的认证令牌失效,需要在请求头中提供有效的管理员token",
|
||
security=[{'Bearer': []}]
|
||
)
|
||
def post(self, request):
|
||
"""
|
||
管理员登出
|
||
|
||
使当前管理员的认证令牌失效。
|
||
"""
|
||
user = request.user
|
||
|
||
# 验证用户是否是管理员
|
||
if not user.is_staff:
|
||
logger.warning(f"Non-admin user attempted to use admin logout endpoint: {user.id}")
|
||
return error_response(
|
||
message="Access denied. Admin privileges required.",
|
||
code=403,
|
||
status_code=status.HTTP_403_FORBIDDEN
|
||
)
|
||
|
||
# 获取请求头中的token
|
||
auth_header = request.headers.get('Authorization', '')
|
||
if ' ' in auth_header:
|
||
_, token = auth_header.split(' ', 1)
|
||
# 删除Redis中的token
|
||
cache.delete(f"admin_token:{token}")
|
||
logger.info(f"Admin {user.id} logged out successfully")
|
||
return success_response(message="管理员已成功登出")
|
||
|
||
return error_response(message="Invalid authorization header")
|