"""Authentication, registration and profile views for Paradise users.

Provides token-issuing login endpoints (device MAC address, phone
verification code, email/password, username/password, admin email), the
SMS verification-code sender, the user management viewset and a language
switching helper.
"""

import logging
import random
import secrets

from django.conf import settings
from django.contrib.auth import authenticate
from django.core.cache import cache
from django.http import HttpResponseRedirect
from django.utils import translation
from django.utils.http import url_has_allowed_host_and_scheme
from dj_rest_auth.registration.views import RegisterView
# drf-yasg (Swagger) imports
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import serializers, status, viewsets
from rest_framework.decorators import (
    action,
    authentication_classes,
    permission_classes,
)
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

# Standardized response helpers
from common.responses import created_response, error_response, success_response
from common.swagger_utils import (
    StandardizedResponseSchema,
    get_standardized_response_schema,
)
from device_interaction.models import Device, UserDevice

from .authentication import RedisTokenAuthentication
from .models import ParadiseUser
from .serializers import (
    CustomRegisterSerializer,
    ParadiseUserSerializer,
    ProfileUpdateSerializer,
    UserInfoSerializer,
)
from .utils import generate_token, get_user_id_from_token, send_sms

logger = logging.getLogger(__name__)


def _codes_match(expected, supplied):
    """Return True when ``supplied`` equals the cached verification code.

    Both values are normalised to strings first, because a JSON client may
    send the code as a number while the cache stores it as a string. The
    comparison itself is constant-time.
    """
    if not expected or supplied is None:
        return False
    return secrets.compare_digest(
        str(expected).encode('utf-8'),
        str(supplied).encode('utf-8'),
    )


# ---------------------------------------------------------------------------
# Request / response schemas used only for Swagger documentation
# ---------------------------------------------------------------------------

class RegisterRequestSchema(serializers.Serializer):
    username = serializers.CharField(required=True, help_text="用户名")
    password1 = serializers.CharField(required=True, help_text="密码")
    password2 = serializers.CharField(required=True, help_text="确认密码")
    email = serializers.EmailField(required=False, help_text="邮箱(可选)")
    phone_number = serializers.CharField(required=False, help_text="手机号码(可选)")


class RegisterResponseSchema(serializers.Serializer):
    status = serializers.CharField(help_text="操作状态")
    code = serializers.IntegerField(help_text="状态码")
    message = serializers.CharField(help_text="操作结果信息")


class PhoneLoginRequestSchema(serializers.Serializer):
    phone_number = serializers.CharField(required=True, help_text="手机号码")
    code = serializers.CharField(required=True, help_text="验证码")


class LoginResponseSchema(serializers.Serializer):
    token = serializers.CharField(help_text="认证令牌")


class SendVerifyCodeRequestSchema(serializers.Serializer):
    phone_number = serializers.CharField(required=True, help_text="手机号码")


class SendVerifyCodeResponseSchema(serializers.Serializer):
    message = serializers.CharField(help_text="操作结果信息")


class UsernameLoginRequestSchema(serializers.Serializer):
    username = serializers.CharField(required=True, help_text="用户名")
    password = serializers.CharField(required=True, help_text="密码")


class EmailLoginRequestSchema(serializers.Serializer):
    email = serializers.EmailField(required=True, help_text="邮箱")
    password = serializers.CharField(required=True, help_text="密码")


class ErrorResponseSchema(serializers.Serializer):
    error = serializers.CharField(help_text="错误信息")

    class Meta:
        ref_name = "UserErrorResponse"


class MacAddressLoginRequestSchema(serializers.Serializer):
    mac_address = serializers.CharField(required=True, help_text="设备MAC地址")


# ---------------------------------------------------------------------------
# Views
# ---------------------------------------------------------------------------

class MacAddressLoginView(APIView):
    """
    Device MAC address login endpoint.

    Logs in using a device MAC address.
    ---
    Request parameters:
    - mac_address: MAC address of the device
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=MacAddressLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            404: openapi.Response('设备不存在或未绑定', get_standardized_response_schema())
        },
        operation_description="使用设备MAC地址进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with a device MAC address.

        Looks up the device, requires it to be active and bound to a user,
        and returns a token generated for that user.
        """
        mac_address = request.data.get('mac_address')
        if not mac_address:
            logger.warning("Attempt to login without MAC address")
            return error_response(message="MAC address is required")

        logger.info("Attempting MAC address login for device: %s", mac_address)
        try:
            # Look up the device
            device = Device.objects.get(mac_address=mac_address)

            # The device must be activated
            if not device.is_active:
                logger.warning("Device not active: %s", mac_address)
                return error_response(message="Device is not active")

            # The device must be bound to a user
            user_device = UserDevice.objects.filter(device=device).first()
            if not user_device:
                logger.warning("Device not bound to any user: %s", mac_address)
                return error_response(message="Device is not bound to any user")

            # Issue the authentication token for the bound user
            token = generate_token(user_device.user.id)
            logger.info("Successfully logged in device with MAC: %s", mac_address)
            return success_response(
                data={'token': token},
                message="登录成功"
            )
        except Device.DoesNotExist:
            logger.warning("Device not found: %s", mac_address)
            return error_response(message="Device not found")
        except Exception as exc:
            # Boundary handler: keep the full details (with traceback) in the
            # server log, but never echo internal exception text back to an
            # unauthenticated caller.
            logger.exception("MAC address login failed: %s", exc)
            return error_response(message="Login failed")


class CustomRegisterView(RegisterView):
    """
    User registration endpoint.

    Creates a new user account from a username and password.
    ---
    Request parameters:
    - username: user name
    - password1: password
    - password2: password confirmation
    - email: email address (optional)
    - phone_number: phone number (optional)
    """
    serializer_class = CustomRegisterSerializer
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=RegisterRequestSchema,
        responses={
            201: openapi.Response('注册成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="通过用户名和密码创建新用户账号"
    )
    def create(self, request, *args, **kwargs):
        """
        Create a new user.

        Validates the registration payload, saves the user and returns a
        standardized "created" response. On validation failure the field
        errors are returned in the response data.
        """
        logger.info(
            "Attempting to register new user with username: %s",
            request.data.get('username'),
        )

        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            logger.warning("Registration validation failed: %s", serializer.errors)
            # Return the field errors so the client can tell what to fix
            # (consistent with ParadiseUserViewSet.update_profile).
            return error_response(
                message="Registration validation failed",
                data=serializer.errors,
                code=400
            )

        user = serializer.save(self.request)
        # Read serializer.data only after the user has been saved.
        headers = self.get_success_headers(serializer.data)
        logger.info("Successfully registered new user with ID: %s", user.id)

        return created_response(
            message="User registered successfully",
            code=201,
            headers=headers
        )


# Dedicated phone login view
class PhoneLoginView(APIView):
    """
    Phone verification-code login endpoint.

    Logs in using a phone number and a verification code.
    ---
    Request parameters:
    - phone_number: phone number
    - code: verification code
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=PhoneLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用手机号和验证码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with a phone verification code.

        Compares the submitted code with the one cached under the phone
        number; on success the code is consumed, the user is fetched or
        created, and a token is returned.
        """
        phone_number = request.data.get('phone_number')
        code = request.data.get('code')

        if not phone_number or not code:
            logger.warning("Attempt to verify code login without phone number or code")
            return error_response(message="Phone number and code are required")

        logger.info(
            "Attempting verification code login for phone number: %s", phone_number
        )
        cached_code = cache.get(phone_number)

        if not _codes_match(cached_code, code):
            logger.warning(
                "Failed verification code login attempt for phone number: %s",
                phone_number,
            )
            return error_response(message="Invalid or expired code")

        # The code is single-use: drop it as soon as it has been accepted
        cache.delete(phone_number)
        user, created = ParadiseUser.objects.get_or_create(phone_number=phone_number)
        if created:
            logger.info("Created new user with phone number: %s", phone_number)
            # A freshly created account has no password; mark it unusable so
            # it cannot be used for password-based login.
            user.set_unusable_password()
            user.save()

        token = generate_token(user.id)
        logger.info("Successfully logged in user with phone number: %s", phone_number)
        return success_response(data={'token': token}, message="登录成功")


# Verification-code sending view
class SendVerifyCodeView(APIView):
    """
    SMS verification-code endpoint.

    Sends a verification code to the given phone number.
    ---
    Request parameters:
    - phone_number: phone number
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=SendVerifyCodeRequestSchema,
        responses={
            200: openapi.Response('发送成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="向指定手机号发送6位数验证码,有效期10分钟"
    )
    def post(self, request):
        """
        Send a phone verification code.

        Generates a 6-digit code, sends it via ``send_sms`` and, if the
        gateway reply contains both ``Code`` and ``OK``, caches the code
        under the phone number for 10 minutes.
        """
        phone_number = request.data.get('phone_number')
        if not phone_number:
            logger.warning("Attempt to send verification code without phone number")
            return error_response(message="Phone number is required")

        logger.info("Generating verification code for phone number: %s", phone_number)
        # The code is an authentication secret, so draw it from the CSPRNG
        # rather than the predictable `random` module. Range: 100000-999999.
        code = str(secrets.randbelow(900000) + 100000)
        response = send_sms(phone_number, code)

        reply = response.decode('utf-8') if response else ''
        if 'Code' in reply and 'OK' in reply:
            # Store the code; it expires after 10 minutes
            cache.set(phone_number, code, timeout=600)
            logger.info("Successfully sent verification code to %s", phone_number)
            return success_response(message="Verification code sent")

        logger.error("Failed to send verification code to %s", phone_number)
        return error_response(message="Failed to send verification code")


# Email login view
class EmailLoginView(APIView):
    """
    Email login endpoint.

    Logs in using an email address and password.
    ---
    Request parameters:
    - email: email address
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=EmailLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用邮箱和密码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with email and password.

        The password is always verified against the stored hash. Credentials
        are deliberately NOT cached: caching the plaintext password would
        expose it in the cache backend and would keep an old password valid
        (and a new one rejected) until the cache entry expired.
        """
        email = request.data.get('email')
        password = request.data.get('password')

        if not email or not password:
            logger.warning("Attempt to login without email or password")
            return error_response(message="Email and password are required")

        try:
            user = ParadiseUser.objects.get(email=email)
        except ParadiseUser.DoesNotExist:
            user = None

        if user is not None and user.check_password(password):
            token = generate_token(user.id)
            logger.info("User logged in with email: %s", email)
            return success_response(data={'token': token}, message="登录成功")

        # Same message for unknown email and wrong password
        logger.warning("Failed login attempt with email: %s", email)
        return error_response(message="Invalid email or password")


# Username login view
class UsernameLoginView(APIView):
    """
    Username login endpoint.

    Logs in using a username and password.
    ---
    Request parameters:
    - username: user name
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['用户认证']

    @swagger_auto_schema(
        request_body=UsernameLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema())
        },
        operation_description="使用用户名和密码进行登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in with username and password.

        Delegates verification to ``django.contrib.auth.authenticate`` on
        every request. Credentials are deliberately NOT cached (see
        EmailLoginView.post for the rationale).
        """
        username = request.data.get('username')
        password = request.data.get('password')

        if not username or not password:
            return error_response(message="Username and password are required")

        user = authenticate(request, username=username, password=password)
        if user is None:
            return error_response(message="Invalid username or password")

        token = generate_token(user.id)
        return success_response(data={'token': token}, message="登录成功")


class UserInfoResponseSchema(serializers.Serializer):
    status = serializers.CharField(help_text="操作状态")
    code = serializers.IntegerField(help_text="状态码")
    data = UserInfoSerializer(help_text="用户信息")


class ProfileUpdateRequestSchema(serializers.Serializer):
    gender = serializers.ChoiceField(choices=['M', 'F', 'O'], required=False, help_text="性别")
    resident_city = serializers.CharField(required=False, help_text="常驻城市")
    birthday = serializers.DateField(required=False, help_text="生日")
    zodiac_sign = serializers.CharField(required=False, help_text="星座")
    mbti = serializers.ChoiceField(
        choices=[
            'INTJ', 'INTP', 'ENTJ', 'ENTP',
            'INFJ', 'INFP', 'ENFJ', 'ENFP',
            'ISTJ', 'ISFJ', 'ESTJ', 'ESFJ',
            'ISTP', 'ISFP', 'ESTP', 'ESFP'
        ],
        required=False,
        help_text="MBTI性格类型"
    )
    interests = serializers.CharField(required=False, help_text="兴趣爱好")
    social_identity = serializers.CharField(required=False, help_text="社会身份")


class BindPhoneRequestSchema(serializers.Serializer):
    phone_number = serializers.CharField(required=True, help_text="手机号码")
    code = serializers.CharField(required=True, help_text="验证码")


class ParadiseUserViewSet(viewsets.ModelViewSet):
    """
    User management endpoints.

    Provides user management features such as retrieving the current
    user's details, updating the profile and binding a phone number.
    """
    # NOTE(review): as a ModelViewSet over all users, the default
    # list/retrieve/update/destroy routes are reachable by any authenticated
    # user for any account — confirm this is intended or restrict the queryset.
    queryset = ParadiseUser.objects.all()
    serializer_class = ParadiseUserSerializer
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['用户管理']

    def get_permissions(self):
        """Return AllowAny for ``create`` and IsAuthenticated for all other actions."""
        if self.action == 'create':
            return [AllowAny()]
        return [IsAuthenticated()]

    @swagger_auto_schema(
        responses={
            200: openapi.Response('获取成功', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema())
        },
        operation_description="返回当前登录用户的详细信息,不包含敏感字段如密码",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['get'])
    def info(self, request):
        """
        Get the current user's information.

        Serializes ``request.user`` with ``UserInfoSerializer`` and returns
        the result.
        """
        user = request.user
        serializer = UserInfoSerializer(user)
        logger.info("User %s retrieved their profile info", user.id)
        return success_response(data=serializer.data)

    @swagger_auto_schema(
        methods=['put', 'patch'],
        request_body=ProfileUpdateRequestSchema,
        responses={
            200: openapi.Response('更新成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema())
        },
        operation_description="更新当前登录用户的个人资料信息",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['put', 'patch'])
    def update_profile(self, request):
        """
        Update the current user's profile.

        Applies a partial update through ``ProfileUpdateSerializer``; on
        validation failure the field errors are returned with code 400.
        """
        user = request.user
        serializer = ProfileUpdateSerializer(user, data=request.data, partial=True)

        if not serializer.is_valid():
            logger.warning(
                "User %s profile update failed: %s", user.id, serializer.errors
            )
            return error_response(
                message="个人资料更新失败",
                data=serializer.errors,
                code=400
            )

        serializer.save()
        logger.info("User %s updated their profile", user.id)
        return success_response(
            data=serializer.data,
            message="个人资料已更新"
        )

    @swagger_auto_schema(
        methods=['post'],
        request_body=BindPhoneRequestSchema,
        responses={
            200: openapi.Response('绑定成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema()),
            409: openapi.Response('手机号已被使用', get_standardized_response_schema())
        },
        operation_description="为当前用户绑定或更新手机号,需要通过验证码验证",
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['post'])
    def bind_phone(self, request):
        """
        Bind or change the current user's phone number.

        Validates the number format, rejects numbers used by another
        account (409), checks the cached verification code, then stores the
        number and consumes the code.
        """
        user = request.user
        phone_number = request.data.get('phone_number')
        code = request.data.get('code')

        if not phone_number or not code:
            logger.warning(
                "User %s attempted to bind phone without providing phone number or code",
                user.id,
            )
            return error_response(message="手机号和验证码不能为空", code=400)

        # A JSON client may send the number as an integer; normalise to str
        # so the format check below cannot raise AttributeError.
        phone_number = str(phone_number)

        # Basic phone number format check: digits only, 5-20 characters
        if not phone_number.isdigit() or not 5 <= len(phone_number) <= 20:
            logger.warning(
                "User %s attempted to bind invalid phone number format: %s",
                user.id, phone_number,
            )
            return error_response(message="手机号格式不正确", code=400)

        # Reject numbers already used by a different account
        taken = (
            ParadiseUser.objects
            .filter(phone_number=phone_number)
            .exclude(id=user.id)
            .exists()
        )
        if taken:
            logger.warning(
                "User %s attempted to bind phone number already in use: %s",
                user.id, phone_number,
            )
            return error_response(message="该手机号已被其他账号使用", code=409)

        # Check the verification code
        cached_code = cache.get(phone_number)
        if not _codes_match(cached_code, code):
            logger.warning(
                "User %s provided invalid verification code for phone: %s",
                user.id, phone_number,
            )
            return error_response(message="验证码无效或已过期", code=400)

        # Verification passed: store the new number
        old_phone = user.phone_number
        user.phone_number = phone_number
        user.save()

        # The code is single-use: remove it from the cache
        cache.delete(phone_number)

        if old_phone:
            logger.info(
                "User %s changed phone number from %s to %s",
                user.id, old_phone, phone_number,
            )
            return success_response(message="手机号已更新")

        logger.info("User %s bound new phone number: %s", user.id, phone_number)
        return success_response(message="手机号已绑定")


def custom_set_language(request):
    """Activate the requested language and redirect to ``next``.

    Reads ``language`` and ``next`` from the query string. When the language
    code is one of ``settings.LANGUAGES`` it is activated and stored in the
    language cookie. The redirect target is only honoured when it points at
    this host; otherwise the user is sent to ``/`` so the endpoint cannot be
    abused as an open redirect.
    """
    lang_code = request.GET.get('language', None)
    next_url = request.GET.get('next', '/')

    # `next` is attacker-controllable; only follow same-host targets.
    if not url_has_allowed_host_and_scheme(
        url=next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        next_url = '/'

    if lang_code and lang_code in dict(settings.LANGUAGES):
        logger.info("Setting language to: %s", lang_code)
        # Activate the language and persist the choice in a cookie
        translation.activate(lang_code)
        response = HttpResponseRedirect(next_url)
        response.set_cookie(settings.LANGUAGE_COOKIE_NAME, lang_code)
        return response

    logger.warning("Attempt to set invalid language code: %s", lang_code)
    return HttpResponseRedirect(next_url)


class AdminEmailLoginRequestSchema(serializers.Serializer):
    email = serializers.EmailField(required=True, help_text="管理员邮箱")
    password = serializers.CharField(required=True, help_text="密码")


class AdminEmailLoginView(APIView):
    """
    Administrator email login endpoint.

    Dedicated to administrators logging in with email and password;
    regular users are rejected.
    ---
    Request parameters:
    - email: administrator email
    - password: password
    """
    permission_classes = [AllowAny]
    tags = ['管理员认证']

    @swagger_auto_schema(
        request_body=AdminEmailLoginRequestSchema,
        responses={
            200: openapi.Response('登录成功', get_standardized_response_schema()),
            400: openapi.Response('请求参数错误', get_standardized_response_schema()),
            403: openapi.Response('权限不足', get_standardized_response_schema())
        },
        operation_description="专用于管理员通过邮箱和密码登录,拒绝普通用户登录,返回认证令牌"
    )
    def post(self, request):
        """
        Log in as an administrator with email and password.

        The password is verified BEFORE the staff flag is inspected, so an
        unauthenticated caller cannot use the 403 response to discover which
        email addresses belong to existing non-admin accounts.
        """
        email = request.data.get('email')
        password = request.data.get('password')

        if not email or not password:
            logger.warning("Attempt to admin login without email or password")
            return error_response(message="Email and password are required")

        try:
            user = ParadiseUser.objects.get(email=email)
        except ParadiseUser.DoesNotExist:
            user = None

        if user is None or not user.check_password(password):
            logger.warning("Failed admin login attempt with email: %s", email)
            return error_response(message="Invalid email or password")

        # Credentials are valid; now require administrator privileges
        if not user.is_staff:
            logger.warning(
                "Non-admin user attempted to login via admin endpoint: %s", email
            )
            return error_response(
                message="Access denied. Admin privileges required.",
                code=403,
                status_code=status.HTTP_403_FORBIDDEN
            )

        # Generate an admin-specific token via is_admin=True
        token = generate_token(user.id, is_admin=True)
        logger.info("Admin logged in with email: %s", email)
        return success_response(data={'token': token}, message="管理员登录成功")


class AdminLogoutView(APIView):
    """
    Administrator logout endpoint.

    Invalidates the administrator's authentication token.
    """
    authentication_classes = [RedisTokenAuthentication]
    permission_classes = [IsAuthenticated]
    tags = ['管理员认证']

    @swagger_auto_schema(
        responses={
            200: openapi.Response('登出成功', get_standardized_response_schema()),
            401: openapi.Response('认证失败', get_standardized_response_schema()),
            403: openapi.Response('权限不足', get_standardized_response_schema())
        },
        operation_description="使当前管理员的认证令牌失效,需要在请求头中提供有效的管理员token",
        security=[{'Bearer': []}]
    )
    def post(self, request):
        """
        Log the administrator out.

        Requires a staff user, extracts the token from the ``Authorization``
        header and deletes the ``admin_token:<token>`` cache entry.
        """
        user = request.user

        # Only administrators may use this endpoint
        if not user.is_staff:
            logger.warning(
                "Non-admin user attempted to use admin logout endpoint: %s", user.id
            )
            return error_response(
                message="Access denied. Admin privileges required.",
                code=403,
                status_code=status.HTTP_403_FORBIDDEN
            )

        # Header format is "<scheme> <token>"; split on the first space
        auth_header = request.headers.get('Authorization', '')
        _scheme, separator, token = auth_header.partition(' ')
        if not separator:
            return error_response(message="Invalid authorization header")

        # Remove the token from the cache so it can no longer be used
        cache.delete(f"admin_token:{token}")
        logger.info("Admin %s logged out successfully", user.id)
        return success_response(message="管理员已成功登出")