From c0deacd79cc75dab634e9041bf6fc734ae68e231 Mon Sep 17 00:00:00 2001 From: zyc <1439655764@qq.com> Date: Thu, 29 Jan 2026 10:02:15 +0800 Subject: [PATCH] first commit --- .env.example | 24 + .gitignore | 32 + apps/__init__.py | 0 apps/admins/__init__.py | 1 + apps/admins/apps.py | 7 + apps/admins/authentication.py | 82 +++ apps/admins/migrations/0001_initial.py | 84 +++ apps/admins/migrations/__init__.py | 0 apps/admins/models.py | 77 +++ apps/admins/permissions.py | 46 ++ apps/admins/serializers.py | 53 ++ apps/admins/urls.py | 15 + apps/admins/views.py | 229 +++++++ apps/devices/__init__.py | 0 apps/devices/migrations/0001_initial.py | 182 ++++++ apps/devices/migrations/0002_initial.py | 111 ++++ apps/devices/migrations/__init__.py | 0 apps/devices/models.py | 131 ++++ apps/devices/serializers.py | 88 +++ apps/devices/urls.py | 15 + apps/devices/views.py | 179 ++++++ apps/inventory/__init__.py | 0 apps/inventory/models.py | 1 + apps/inventory/services.py | 224 +++++++ apps/inventory/urls.py | 16 + apps/inventory/views.py | 259 ++++++++ apps/spirits/__init__.py | 0 apps/spirits/migrations/0001_initial.py | 58 ++ apps/spirits/migrations/0002_initial.py | 28 + apps/spirits/migrations/__init__.py | 0 apps/spirits/models.py | 29 + apps/spirits/serializers.py | 41 ++ apps/spirits/urls.py | 13 + apps/spirits/views.py | 85 +++ apps/users/__init__.py | 0 apps/users/migrations/0001_initial.py | 95 +++ apps/users/migrations/__init__.py | 0 apps/users/models.py | 49 ++ apps/users/serializers.py | 44 ++ apps/users/urls.py | 14 + apps/users/views.py | 120 ++++ config/__init__.py | 0 config/asgi.py | 16 + config/settings.py | 196 ++++++ config/settings_test.py | 25 + config/urls.py | 43 ++ config/wsgi.py | 16 + manage.py | 22 + requirements.txt | 28 + tests.py | 787 ++++++++++++++++++++++++ utils/__init__.py | 0 utils/exceptions.py | 91 +++ utils/oss.py | 92 +++ utils/permissions.py | 27 + utils/response.py | 52 ++ 55 files changed, 3827 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 apps/__init__.py create mode 100644 apps/admins/__init__.py create mode 100644 apps/admins/apps.py create mode 100644 apps/admins/authentication.py create mode 100644 apps/admins/migrations/0001_initial.py create mode 100644 apps/admins/migrations/__init__.py create mode 100644 apps/admins/models.py create mode 100644 apps/admins/permissions.py create mode 100644 apps/admins/serializers.py create mode 100644 apps/admins/urls.py create mode 100644 apps/admins/views.py create mode 100644 apps/devices/__init__.py create mode 100644 apps/devices/migrations/0001_initial.py create mode 100644 apps/devices/migrations/0002_initial.py create mode 100644 apps/devices/migrations/__init__.py create mode 100644 apps/devices/models.py create mode 100644 apps/devices/serializers.py create mode 100644 apps/devices/urls.py create mode 100644 apps/devices/views.py create mode 100644 apps/inventory/__init__.py create mode 100644 apps/inventory/models.py create mode 100644 apps/inventory/services.py create mode 100644 apps/inventory/urls.py create mode 100644 apps/inventory/views.py create mode 100644 apps/spirits/__init__.py create mode 100644 apps/spirits/migrations/0001_initial.py create mode 100644 apps/spirits/migrations/0002_initial.py create mode 100644 apps/spirits/migrations/__init__.py create mode 100644 apps/spirits/models.py create mode 100644 apps/spirits/serializers.py create mode 100644 apps/spirits/urls.py create mode 100644 apps/spirits/views.py create mode 100644 apps/users/__init__.py create mode 100644 apps/users/migrations/0001_initial.py create mode 100644 apps/users/migrations/__init__.py create mode 100644 apps/users/models.py create mode 100644 apps/users/serializers.py create mode 100644 apps/users/urls.py create mode 100644 apps/users/views.py create mode 100644 config/__init__.py create mode 100644 config/asgi.py create mode 100644 config/settings.py create mode 100644 config/settings_test.py create mode 100644 config/urls.py create mode 100644 config/wsgi.py create mode 100755 manage.py create mode 100644 requirements.txt create mode 100644 tests.py create mode 100644 utils/__init__.py create mode 100644 utils/exceptions.py create mode 100644 utils/oss.py create mode 100644 utils/permissions.py create mode 100644 utils/response.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..c92fd4b --- /dev/null +++ b/.env.example @@ -0,0 +1,24 @@ +# Django Settings +DJANGO_SECRET_KEY=your-secret-key-here +DJANGO_DEBUG=True +DJANGO_ALLOWED_HOSTS=* + +# MySQL Database +DB_NAME=rtc_demo +DB_USER=root +DB_PASSWORD=your-password +DB_HOST=127.0.0.1 +DB_PORT=3306 + +# Redis +REDIS_URL=redis://127.0.0.1:6379/0 + +# Aliyun OSS +OSS_ACCESS_KEY_ID=your-access-key-id +OSS_ACCESS_KEY_SECRET=your-access-key-secret +OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com +OSS_BUCKET_NAME=your-bucket-name +OSS_CUSTOM_DOMAIN= + +# CORS (production only) +CORS_ALLOWED_ORIGINS=https://your-domain.com diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8ba1269 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# Python +__pycache__/ +*.py[cod] +*.so +*.egg-info/ +dist/ +build/ + +# Virtual environment +venv/ +.venv/ + +# Environment variables +.env + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Django +*.sqlite3 +media/ +staticfiles/ + +# Logs +*.log diff --git a/apps/__init__.py b/apps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/admins/__init__.py b/apps/admins/__init__.py new file mode 100644 index 0000000..b7840dd --- /dev/null +++ b/apps/admins/__init__.py @@ -0,0 +1 @@ +default_app_config = 'apps.admins.apps.AdminsConfig' diff --git a/apps/admins/apps.py b/apps/admins/apps.py new file mode 100644 index 0000000..3a9ea6f --- /dev/null +++ b/apps/admins/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class AdminsConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'apps.admins' + verbose_name = '管理员' diff --git a/apps/admins/authentication.py b/apps/admins/authentication.py new file mode 100644 index 0000000..a6336cd --- /dev/null +++ b/apps/admins/authentication.py @@ -0,0 +1,82 @@ +""" +管理员模块自定义JWT认证 +""" +from rest_framework_simplejwt.tokens import RefreshToken +from rest_framework_simplejwt.authentication import JWTAuthentication +from rest_framework_simplejwt.exceptions import InvalidToken, AuthenticationFailed +from django.conf import settings + + +def get_admin_tokens(admin_user): + """ + 为管理员生成JWT Token + 在token中添加 user_type='admin' 以区分App用户 + """ + refresh = RefreshToken.for_user(admin_user) + # 添加自定义声明 + refresh['user_type'] = 'admin' + refresh['username'] = admin_user.username + refresh['role'] = admin_user.role + + return { + 'access': str(refresh.access_token), + 'refresh': str(refresh), + } + + +class AdminJWTAuthentication(JWTAuthentication): + """ + 管理员专用JWT认证 + 验证token中的user_type必须为'admin' + """ + + def get_user(self, validated_token): + """ + 重写get_user方法,从AdminUser模型获取用户 + """ + from apps.admins.models import AdminUser + + # 验证user_type + user_type = validated_token.get('user_type') + if user_type != 'admin': + raise AuthenticationFailed('无效的管理员Token') + + try: + user_id = validated_token.get('user_id') + user = AdminUser.objects.get(id=user_id) + except AdminUser.DoesNotExist: + raise AuthenticationFailed('管理员用户不存在') + + if not user.is_active: + raise AuthenticationFailed('管理员账户已被禁用') + + return user + + +class AppJWTAuthentication(JWTAuthentication): + """ + App端专用JWT认证 + 验证token中的user_type必须为'app'或不存在(兼容旧token) + """ + + def get_user(self, validated_token): + """ + 重写get_user方法,从User模型获取用户 + """ + from apps.users.models import User + + # 验证user_type(兼容旧token,默认为app) + user_type = validated_token.get('user_type', 'app') + if user_type not in ['app', None]: + raise AuthenticationFailed('无效的用户Token') + + try: + user_id = validated_token.get('user_id') + user = User.objects.get(id=user_id) + except User.DoesNotExist: + raise AuthenticationFailed('用户不存在') + + if not user.is_active: + raise AuthenticationFailed('用户账户已被禁用') + + return user diff --git a/apps/admins/migrations/0001_initial.py b/apps/admins/migrations/0001_initial.py new file mode 100644 index 0000000..e6f04df --- /dev/null +++ b/apps/admins/migrations/0001_initial.py @@ -0,0 +1,84 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="AdminUser", + fields=[ + ("password", models.CharField(max_length=128, verbose_name="password")), + ( + "last_login", + models.DateTimeField( + blank=True, null=True, verbose_name="last login" + ), + ), + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "username", + models.CharField(max_length=50, unique=True, verbose_name="用户名"), + ), + ( + "name", + models.CharField( + blank=True, default="", max_length=50, verbose_name="姓名" + ), + ), + ( + "email", + models.EmailField( + blank=True, default="", max_length=254, verbose_name="邮箱" + ), + ), + ( + "phone", + models.CharField( + blank=True, default="", max_length=20, verbose_name="手机号" + ), + ), + ( + "role", + models.CharField( + choices=[ + ("super_admin", "超级管理员"), + ("admin", "管理员"), + ("operator", "操作员"), + ], + default="operator", + max_length=20, + verbose_name="角色", + ), + ), + ( + "is_active", + models.BooleanField(default=True, verbose_name="是否启用"), + ), + ( + "last_login_ip", + models.GenericIPAddressField( + blank=True, null=True, verbose_name="最后登录IP" + ), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ], + options={ + "verbose_name": "管理员", + "verbose_name_plural": "管理员", + "db_table": "admin_user", + }, + ), + ] diff --git a/apps/admins/migrations/__init__.py b/apps/admins/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/admins/models.py b/apps/admins/models.py new file mode 100644 index 0000000..486b0ff --- /dev/null +++ b/apps/admins/models.py @@ -0,0 +1,77 @@ +""" +管理员模块模型 - 独立于App用户的账户体系 +""" +from django.contrib.auth.models import AbstractBaseUser, BaseUserManager +from django.db import models + + +class AdminUserManager(BaseUserManager): + """管理员用户管理器""" + + def create_user(self, username, password=None, **extra_fields): + if not username: + raise ValueError('用户名不能为空') + user = self.model(username=username, **extra_fields) + user.set_password(password) + user.save(using=self._db) + return user + + def create_superuser(self, username, password=None, **extra_fields): + extra_fields.setdefault('role', 'super_admin') + return self.create_user(username, password, **extra_fields) + + +class AdminUser(AbstractBaseUser): + """ + 管理员用户模型 - Web后台使用 + 与App端User模型完全独立 + 不使用PermissionsMixin以避免与User模型冲突 + """ + + ROLE_CHOICES = [ + ('super_admin', '超级管理员'), + ('admin', '管理员'), + ('operator', '操作员'), + ] + + id = models.BigAutoField(primary_key=True) + username = models.CharField('用户名', max_length=50, unique=True) + name = models.CharField('姓名', max_length=50, blank=True, default='') + email = models.EmailField('邮箱', blank=True, default='') + phone = models.CharField('手机号', max_length=20, blank=True, default='') + role = models.CharField('角色', max_length=20, choices=ROLE_CHOICES, default='operator') + is_active = models.BooleanField('是否启用', default=True) + last_login_ip = models.GenericIPAddressField('最后登录IP', null=True, blank=True) + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + objects = AdminUserManager() + + USERNAME_FIELD = 'username' + REQUIRED_FIELDS = [] + + class Meta: + db_table = 'admin_user' + verbose_name = '管理员' + verbose_name_plural = '管理员' + + @property + def is_staff(self): + """Django admin兼容""" + return self.role in ['super_admin', 'admin'] + + @property + def is_superuser(self): + """超级管理员检查""" + return self.role == 'super_admin' + + def has_perm(self, perm, obj=None): + """权限检查""" + return self.role in ['super_admin', 'admin'] + + def has_module_perms(self, app_label): + """模块权限检查""" + return self.role in ['super_admin', 'admin'] + + def __str__(self): + return f"{self.username} ({self.get_role_display()})" diff --git a/apps/admins/permissions.py b/apps/admins/permissions.py new file mode 100644 index 0000000..8438e96 --- /dev/null +++ b/apps/admins/permissions.py @@ -0,0 +1,46 @@ +""" +管理员模块权限 +""" +from rest_framework.permissions import BasePermission + + +class IsAdminUser(BasePermission): + """ + 验证是否为管理员用户(来自AdminUser模型) + """ + def has_permission(self, request, view): + from apps.admins.models import AdminUser + return ( + request.user and + request.user.is_authenticated and + isinstance(request.user, AdminUser) and + request.user.is_active + ) + + +class IsSuperAdmin(BasePermission): + """ + 验证是否为超级管理员 + """ + def has_permission(self, request, view): + from apps.admins.models import AdminUser + return ( + request.user and + request.user.is_authenticated and + isinstance(request.user, AdminUser) and + request.user.role == 'super_admin' + ) + + +class IsAdminOrOperator(BasePermission): + """ + 验证是否为管理员或操作员 + """ + def has_permission(self, request, view): + from apps.admins.models import AdminUser + return ( + request.user and + request.user.is_authenticated and + isinstance(request.user, AdminUser) and + request.user.role in ['super_admin', 'admin', 'operator'] + ) diff --git a/apps/admins/serializers.py b/apps/admins/serializers.py new file mode 100644 index 0000000..7a1b1f8 --- /dev/null +++ b/apps/admins/serializers.py @@ -0,0 +1,53 @@ +""" +管理员模块序列化器 +""" +from rest_framework import serializers +from .models import AdminUser + + +class AdminUserSerializer(serializers.ModelSerializer): + """管理员用户序列化器""" + + role_display = serializers.CharField(source='get_role_display', read_only=True) + + class Meta: + model = AdminUser + fields = ['id', 'username', 'name', 'email', 'phone', 'role', 'role_display', + 'is_active', 'last_login_ip', 'created_at'] + read_only_fields = ['id', 'last_login_ip', 'created_at'] + + +class AdminLoginSerializer(serializers.Serializer): + """管理员登录序列化器""" + + username = serializers.CharField(max_length=50, help_text='用户名') + password = serializers.CharField(max_length=128, write_only=True, help_text='密码') + + +class AdminChangePasswordSerializer(serializers.Serializer): + """管理员修改密码序列化器""" + + old_password = serializers.CharField(max_length=128, write_only=True) + new_password = serializers.CharField(max_length=128, write_only=True) + + def validate_new_password(self, value): + if len(value) < 6: + raise serializers.ValidationError('密码长度不能少于6位') + return value + + +class CreateAdminUserSerializer(serializers.ModelSerializer): + """创建管理员序列化器""" + + password = serializers.CharField(max_length=128, write_only=True) + + class Meta: + model = AdminUser + fields = ['username', 'password', 'name', 'email', 'phone', 'role'] + + def create(self, validated_data): + password = validated_data.pop('password') + admin = AdminUser(**validated_data) + admin.set_password(password) + admin.save() + return admin diff --git a/apps/admins/urls.py b/apps/admins/urls.py new file mode 100644 index 0000000..29dd669 --- /dev/null +++ b/apps/admins/urls.py @@ -0,0 +1,15 @@ +""" +管理员模块URL配置 +""" +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from .views import AdminAuthViewSet, AdminProfileViewSet, AdminUserManageViewSet + +router = DefaultRouter() +router.register('auth', AdminAuthViewSet, basename='admin-auth') +router.register('profile', AdminProfileViewSet, basename='admin-profile') +router.register('admins', AdminUserManageViewSet, basename='admin-users') + +urlpatterns = [ + path('', include(router.urls)), +] diff --git a/apps/admins/views.py b/apps/admins/views.py new file mode 100644 index 0000000..66c491d --- /dev/null +++ b/apps/admins/views.py @@ -0,0 +1,229 @@ +""" +管理员模块视图 +""" +from rest_framework import viewsets, status +from rest_framework.decorators import action +from rest_framework.permissions import AllowAny +from rest_framework_simplejwt.tokens import RefreshToken + +from utils.response import success, error +from utils.exceptions import ErrorCode +from .models import AdminUser +from .serializers import ( + AdminUserSerializer, + AdminLoginSerializer, + AdminChangePasswordSerializer, + CreateAdminUserSerializer +) +from .authentication import get_admin_tokens, AdminJWTAuthentication +from .permissions import IsAdminUser, IsSuperAdmin + + +class AdminAuthViewSet(viewsets.ViewSet): + """管理员认证视图集""" + + permission_classes = [AllowAny] + + @action(detail=False, methods=['post']) + def login(self, request): + """ + 管理员登录 + POST /api/admin/auth/login + """ + serializer = AdminLoginSerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + username = serializer.validated_data['username'] + password = serializer.validated_data['password'] + + try: + admin = AdminUser.objects.get(username=username) + except AdminUser.DoesNotExist: + return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误') + + if not admin.check_password(password): + return error(code=ErrorCode.USER_NOT_FOUND, message='用户名或密码错误') + + if not admin.is_active: + return error(code=ErrorCode.USER_DISABLED, message='账户已被禁用') + + # 记录登录IP + x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') + if x_forwarded_for: + ip = x_forwarded_for.split(',')[0].strip() + else: + ip = request.META.get('REMOTE_ADDR') + admin.last_login_ip = ip + admin.save(update_fields=['last_login_ip']) + + # 生成Token + tokens = get_admin_tokens(admin) + + return success(data={ + 'token': tokens, + 'admin': AdminUserSerializer(admin).data + }, message='登录成功') + + @action(detail=False, methods=['post']) + def refresh(self, request): + """ + 刷新Token + POST /api/admin/auth/refresh + """ + refresh_token = request.data.get('refresh') + if not refresh_token: + return error(message='refresh token不能为空') + + try: + refresh = RefreshToken(refresh_token) + # 验证是否为admin token + if refresh.get('user_type') != 'admin': + return error(code=ErrorCode.TOKEN_EXPIRED, message='无效的管理员Token') + + return success(data={ + 'access': str(refresh.access_token), + 'refresh': str(refresh) + }) + except Exception as e: + return error(code=ErrorCode.TOKEN_EXPIRED, message='Token已过期或无效') + + +class AdminProfileViewSet(viewsets.ViewSet): + """管理员个人信息视图集""" + + authentication_classes = [AdminJWTAuthentication] + permission_classes = [IsAdminUser] + + @action(detail=False, methods=['get'], url_path='me') + def me(self, request): + """ + 获取当前管理员信息 + GET /api/admin/profile/me + """ + serializer = AdminUserSerializer(request.user) + return success(data=serializer.data) + + @action(detail=False, methods=['put'], url_path='change-password') + def change_password(self, request): + """ + 修改密码 + PUT /api/admin/profile/change-password + """ + serializer = AdminChangePasswordSerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + if not request.user.check_password(serializer.validated_data['old_password']): + return error(message='原密码错误') + + request.user.set_password(serializer.validated_data['new_password']) + request.user.save() + + return success(message='密码修改成功') + + +class AdminUserManageViewSet(viewsets.ModelViewSet): + """管理员用户管理视图集(仅超级管理员可用)""" + + queryset = AdminUser.objects.all().order_by('-created_at') + authentication_classes = [AdminJWTAuthentication] + permission_classes = [IsSuperAdmin] + + def get_serializer_class(self): + if self.action == 'create': + return CreateAdminUserSerializer + return AdminUserSerializer + + def list(self, request, *args, **kwargs): + """ + 管理员列表 + GET /api/admin/admins + """ + queryset = self.filter_queryset(self.get_queryset()) + + # 搜索 + username = request.query_params.get('username') + role = request.query_params.get('role') + + if username: + queryset = queryset.filter(username__contains=username) + if role: + queryset = queryset.filter(role=role) + + page = self.paginate_queryset(queryset) + if page is not None: + serializer = AdminUserSerializer(page, many=True) + return success(data={ + 'total': self.paginator.page.paginator.count, + 'items': serializer.data + }) + + serializer = AdminUserSerializer(queryset, many=True) + return success(data={'items': serializer.data}) + + def create(self, request, *args, **kwargs): + """ + 创建管理员 + POST /api/admin/admins + """ + serializer = CreateAdminUserSerializer(data=request.data) + if serializer.is_valid(): + admin = serializer.save() + return success(data=AdminUserSerializer(admin).data, message='创建成功') + return error(message=str(serializer.errors)) + + def retrieve(self, request, *args, **kwargs): + """ + 管理员详情 + GET /api/admin/admins/{id} + """ + instance = self.get_object() + serializer = AdminUserSerializer(instance) + return success(data=serializer.data) + + def update(self, request, *args, **kwargs): + """ + 更新管理员 + PUT /api/admin/admins/{id} + """ + instance = self.get_object() + serializer = AdminUserSerializer(instance, data=request.data, partial=True) + if serializer.is_valid(): + serializer.save() + return success(data=serializer.data, message='更新成功') + return error(message=str(serializer.errors)) + + @action(detail=True, methods=['post'], url_path='toggle-status') + def toggle_status(self, request, pk=None): + """ + 启用/禁用管理员 + POST /api/admin/admins/{id}/toggle-status + """ + admin = self.get_object() + + # 不能禁用自己 + if admin.id == request.user.id: + return error(message='不能禁用自己的账户') + + admin.is_active = not admin.is_active + admin.save() + + return success( + data=AdminUserSerializer(admin).data, + message='已启用' if admin.is_active else '已禁用' + ) + + @action(detail=True, methods=['post'], url_path='reset-password') + def reset_password(self, request, pk=None): + """ + 重置管理员密码 + POST /api/admin/admins/{id}/reset-password + """ + admin = self.get_object() + new_password = request.data.get('new_password', '123456') + + admin.set_password(new_password) + admin.save() + + return success(message=f'密码已重置为: {new_password}') diff --git a/apps/devices/__init__.py b/apps/devices/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devices/migrations/0001_initial.py b/apps/devices/migrations/0001_initial.py new file mode 100644 index 0000000..753cb00 --- /dev/null +++ b/apps/devices/migrations/0001_initial.py @@ -0,0 +1,182 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Device", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "sn", + models.CharField(max_length=50, unique=True, verbose_name="SN码"), + ), + ( + "mac_address", + models.CharField( + blank=True, + max_length=20, + null=True, + unique=True, + verbose_name="MAC地址", + ), + ), + ( + "name", + models.CharField( + blank=True, + default="", + max_length=100, + verbose_name="自定义名称", + ), + ), + ( + "status", + models.CharField( + choices=[ + ("in_stock", "库存中"), + ("out_stock", "已出库"), + ("bound", "已绑定"), + ], + default="in_stock", + max_length=20, + verbose_name="状态", + ), + ), + ( + "firmware_version", + models.CharField( + blank=True, default="", max_length=20, verbose_name="固件版本" + ), + ), + ( + "last_online_at", + models.DateTimeField( + blank=True, null=True, verbose_name="最后在线时间" + ), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ], + options={ + "verbose_name": "设备", + "verbose_name_plural": "设备", + "db_table": "device", + }, + ), + migrations.CreateModel( + name="DeviceBatch", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ("batch_no", models.CharField(max_length=50, verbose_name="批次号")), + ("production_date", models.DateField(verbose_name="生产日期")), + ( + "production_week", + models.CharField(blank=True, max_length=10, verbose_name="生产周"), + ), + ("quantity", models.IntegerField(verbose_name="数量")), + ( + "remark", + models.TextField(blank=True, default="", verbose_name="备注"), + ), + ( + "status", + models.CharField( + choices=[("pending", "待生成"), ("generated", "已生成")], + default="pending", + max_length=20, + verbose_name="状态", + ), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ], + options={ + "verbose_name": "设备批次", + "verbose_name_plural": "设备批次", + "db_table": "device_batch", + }, + ), + migrations.CreateModel( + name="DeviceType", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ("brand", models.CharField(max_length=50, verbose_name="品牌")), + ( + "product_code", + models.CharField( + max_length=50, unique=True, verbose_name="产品代号" + ), + ), + ("name", models.CharField(max_length=100, verbose_name="名称")), + ( + "is_network_required", + models.BooleanField(default=True, verbose_name="是否需要联网"), + ), + ( + "is_active", + models.BooleanField(default=True, verbose_name="是否启用"), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ], + options={ + "verbose_name": "设备类型", + "verbose_name_plural": "设备类型", + "db_table": "device_type", + }, + ), + migrations.CreateModel( + name="UserDevice", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "bind_type", + models.CharField( + choices=[("owner", "所有者"), ("share", "分享")], + default="owner", + max_length=20, + verbose_name="绑定类型", + ), + ), + ( + "bind_time", + models.DateTimeField(auto_now_add=True, verbose_name="绑定时间"), + ), + ( + "is_active", + models.BooleanField(default=True, verbose_name="是否有效"), + ), + ], + options={ + "verbose_name": "用户设备绑定", + "verbose_name_plural": "用户设备绑定", + "db_table": "user_device", + }, + ), + ] diff --git a/apps/devices/migrations/0002_initial.py b/apps/devices/migrations/0002_initial.py new file mode 100644 index 0000000..e06a774 --- /dev/null +++ b/apps/devices/migrations/0002_initial.py @@ -0,0 +1,111 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:03 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("devices", "0001_initial"), + ("spirits", "0001_initial"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AddField( + model_name="devicebatch", + name="created_by", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to=settings.AUTH_USER_MODEL, + verbose_name="创建人", + ), + ), + migrations.AddField( + model_name="device", + name="batch", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="devices", + to="devices.devicebatch", + verbose_name="所属批次", + ), + ), + migrations.AddField( + model_name="devicebatch", + name="device_type", + field=models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="batches", + to="devices.devicetype", + verbose_name="设备类型", + ), + ), + migrations.AddField( + model_name="device", + name="device_type", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name="devices", + to="devices.devicetype", + verbose_name="设备类型", + ), + ), + migrations.AddField( + model_name="userdevice", + name="device", + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="user_devices", + to="devices.device", + verbose_name="设备", + ), + ), + migrations.AddField( + model_name="userdevice", + name="spirit", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="user_devices", + to="spirits.spirit", + verbose_name="绑定的智能体", + ), + ), + migrations.AddField( + model_name="userdevice", + name="user", + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="user_devices", + to=settings.AUTH_USER_MODEL, + verbose_name="用户", + ), + ), + migrations.AlterUniqueTogether( + name="devicebatch", + unique_together={("device_type", "batch_no", "production_date")}, + ), + migrations.AddIndex( + model_name="device", + index=models.Index( + fields=["mac_address"], name="device_mac_add_09f85c_idx" + ), + ), + migrations.AddIndex( + model_name="device", + index=models.Index(fields=["status"], name="device_status_6321a6_idx"), + ), + migrations.AlterUniqueTogether( + name="userdevice", + unique_together={("user", "device")}, + ), + ] diff --git a/apps/devices/migrations/__init__.py b/apps/devices/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/devices/models.py b/apps/devices/models.py new file mode 100644 index 0000000..0ff6a02 --- /dev/null +++ b/apps/devices/models.py @@ -0,0 +1,131 @@ +""" +设备模块模型 +""" +from django.db import models +from apps.users.models import User +from apps.spirits.models import Spirit + + +class DeviceType(models.Model): + """设备类型表""" + + id = models.BigAutoField(primary_key=True) + brand = models.CharField('品牌', max_length=50) + product_code = models.CharField('产品代号', max_length=50, unique=True) + name = models.CharField('名称', max_length=100) + is_network_required = models.BooleanField('是否需要联网', default=True) + is_active = models.BooleanField('是否启用', default=True) + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + class Meta: + db_table = 'device_type' + verbose_name = '设备类型' + verbose_name_plural = '设备类型' + + def save(self, *args, **kwargs): + # 根据产品代号后缀自动判断是否需要联网 + if self.product_code: + self.is_network_required = not self.product_code.endswith('-OFF') + super().save(*args, **kwargs) + + def __str__(self): + return f"{self.brand} - {self.name}" + + +class DeviceBatch(models.Model): + """设备批次表""" + + STATUS_CHOICES = [ + ('pending', '待生成'), + ('generated', '已生成'), + ] + + id = models.BigAutoField(primary_key=True) + device_type = models.ForeignKey(DeviceType, on_delete=models.PROTECT, related_name='batches', verbose_name='设备类型') + batch_no = models.CharField('批次号', max_length=50) + production_date = models.DateField('生产日期') + production_week = models.CharField('生产周', max_length=10, blank=True) # 格式: 25W45 + quantity = models.IntegerField('数量') + remark = models.TextField('备注', blank=True, default='') + status = models.CharField('状态', max_length=20, choices=STATUS_CHOICES, default='pending') + created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, verbose_name='创建人') + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + class Meta: + db_table = 'device_batch' + verbose_name = '设备批次' + verbose_name_plural = '设备批次' + unique_together = ['device_type', 'batch_no', 'production_date'] + + def save(self, *args, **kwargs): + # 计算生产周 + if self.production_date and not self.production_week: + year = self.production_date.year + week = self.production_date.isocalendar()[1] + self.production_week = f"{year % 100}W{week:02d}" + super().save(*args, **kwargs) + + def __str__(self): + return f"{self.device_type.brand}-{self.batch_no}" + + +class Device(models.Model): + """设备表""" + + STATUS_CHOICES = [ + ('in_stock', '库存中'), + ('out_stock', '已出库'), + ('bound', '已绑定'), + ] + + id = models.BigAutoField(primary_key=True) + sn = models.CharField('SN码', max_length=50, unique=True) + device_type = models.ForeignKey(DeviceType, on_delete=models.PROTECT, null=True, related_name='devices', verbose_name='设备类型') + batch = models.ForeignKey(DeviceBatch, on_delete=models.SET_NULL, null=True, related_name='devices', verbose_name='所属批次') + mac_address = models.CharField('MAC地址', max_length=20, blank=True, null=True, unique=True) + name = models.CharField('自定义名称', max_length=100, blank=True, default='') + status = models.CharField('状态', max_length=20, choices=STATUS_CHOICES, default='in_stock') + firmware_version = models.CharField('固件版本', max_length=20, blank=True, default='') + last_online_at = models.DateTimeField('最后在线时间', null=True, blank=True) + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + class Meta: + db_table = 'device' + verbose_name = '设备' + verbose_name_plural = '设备' + indexes = [ + models.Index(fields=['mac_address']), + models.Index(fields=['status']), + ] + + def __str__(self): + return self.sn + + +class UserDevice(models.Model): + """用户设备绑定表""" + + BIND_TYPE_CHOICES = [ + ('owner', '所有者'), + ('share', '分享'), + ] + + id = models.BigAutoField(primary_key=True) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='user_devices', verbose_name='用户') + device = models.ForeignKey(Device, on_delete=models.CASCADE, related_name='user_devices', verbose_name='设备') + spirit = models.ForeignKey(Spirit, on_delete=models.SET_NULL, null=True, blank=True, related_name='user_devices', verbose_name='绑定的智能体') + bind_type = models.CharField('绑定类型', max_length=20, choices=BIND_TYPE_CHOICES, default='owner') + bind_time = models.DateTimeField('绑定时间', auto_now_add=True) + is_active = models.BooleanField('是否有效', default=True) + + class Meta: + db_table = 'user_device' + verbose_name = '用户设备绑定' + verbose_name_plural = '用户设备绑定' + unique_together = ['user', 'device'] + + def __str__(self): + return f"{self.user.phone} - {self.device.sn}" diff --git a/apps/devices/serializers.py b/apps/devices/serializers.py new file mode 100644 index 0000000..f5629ba --- /dev/null +++ b/apps/devices/serializers.py @@ -0,0 +1,88 @@ +""" +设备模块序列化器 +""" +from rest_framework import serializers +from .models import DeviceType, DeviceBatch, Device, UserDevice + + +class DeviceTypeSerializer(serializers.ModelSerializer): + """设备类型序列化器""" + + class Meta: + model = DeviceType + fields = ['id', 'brand', 'product_code', 'name', 'is_network_required', 'is_active', 'created_at'] + read_only_fields = ['id', 'is_network_required', 'created_at'] + + +class DeviceBatchSerializer(serializers.ModelSerializer): + """设备批次序列化器""" + + device_type_info = DeviceTypeSerializer(source='device_type', read_only=True) + + class Meta: + model = DeviceBatch + fields = ['id', 'device_type', 'device_type_info', 'batch_no', 'production_date', + 'production_week', 'quantity', 'remark', 'status', 'created_at'] + read_only_fields = ['id', 'production_week', 'status', 'created_at'] + + +class DeviceBatchCreateSerializer(serializers.ModelSerializer): + """创建设备批次序列化器""" + + class Meta: + model = DeviceBatch + fields = ['device_type', 'batch_no', 'production_date', 'quantity', 'remark'] + + +class DeviceSerializer(serializers.ModelSerializer): + """设备序列化器""" + + device_type_info = DeviceTypeSerializer(source='device_type', read_only=True) + + class Meta: + model = Device + fields = ['id', 'sn', 'device_type', 'device_type_info', 'mac_address', + 'name', 'status', 'firmware_version', 'last_online_at', 'created_at'] + + +class DeviceSimpleSerializer(serializers.ModelSerializer): + """设备简单序列化器""" + + class Meta: + model = Device + fields = ['id', 'sn', 'mac_address', 'status', 'created_at'] + + +class UserDeviceSerializer(serializers.ModelSerializer): + """用户设备绑定序列化器""" + + device = DeviceSerializer(read_only=True) + spirit_name = serializers.CharField(source='spirit.name', read_only=True, allow_null=True) + + class Meta: + model = UserDevice + fields = ['id', 'device', 'spirit', 'spirit_name', 'bind_type', 'bind_time', 'is_active'] + + +class BindDeviceSerializer(serializers.Serializer): + """绑定设备序列化器""" + + sn = serializers.CharField(max_length=50, help_text='设备SN码') + spirit_id = serializers.IntegerField(required=False, allow_null=True, help_text='智能体ID') + + +class DeviceVerifySerializer(serializers.Serializer): + """验证设备SN序列化器""" + + sn = serializers.CharField(max_length=50, help_text='设备SN码') + + +class QueryByMacSerializer(serializers.Serializer): + """通过MAC查询SN序列化器""" + + mac = serializers.CharField(max_length=20, help_text='MAC地址') + + def validate_mac(self, value): + # 统一MAC地址格式 + value = value.upper().replace('-', ':') + return value diff --git a/apps/devices/urls.py b/apps/devices/urls.py new file mode 100644 index 0000000..a73a882 --- /dev/null +++ b/apps/devices/urls.py @@ -0,0 +1,15 @@ +""" +设备模块URL配置 +""" +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from .views import DeviceViewSet + +router = DefaultRouter() +router.register('devices', DeviceViewSet, basename='devices') + +urlpatterns = [ + path('', include(router.urls)), +] + +admin_urlpatterns = [] diff --git a/apps/devices/views.py b/apps/devices/views.py new file mode 100644 index 0000000..4b56375 --- /dev/null +++ b/apps/devices/views.py @@ -0,0 +1,179 @@ +""" +设备模块视图 - App端 +""" +from rest_framework import viewsets, status +from rest_framework.decorators import action +from rest_framework.permissions import IsAuthenticated, AllowAny + +from utils.response import success, error +from utils.exceptions import ErrorCode +from apps.admins.authentication import AppJWTAuthentication +from .models import Device, UserDevice, DeviceType +from .serializers import ( + DeviceSerializer, + UserDeviceSerializer, + BindDeviceSerializer, + DeviceVerifySerializer, + DeviceTypeSerializer +) + + +class DeviceViewSet(viewsets.ViewSet): + """设备视图集(App端)""" + + authentication_classes = [AppJWTAuthentication] + permission_classes = [IsAuthenticated] + + @action(detail=False, methods=['get'], url_path='query-by-mac', + authentication_classes=[], permission_classes=[AllowAny]) + def query_by_mac(self, request): + """ + 通过MAC地址查询SN码(无需登录) + GET /api/v1/devices/query-by-mac?mac=AA:BB:CC:DD:EE:FF + """ + mac = request.query_params.get('mac', '') + if not mac: + return error(message='MAC地址不能为空') + + # 统一格式 + mac = mac.upper().replace('-', ':') + + try: + device = Device.objects.select_related('device_type').get(mac_address=mac) + return success(data={ + 'sn': device.sn, + 'mac_address': device.mac_address, + 'device_type': DeviceTypeSerializer(device.device_type).data if device.device_type else None, + 'status': device.status, + 'is_bound': device.status == 'bound' + }) + except Device.DoesNotExist: + return error( + code=404, + message='未找到对应的设备,请检查MAC地址是否正确或设备是否已完成入库', + status_code=status.HTTP_404_NOT_FOUND + ) + + @action(detail=False, methods=['post']) + def verify(self, request): + """ + 验证设备SN + POST /api/v1/devices/verify + """ + serializer = DeviceVerifySerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + sn = serializer.validated_data['sn'] + + try: + device = Device.objects.select_related('device_type').get(sn=sn) + except Device.DoesNotExist: + return error(code=ErrorCode.DEVICE_NOT_FOUND, message='设备不存在') + + # 检查是否已被绑定 + is_bindable = device.status != 'bound' + + return success(data={ + 'sn': device.sn, + 'is_bindable': is_bindable, + 'device_type': DeviceTypeSerializer(device.device_type).data if device.device_type else None + }) + + @action(detail=False, methods=['post']) + def bind(self, request): + """ + 绑定设备 + POST /api/v1/devices/bind + """ + serializer = BindDeviceSerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + sn = serializer.validated_data['sn'] + spirit_id = serializer.validated_data.get('spirit_id') + + try: + device = Device.objects.get(sn=sn) + except Device.DoesNotExist: + return error(code=ErrorCode.DEVICE_NOT_FOUND, message='设备不存在') + + # 检查是否已被绑定 + if device.status == 'bound': + # 检查是否是当前用户绑定的 + existing = UserDevice.objects.filter(device=device, is_active=True).first() + if existing and existing.user != request.user: + return error(code=ErrorCode.DEVICE_ALREADY_BOUND, message='设备已被其他用户绑定') + + # 创建绑定关系 + user_device, created = UserDevice.objects.update_or_create( + user=request.user, + device=device, + defaults={ + 'spirit_id': spirit_id, + 'is_active': True + } + ) + + # 更新设备状态 + device.status = 'bound' + device.save() + + return success( + data=UserDeviceSerializer(user_device).data, + message='绑定成功' if created else '更新绑定成功' + ) + + @action(detail=False, methods=['get']) + def my_devices(self, request): + """ + 我的设备列表 + GET /api/v1/devices/my_devices + """ + user_devices = UserDevice.objects.filter( + user=request.user, + is_active=True + ).select_related('device', 'device__device_type', 'spirit') + + serializer = UserDeviceSerializer(user_devices, many=True) + return success(data=serializer.data) + + @action(detail=True, methods=['delete']) + def unbind(self, request, pk=None): + """ + 解绑设备 + DELETE /api/v1/devices/{id}/unbind + """ + try: + user_device = UserDevice.objects.get(id=pk, user=request.user) + except UserDevice.DoesNotExist: + return error(code=ErrorCode.DEVICE_NOT_FOUND, message='绑定记录不存在') + + # 更新绑定状态 + user_device.is_active = False + user_device.save() + + # 检查设备是否还有其他活跃绑定 + active_bindings = UserDevice.objects.filter(device=user_device.device, is_active=True).count() + if active_bindings == 0: + user_device.device.status = 'out_stock' + user_device.device.save() + + return success(message='解绑成功') + + @action(detail=True, methods=['put'], url_path='update-spirit') + def update_spirit(self, request, pk=None): + """ + 更新设备绑定的智能体 + PUT /api/v1/devices/{id}/update-spirit + """ + try: + user_device = UserDevice.objects.get(id=pk, user=request.user, is_active=True) + except UserDevice.DoesNotExist: + return error(code=ErrorCode.DEVICE_NOT_FOUND, message='绑定记录不存在') + + spirit_id = request.data.get('spirit_id') + user_device.spirit_id = spirit_id + user_device.save() + + return success(data=UserDeviceSerializer(user_device).data, message='更新成功') diff --git a/apps/inventory/__init__.py b/apps/inventory/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/inventory/models.py b/apps/inventory/models.py new file mode 100644 index 0000000..c60a93c --- /dev/null +++ b/apps/inventory/models.py @@ -0,0 +1 @@ +# 出入库模块使用devices模块的模型,无需额外模型 diff --git a/apps/inventory/services.py b/apps/inventory/services.py new file mode 100644 index 0000000..63a01d4 --- /dev/null +++ b/apps/inventory/services.py @@ -0,0 +1,224 @@ +""" +出入库模块服务层 - SN码生成、Excel导入导出 +""" +import io +from datetime import datetime +from django.db import transaction +from openpyxl import Workbook, load_workbook +from openpyxl.drawing.image import Image as XLImage +import qrcode +from io import BytesIO + +from apps.devices.models import DeviceType, DeviceBatch, Device + + +def generate_production_week(date): + """ + 生成生产周格式: YYWXX + :param date: 日期对象 + :return: 如 25W45 + """ + year = date.year % 100 + week = date.isocalendar()[1] + return f"{year}W{week:02d}" + + +def generate_sn_code(brand, product_code, production_week, batch_no, sequence): + """ + 生成SN码 + 格式: {品牌}-{产品代号}-{生产周}-{批次号}-{5位序列号} + 示例: AL-DZBJ-ON-25W45-A01-00001 + """ + return f"{brand}-{product_code}-{production_week}-{batch_no}-{sequence:05d}" + + +def generate_batch_devices(batch): + """ + 为批次生成设备和SN码 + :param batch: DeviceBatch对象 + :return: 生成的Device列表 + """ + device_type = batch.device_type + brand = device_type.brand + product_code = device_type.product_code + production_week = batch.production_week or generate_production_week(batch.production_date) + batch_no = batch.batch_no + + # 更新生产周 + if not batch.production_week: + batch.production_week = production_week + batch.save(update_fields=['production_week']) + + devices = [] + for i in range(1, batch.quantity + 1): + sn = generate_sn_code(brand, product_code, production_week, batch_no, i) + device = Device( + sn=sn, + device_type=device_type, + batch=batch, + status='in_stock' + ) + devices.append(device) + + # 批量创建 + Device.objects.bulk_create(devices) + + # 更新批次状态 + batch.status = 'generated' + batch.save(update_fields=['status']) + + return devices + + +def generate_qrcode_image(data, size=100): + """ + 生成二维码图片 + :param data: 二维码内容 + :param size: 尺寸 + :return: BytesIO对象 + """ + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=4, + border=1, + ) + qr.add_data(data) + qr.make(fit=True) + + img = qr.make_image(fill_color="black", back_color="white") + img_io = BytesIO() + img.save(img_io, format='PNG') + img_io.seek(0) + return img_io + + +def export_batch_excel(batch): + """ + 导出批次SN码Excel + 包含四列: SN码 / MAC地址 / 创建时间 / SN码二维码 + :param batch: DeviceBatch对象 + :return: BytesIO对象(Excel文件) + """ + wb = Workbook() + ws = wb.active + ws.title = f"批次{batch.batch_no}" + + # 设置表头 + headers = ['SN码', 'MAC地址', '创建时间', 'SN码二维码'] + for col, header in enumerate(headers, 1): + ws.cell(row=1, column=col, value=header) + + # 设置列宽 + ws.column_dimensions['A'].width = 35 + ws.column_dimensions['B'].width = 20 + ws.column_dimensions['C'].width = 22 + ws.column_dimensions['D'].width = 15 + + # 获取批次设备 + devices = Device.objects.filter(batch=batch).order_by('sn') + + for row, device in enumerate(devices, 2): + ws.cell(row=row, column=1, value=device.sn) + ws.cell(row=row, column=2, value=device.mac_address or '') + ws.cell(row=row, column=3, value=device.created_at.strftime('%Y-%m-%d %H:%M:%S')) + + # 生成二维码并嵌入 + try: + qr_img = generate_qrcode_image(device.sn) + img = XLImage(qr_img) + img.width = 60 + img.height = 60 + ws.add_image(img, f'D{row}') + ws.row_dimensions[row].height = 50 + except Exception: + ws.cell(row=row, column=4, value='[QR]') + + # 保存到内存 + output = BytesIO() + wb.save(output) + output.seek(0) + + return output + + +def import_mac_addresses(batch, file_obj): + """ + 导入MAC地址 + :param batch: DeviceBatch对象 + :param file_obj: Excel文件对象 + :return: 导入结果字典 + """ + result = { + 'total': 0, + 'success': 0, + 'failed': 0, + 'errors': [] + } + + try: + wb = load_workbook(file_obj) + ws = wb.active + except Exception as e: + result['errors'].append({'row': 0, 'error': f'文件格式错误: {str(e)}'}) + return result + + # 跳过表头 + rows = list(ws.iter_rows(min_row=2, values_only=True)) + result['total'] = len(rows) + + for row_num, row in enumerate(rows, 2): + if len(row) < 2: + continue + + sn = row[0] + mac = row[1] + + if not sn or not mac: + continue + + # 统一MAC地址格式 + mac = str(mac).upper().replace('-', ':').strip() + + # 验证MAC地址格式 + import re + if not re.match(r'^([0-9A-F]{2}:){5}[0-9A-F]{2}$', mac): + result['failed'] += 1 + result['errors'].append({ + 'row': row_num, + 'sn': sn, + 'mac': mac, + 'error': 'MAC地址格式不正确' + }) + continue + + try: + device = Device.objects.get(sn=sn, batch=batch) + except Device.DoesNotExist: + result['failed'] += 1 + result['errors'].append({ + 'row': row_num, + 'sn': sn, + 'mac': mac, + 'error': 'SN码不存在或不属于当前批次' + }) + continue + + # 检查MAC地址是否已被使用 + existing = Device.objects.filter(mac_address=mac).exclude(id=device.id).first() + if existing: + result['failed'] += 1 + result['errors'].append({ + 'row': row_num, + 'sn': sn, + 'mac': mac, + 'error': f'MAC地址已被设备 {existing.sn} 使用' + }) + continue + + # 更新MAC地址 + device.mac_address = mac + device.save(update_fields=['mac_address']) + result['success'] += 1 + + return result diff --git a/apps/inventory/urls.py b/apps/inventory/urls.py new file mode 100644 index 0000000..394f009 --- /dev/null +++ b/apps/inventory/urls.py @@ -0,0 +1,16 @@ +""" +出入库模块URL配置 +""" +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from .views import DeviceTypeViewSet, DeviceBatchViewSet + +admin_router = DefaultRouter() +admin_router.register('device-types', DeviceTypeViewSet, basename='admin-device-types') +admin_router.register('device-batches', DeviceBatchViewSet, basename='admin-device-batches') + +urlpatterns = [] + +admin_urlpatterns = [ + path('', include(admin_router.urls)), +] diff --git a/apps/inventory/views.py b/apps/inventory/views.py new file mode 100644 index 0000000..ea70b93 --- /dev/null +++ b/apps/inventory/views.py @@ -0,0 +1,259 @@ +""" +出入库模块视图 - 管理端 +使用AdminJWTAuthentication进行认证 +""" +from django.http import HttpResponse +from rest_framework import viewsets, status +from rest_framework.decorators import action +from rest_framework.parsers import MultiPartParser + +from utils.response import success, error +from utils.exceptions import ErrorCode +from apps.admins.authentication import AdminJWTAuthentication +from apps.admins.permissions import IsAdminUser +from apps.devices.models import DeviceType, DeviceBatch, Device +from apps.devices.serializers import ( + DeviceTypeSerializer, + DeviceBatchSerializer, + DeviceBatchCreateSerializer, + DeviceSimpleSerializer +) +from .services import generate_batch_devices, export_batch_excel, import_mac_addresses + + +class DeviceTypeViewSet(viewsets.ModelViewSet): + """设备类型管理视图集 - 管理端""" + + queryset = DeviceType.objects.all().order_by('-created_at') + serializer_class = DeviceTypeSerializer + authentication_classes = [AdminJWTAuthentication] + permission_classes = [IsAdminUser] + + def list(self, request, *args, **kwargs): + """ + 设备类型列表 + GET /api/admin/device-types + """ + queryset = self.filter_queryset(self.get_queryset()) + + # 过滤 + brand = request.query_params.get('brand') + is_active = request.query_params.get('is_active') + + if brand: + queryset = queryset.filter(brand__contains=brand) + if is_active is not None: + queryset = queryset.filter(is_active=is_active.lower() == 'true') + + page = self.paginate_queryset(queryset) + if page is not None: + serializer = self.get_serializer(page, many=True) + return success(data={ + 'total': self.paginator.page.paginator.count, + 'items': serializer.data + }) + + serializer = self.get_serializer(queryset, many=True) + return success(data={'items': serializer.data}) + + def create(self, request, *args, **kwargs): + """ + 创建设备类型 + POST /api/admin/device-types + """ + serializer = self.get_serializer(data=request.data) + if serializer.is_valid(): + serializer.save() + return success(data=serializer.data, message='创建成功') + return error(message=str(serializer.errors)) + + def retrieve(self, request, *args, **kwargs): + """ + 设备类型详情 + GET /api/admin/device-types/{id} + """ + instance = self.get_object() + serializer = self.get_serializer(instance) + return success(data=serializer.data) + + def update(self, request, *args, **kwargs): + """ + 更新设备类型 + PUT /api/admin/device-types/{id} + """ + instance = self.get_object() + serializer = self.get_serializer(instance, data=request.data, partial=True) + if serializer.is_valid(): + serializer.save() + return success(data=serializer.data, message='更新成功') + return error(message=str(serializer.errors)) + + +class DeviceBatchViewSet(viewsets.ModelViewSet): + """设备批次管理视图集 - 管理端""" + + queryset = DeviceBatch.objects.all().select_related('device_type').order_by('-created_at') + authentication_classes = [AdminJWTAuthentication] + permission_classes = [IsAdminUser] + + def get_serializer_class(self): + if self.action == 'create': + return DeviceBatchCreateSerializer + return DeviceBatchSerializer + + def list(self, request, *args, **kwargs): + """ + 批次列表 + GET /api/admin/device-batches + """ + queryset = self.filter_queryset(self.get_queryset()) + + # 过滤 + device_type_id = request.query_params.get('device_type_id') + brand = request.query_params.get('brand') + batch_no = request.query_params.get('batch_no') + status_filter = request.query_params.get('status') + + if device_type_id: + queryset = queryset.filter(device_type_id=device_type_id) + if brand: + queryset = queryset.filter(device_type__brand__contains=brand) + if batch_no: + queryset = queryset.filter(batch_no__contains=batch_no) + if status_filter: + queryset = queryset.filter(status=status_filter) + + page = self.paginate_queryset(queryset) + if page is not None: + serializer = DeviceBatchSerializer(page, many=True) + return success(data={ + 'total': self.paginator.page.paginator.count, + 'items': serializer.data + }) + + serializer = DeviceBatchSerializer(queryset, many=True) + return success(data={'items': serializer.data}) + + def create(self, request, *args, **kwargs): + """ + 创建批次并生成SN码 + POST /api/admin/device-batches + """ + serializer = DeviceBatchCreateSerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + # 验证设备类型 + device_type_id = serializer.validated_data['device_type'].id + try: + device_type = DeviceType.objects.get(id=device_type_id, is_active=True) + except DeviceType.DoesNotExist: + return error(code=ErrorCode.DEVICE_TYPE_NOT_FOUND, message='设备类型不存在或已禁用') + + # 创建批次(注意:这里created_by需要存AdminUser的引用,暂时设为None) + batch = serializer.save(created_by=None) + + # 生成SN码和设备 + try: + devices = generate_batch_devices(batch) + except Exception as e: + batch.delete() + return error(message=f'生成SN码失败: {str(e)}') + + # 返回结果 + batch.refresh_from_db() + return success(data={ + **DeviceBatchSerializer(batch).data, + 'sn_range': { + 'start': devices[0].sn if devices else '', + 'end': devices[-1].sn if devices else '' + } + }, message='批次创建成功') + + def retrieve(self, request, *args, **kwargs): + """ + 批次详情 + GET /api/admin/device-batches/{id} + """ + instance = self.get_object() + + # 获取设备统计 + devices = Device.objects.filter(batch=instance) + stats = { + 'total': devices.count(), + 'in_stock': devices.filter(status='in_stock').count(), + 'out_stock': devices.filter(status='out_stock').count(), + 'bound': devices.filter(status='bound').count(), + 'with_mac': devices.exclude(mac_address__isnull=True).exclude(mac_address='').count() + } + + return success(data={ + **DeviceBatchSerializer(instance).data, + 'statistics': stats + }) + + @action(detail=True, methods=['get']) + def devices(self, request, pk=None): + """ + 批次设备列表 + GET /api/admin/device-batches/{id}/devices + """ + batch = self.get_object() + devices = Device.objects.filter(batch=batch).order_by('sn') + + page = self.paginate_queryset(devices) + if page is not None: + serializer = DeviceSimpleSerializer(page, many=True) + return success(data={ + 'total': self.paginator.page.paginator.count, + 'items': serializer.data + }) + + serializer = DeviceSimpleSerializer(devices, many=True) + return success(data={'items': serializer.data}) + + @action(detail=True, methods=['get']) + def export(self, request, pk=None): + """ + 导出批次SN码Excel + GET /api/admin/device-batches/{id}/export + """ + batch = self.get_object() + + # 生成Excel + excel_file = export_batch_excel(batch) + + # 返回文件 + response = HttpResponse( + excel_file.getvalue(), + content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + ) + filename = f"batch_{batch.batch_no}_{batch.production_week}.xlsx" + response['Content-Disposition'] = f'attachment; filename="{filename}"' + + return response + + @action(detail=True, methods=['post'], parser_classes=[MultiPartParser]) + def import_mac(self, request, pk=None): + """ + 导入MAC地址 + POST /api/admin/device-batches/{id}/import_mac + """ + batch = self.get_object() + + file_obj = request.FILES.get('file') + if not file_obj: + return error(message='请上传Excel文件') + + # 验证文件类型 + if not file_obj.name.endswith(('.xlsx', '.xls')): + return error(message='仅支持Excel文件格式(.xlsx, .xls)') + + # 导入MAC地址 + result = import_mac_addresses(batch, file_obj) + + message = f"导入完成,成功{result['success']}条" + if result['failed'] > 0: + message += f",失败{result['failed']}条" + + return success(data=result, message=message) diff --git a/apps/spirits/__init__.py b/apps/spirits/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/spirits/migrations/0001_initial.py b/apps/spirits/migrations/0001_initial.py new file mode 100644 index 0000000..ccc9fa9 --- /dev/null +++ b/apps/spirits/migrations/0001_initial.py @@ -0,0 +1,58 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Spirit", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ("name", models.CharField(max_length=100, verbose_name="名称")), + ( + "avatar", + models.URLField( + blank=True, default="", max_length=500, verbose_name="头像" + ), + ), + ( + "prompt", + models.TextField(blank=True, default="", verbose_name="提示词"), + ), + ( + "memory", + models.TextField(blank=True, default="", verbose_name="记忆"), + ), + ( + "voice_id", + models.CharField( + blank=True, default="", max_length=100, verbose_name="音色ID" + ), + ), + ( + "is_active", + models.BooleanField(default=True, verbose_name="是否启用"), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ], + options={ + "verbose_name": "智能体", + "verbose_name_plural": "智能体", + "db_table": "spirit", + "ordering": ["-created_at"], + }, + ), + ] diff --git a/apps/spirits/migrations/0002_initial.py b/apps/spirits/migrations/0002_initial.py new file mode 100644 index 0000000..d2fec8f --- /dev/null +++ b/apps/spirits/migrations/0002_initial.py @@ -0,0 +1,28 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:03 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("spirits", "0001_initial"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AddField( + model_name="spirit", + name="user", + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="spirits", + to=settings.AUTH_USER_MODEL, + verbose_name="所属用户", + ), + ), + ] diff --git a/apps/spirits/migrations/__init__.py b/apps/spirits/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/spirits/models.py b/apps/spirits/models.py new file mode 100644 index 0000000..ad2f4d2 --- /dev/null +++ b/apps/spirits/models.py @@ -0,0 +1,29 @@ +""" +智能体模块模型 +""" +from django.db import models +from apps.users.models import User + + +class Spirit(models.Model): + """智能体/心灵模型""" + + id = models.BigAutoField(primary_key=True) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='spirits', verbose_name='所属用户') + name = models.CharField('名称', max_length=100) + avatar = models.URLField('头像', max_length=500, blank=True, default='') + prompt = models.TextField('提示词', blank=True, default='') + memory = models.TextField('记忆', blank=True, default='') + voice_id = models.CharField('音色ID', max_length=100, blank=True, default='') + is_active = models.BooleanField('是否启用', default=True) + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + class Meta: + db_table = 'spirit' + verbose_name = '智能体' + verbose_name_plural = '智能体' + ordering = ['-created_at'] + + def __str__(self): + return f"{self.name} - {self.user.phone}" diff --git a/apps/spirits/serializers.py b/apps/spirits/serializers.py new file mode 100644 index 0000000..a308e25 --- /dev/null +++ b/apps/spirits/serializers.py @@ -0,0 +1,41 @@ +""" +智能体模块序列化器 +""" +from rest_framework import serializers +from .models import Spirit + + +class SpiritSerializer(serializers.ModelSerializer): + """智能体序列化器""" + + class Meta: + model = Spirit + fields = ['id', 'name', 'avatar', 'prompt', 'memory', 'voice_id', 'is_active', 'created_at', 'updated_at'] + read_only_fields = ['id', 'created_at', 'updated_at'] + + +class CreateSpiritSerializer(serializers.ModelSerializer): + """创建智能体序列化器""" + + class Meta: + model = Spirit + fields = ['name', 'avatar', 'prompt', 'memory', 'voice_id'] + + +class SpiritListSerializer(serializers.ModelSerializer): + """智能体列表序列化器(不含memory等大字段)""" + + class Meta: + model = Spirit + fields = ['id', 'name', 'avatar', 'is_active', 'created_at'] + + +class AdminSpiritSerializer(serializers.ModelSerializer): + """管理端智能体序列化器""" + + user_phone = serializers.CharField(source='user.phone', read_only=True) + + class Meta: + model = Spirit + fields = ['id', 'user', 'user_phone', 'name', 'avatar', 'prompt', 'memory', 'voice_id', 'is_active', 'created_at', 'updated_at'] + read_only_fields = ['id', 'created_at', 'updated_at'] diff --git a/apps/spirits/urls.py b/apps/spirits/urls.py new file mode 100644 index 0000000..d1cb7c4 --- /dev/null +++ b/apps/spirits/urls.py @@ -0,0 +1,13 @@ +""" +智能体模块URL配置 - 仅App端接口 +""" +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from .views import SpiritViewSet + +router = DefaultRouter() +router.register('spirits', SpiritViewSet, basename='spirits') + +urlpatterns = [ + path('', include(router.urls)), +] diff --git a/apps/spirits/views.py b/apps/spirits/views.py new file mode 100644 index 0000000..d1b6e1d --- /dev/null +++ b/apps/spirits/views.py @@ -0,0 +1,85 @@ +""" +智能体模块视图 - App端 +""" +from rest_framework import viewsets, status +from rest_framework.permissions import IsAuthenticated + +from utils.response import success, error +from utils.exceptions import ErrorCode +from apps.admins.authentication import AppJWTAuthentication +from .models import Spirit +from .serializers import ( + SpiritSerializer, + SpiritListSerializer, + CreateSpiritSerializer, +) + + +class SpiritViewSet(viewsets.ModelViewSet): + """智能体视图集(App端)""" + + authentication_classes = [AppJWTAuthentication] + permission_classes = [IsAuthenticated] + + def get_queryset(self): + return Spirit.objects.filter(user=self.request.user) + + def get_serializer_class(self): + if self.action == 'list': + return SpiritListSerializer + if self.action == 'create': + return CreateSpiritSerializer + return SpiritSerializer + + def list(self, request, *args, **kwargs): + """ + 获取我的智能体列表 + GET /api/v1/spirits + """ + queryset = self.get_queryset() + serializer = SpiritListSerializer(queryset, many=True) + return success(data=serializer.data) + + def create(self, request, *args, **kwargs): + """ + 创建智能体 + POST /api/v1/spirits + """ + serializer = CreateSpiritSerializer(data=request.data) + if serializer.is_valid(): + spirit = serializer.save(user=request.user) + return success(data=SpiritSerializer(spirit).data, message='创建成功') + return error(message=str(serializer.errors)) + + def retrieve(self, request, *args, **kwargs): + """ + 获取智能体详情 + GET /api/v1/spirits/{id} + """ + try: + instance = self.get_object() + serializer = SpiritSerializer(instance) + return success(data=serializer.data) + except Spirit.DoesNotExist: + return error(code=ErrorCode.SPIRIT_NOT_FOUND, message='智能体不存在') + + def update(self, request, *args, **kwargs): + """ + 更新智能体 + PUT /api/v1/spirits/{id} + """ + instance = self.get_object() + serializer = SpiritSerializer(instance, data=request.data, partial=True) + if serializer.is_valid(): + serializer.save() + return success(data=serializer.data, message='更新成功') + return error(message=str(serializer.errors)) + + def destroy(self, request, *args, **kwargs): + """ + 删除智能体 + DELETE /api/v1/spirits/{id} + """ + instance = self.get_object() + instance.delete() + return success(message='删除成功') diff --git a/apps/users/__init__.py b/apps/users/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/users/migrations/0001_initial.py b/apps/users/migrations/0001_initial.py new file mode 100644 index 0000000..4c07017 --- /dev/null +++ b/apps/users/migrations/0001_initial.py @@ -0,0 +1,95 @@ +# Generated by Django 6.0.1 on 2026-01-28 10:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ("auth", "0012_alter_user_first_name_max_length"), + ] + + operations = [ + migrations.CreateModel( + name="User", + fields=[ + ("password", models.CharField(max_length=128, verbose_name="password")), + ( + "last_login", + models.DateTimeField( + blank=True, null=True, verbose_name="last login" + ), + ), + ( + "is_superuser", + models.BooleanField( + default=False, + help_text="Designates that this user has all permissions without explicitly assigning them.", + verbose_name="superuser status", + ), + ), + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ( + "phone", + models.CharField(max_length=20, unique=True, verbose_name="手机号"), + ), + ( + "nickname", + models.CharField( + blank=True, default="", max_length=50, verbose_name="昵称" + ), + ), + ( + "avatar", + models.URLField( + blank=True, default="", max_length=500, verbose_name="头像" + ), + ), + ( + "is_active", + models.BooleanField(default=True, verbose_name="是否激活"), + ), + ( + "is_staff", + models.BooleanField(default=False, verbose_name="是否管理员"), + ), + ( + "created_at", + models.DateTimeField(auto_now_add=True, verbose_name="创建时间"), + ), + ( + "updated_at", + models.DateTimeField(auto_now=True, verbose_name="更新时间"), + ), + ( + "groups", + models.ManyToManyField( + blank=True, + help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.", + related_name="user_set", + related_query_name="user", + to="auth.group", + verbose_name="groups", + ), + ), + ( + "user_permissions", + models.ManyToManyField( + blank=True, + help_text="Specific permissions for this user.", + related_name="user_set", + related_query_name="user", + to="auth.permission", + verbose_name="user permissions", + ), + ), + ], + options={ + "verbose_name": "用户", + "verbose_name_plural": "用户", + "db_table": "user", + }, + ), + ] diff --git a/apps/users/migrations/__init__.py b/apps/users/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/users/models.py b/apps/users/models.py new file mode 100644 index 0000000..95b670e --- /dev/null +++ b/apps/users/models.py @@ -0,0 +1,49 @@ +""" +用户模块模型 +""" +from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin +from django.db import models + + +class UserManager(BaseUserManager): + """用户管理器""" + + def create_user(self, phone, password=None, **extra_fields): + if not phone: + raise ValueError('手机号不能为空') + user = self.model(phone=phone, **extra_fields) + if password: + user.set_password(password) + user.save(using=self._db) + return user + + def create_superuser(self, phone, password=None, **extra_fields): + extra_fields.setdefault('is_staff', True) + extra_fields.setdefault('is_superuser', True) + return self.create_user(phone, password, **extra_fields) + + +class User(AbstractBaseUser, PermissionsMixin): + """用户模型""" + + id = models.BigAutoField(primary_key=True) + phone = models.CharField('手机号', max_length=20, unique=True) + nickname = models.CharField('昵称', max_length=50, blank=True, default='') + avatar = models.URLField('头像', max_length=500, blank=True, default='') + is_active = models.BooleanField('是否激活', default=True) + is_staff = models.BooleanField('是否管理员', default=False) + created_at = models.DateTimeField('创建时间', auto_now_add=True) + updated_at = models.DateTimeField('更新时间', auto_now=True) + + objects = UserManager() + + USERNAME_FIELD = 'phone' + REQUIRED_FIELDS = [] + + class Meta: + db_table = 'user' + verbose_name = '用户' + verbose_name_plural = '用户' + + def __str__(self): + return self.phone diff --git a/apps/users/serializers.py b/apps/users/serializers.py new file mode 100644 index 0000000..c4aec74 --- /dev/null +++ b/apps/users/serializers.py @@ -0,0 +1,44 @@ +""" +用户模块序列化器 +""" +from rest_framework import serializers +from .models import User + + +class UserSerializer(serializers.ModelSerializer): + """用户序列化器""" + + class Meta: + model = User + fields = ['id', 'phone', 'nickname', 'avatar', 'created_at'] + read_only_fields = ['id', 'phone', 'created_at'] + + +class UserDetailSerializer(serializers.ModelSerializer): + """用户详情序列化器(管理端)""" + + class Meta: + model = User + fields = ['id', 'phone', 'nickname', 'avatar', 'is_active', 'is_staff', 'created_at', 'updated_at'] + + +class PhoneLoginSerializer(serializers.Serializer): + """手机号一键登录序列化器""" + + phone = serializers.CharField(max_length=20, help_text='手机号') + # 实际项目中应该有验证码或token + # code = serializers.CharField(max_length=10, help_text='验证码') + + def validate_phone(self, value): + # 简单验证手机号格式 + if not value.isdigit() or len(value) != 11: + raise serializers.ValidationError('手机号格式不正确') + return value + + +class UpdateUserSerializer(serializers.ModelSerializer): + """更新用户信息序列化器""" + + class Meta: + model = User + fields = ['nickname', 'avatar'] diff --git a/apps/users/urls.py b/apps/users/urls.py new file mode 100644 index 0000000..30d402d --- /dev/null +++ b/apps/users/urls.py @@ -0,0 +1,14 @@ +""" +用户模块URL配置 - 仅App端接口 +""" +from django.urls import path, include +from rest_framework.routers import DefaultRouter +from .views import AuthViewSet, UserViewSet + +router = DefaultRouter() +router.register('auth', AuthViewSet, basename='auth') +router.register('users', UserViewSet, basename='users') + +urlpatterns = [ + path('', include(router.urls)), +] diff --git a/apps/users/views.py b/apps/users/views.py new file mode 100644 index 0000000..7edf932 --- /dev/null +++ b/apps/users/views.py @@ -0,0 +1,120 @@ +""" +用户模块视图 - App端用户 +""" +from rest_framework import viewsets, status +from rest_framework.decorators import action +from rest_framework.permissions import AllowAny, IsAuthenticated +from rest_framework_simplejwt.tokens import RefreshToken + +from utils.response import success, error +from apps.admins.authentication import AppJWTAuthentication +from .models import User +from .serializers import ( + UserSerializer, + UserDetailSerializer, + PhoneLoginSerializer, + UpdateUserSerializer +) + + +def get_app_tokens(user): + """ + 为App用户生成JWT Token + 在token中添加 user_type='app' 以区分管理员 + """ + refresh = RefreshToken.for_user(user) + # 添加自定义声明 + refresh['user_type'] = 'app' + refresh['phone'] = user.phone + + return { + 'access': str(refresh.access_token), + 'refresh': str(refresh), + } + + +class AuthViewSet(viewsets.ViewSet): + """认证视图集 - App端""" + + permission_classes = [AllowAny] + + @action(detail=False, methods=['post'], url_path='phone-login') + def phone_login(self, request): + """ + 手机号一键登录 + POST /api/v1/auth/phone-login + """ + serializer = PhoneLoginSerializer(data=request.data) + if not serializer.is_valid(): + return error(message=str(serializer.errors)) + + phone = serializer.validated_data['phone'] + + # 获取或创建用户 + user, created = User.objects.get_or_create( + phone=phone, + defaults={'nickname': f'用户{phone[-4:]}'} + ) + + if not user.is_active: + return error(code=101, message='账号已被禁用') + + # 生成JWT Token(带user_type='app'标识) + tokens = get_app_tokens(user) + + return success(data={ + 'user': UserSerializer(user).data, + 'token': tokens, + 'is_new_user': created + }, message='登录成功') + + @action(detail=False, methods=['post'], url_path='refresh') + def refresh_token(self, request): + """ + 刷新Token + POST /api/v1/auth/refresh + """ + refresh_token = request.data.get('refresh') + if not refresh_token: + return error(message='refresh token不能为空') + + try: + refresh = RefreshToken(refresh_token) + # 验证是否为app token + user_type = refresh.get('user_type', 'app') + if user_type not in ['app', None]: + return error(code=103, message='无效的用户Token') + + return success(data={ + 'access': str(refresh.access_token), + 'refresh': str(refresh), + }) + except Exception as e: + return error(code=103, message='Token已过期或无效') + + +class UserViewSet(viewsets.ViewSet): + """用户视图集 - App端""" + + authentication_classes = [AppJWTAuthentication] + permission_classes = [IsAuthenticated] + + @action(detail=False, methods=['get']) + def me(self, request): + """ + 获取当前用户信息 + GET /api/v1/users/me + """ + return success(data=UserSerializer(request.user).data) + + @action(detail=False, methods=['put']) + def update_me(self, request): + """ + 更新当前用户信息 + PUT /api/v1/users/update_me + """ + serializer = UpdateUserSerializer(request.user, data=request.data, partial=True) + if serializer.is_valid(): + serializer.save() + return success(data=UserSerializer(request.user).data, message='更新成功') + return error(message=str(serializer.errors)) diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config/asgi.py b/config/asgi.py new file mode 100644 index 0000000..50a54f1 --- /dev/null +++ b/config/asgi.py @@ -0,0 +1,16 @@ +""" +ASGI config for config project. + +It exposes the ASGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/6.0/howto/deployment/asgi/ +""" + +import os + +from django.core.asgi import get_asgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") + +application = get_asgi_application() diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..f7416ce --- /dev/null +++ b/config/settings.py @@ -0,0 +1,196 @@ +""" +Django settings for RTC_DEMO project. +""" +import os +from pathlib import Path +from datetime import timedelta + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = Path(__file__).resolve().parent.parent + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'django-insecure-dev-key-change-in-production') + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = os.environ.get('DJANGO_DEBUG', 'True').lower() == 'true' + +ALLOWED_HOSTS = os.environ.get('DJANGO_ALLOWED_HOSTS', '*').split(',') + +# Application definition +INSTALLED_APPS = [ + 'django.contrib.admin', + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.messages', + 'django.contrib.staticfiles', + # Third party + 'rest_framework', + 'rest_framework_simplejwt', + 'corsheaders', + 'drf_spectacular', + # Local apps + 'apps.users', + 'apps.spirits', + 'apps.devices', + 'apps.inventory', + 'apps.admins', +] + +MIDDLEWARE = [ + 'corsheaders.middleware.CorsMiddleware', + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', +] + +ROOT_URLCONF = 'config.urls' + +TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, +] + +WSGI_APPLICATION = 'config.wsgi.application' + +# Database - MySQL via PyMySQL +import pymysql +pymysql.version_info = (2, 2, 1, 'final', 0) # Django 6.x requires mysqlclient >= 2.2.1 +pymysql.install_as_MySQLdb() + +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.mysql', + 'NAME': os.environ.get('DB_NAME', 'rtc'), + 'USER': os.environ.get('DB_USER', 'rtc'), + 'PASSWORD': os.environ.get('DB_PASSWORD', 'JogNQdtrd3WY8CBCAiYfYEGx'), + 'HOST': os.environ.get('DB_HOST', 'rm-7xv1uaw910558p1788o.mysql.rds.aliyuncs.com'), + 'PORT': os.environ.get('DB_PORT', '3306'), + 'OPTIONS': { + 'charset': 'utf8mb4', + }, + 'TEST': { + 'NAME': 'rtc', # 使用现有数据库进行测试 + }, + } +} + +# Redis Cache +REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', 'vAhRnAA6VMco') +REDIS_URL = os.environ.get('REDIS_URL', f'redis://:{REDIS_PASSWORD}@r-7xvat0vez5clwbzk5vpd.redis.rds.aliyuncs.com:6379/8') +CACHES = { + 'default': { + 'BACKEND': 'django.core.cache.backends.redis.RedisCache', + 'LOCATION': REDIS_URL, + } +} + +# Password validation +AUTH_PASSWORD_VALIDATORS = [ + {'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'}, + {'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'}, + {'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'}, + {'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'}, +] + +# Custom User Model +AUTH_USER_MODEL = 'users.User' + +# Internationalization +LANGUAGE_CODE = 'zh-hans' +TIME_ZONE = 'Asia/Shanghai' +USE_I18N = True +USE_TZ = True + +# Static files +STATIC_URL = 'static/' + +# Default primary key field type +DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' + +# CORS Settings +CORS_ALLOW_ALL_ORIGINS = DEBUG +CORS_ALLOWED_ORIGINS = os.environ.get('CORS_ALLOWED_ORIGINS', '').split(',') if not DEBUG else [] + +# REST Framework +REST_FRAMEWORK = { + 'DEFAULT_AUTHENTICATION_CLASSES': [ + 'rest_framework_simplejwt.authentication.JWTAuthentication', + ], + 'DEFAULT_PERMISSION_CLASSES': [ + 'rest_framework.permissions.IsAuthenticated', + ], + 'DEFAULT_RENDERER_CLASSES': [ + 'rest_framework.renderers.JSONRenderer', + ], + 'DEFAULT_SCHEMA_CLASS': 'drf_spectacular.openapi.AutoSchema', + 'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination', + 'PAGE_SIZE': 20, + 'EXCEPTION_HANDLER': 'utils.exceptions.custom_exception_handler', +} + +# JWT Settings +SIMPLE_JWT = { + 'ACCESS_TOKEN_LIFETIME': timedelta(days=7), + 'REFRESH_TOKEN_LIFETIME': timedelta(days=30), + 'ROTATE_REFRESH_TOKENS': True, + 'ALGORITHM': 'HS256', + 'AUTH_HEADER_TYPES': ('Bearer',), +} + +# Aliyun OSS Settings +ALIYUN_OSS = { + 'ACCESS_KEY_ID': os.environ.get('OSS_ACCESS_KEY_ID', 'LTAI5tBGAkR2rra2prTAX9yc'), + 'ACCESS_KEY_SECRET': os.environ.get('OSS_ACCESS_KEY_SECRET', 'U1z3d0p5saPRD5sCxVooJYSjxSAmKB'), + 'ENDPOINT': os.environ.get('OSS_ENDPOINT', 'oss-cn-beijing.aliyuncs.com'), + 'BUCKET_NAME': os.environ.get('OSS_BUCKET_NAME', 'qy-rtc'), + 'CUSTOM_DOMAIN': os.environ.get('OSS_CUSTOM_DOMAIN', ''), +} + +# Swagger/OpenAPI Settings +SPECTACULAR_SETTINGS = { + 'TITLE': 'RTC_DEMO API', + 'DESCRIPTION': 'RTC物联网设备管理平台API文档', + 'VERSION': '1.0.0', + 'SERVE_INCLUDE_SCHEMA': False, + 'COMPONENT_SPLIT_REQUEST': True, + 'SWAGGER_UI_SETTINGS': { + 'persistAuthorization': True, + }, +} + +# Logging +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'verbose': { + 'format': '{levelname} {asctime} {module} {message}', + 'style': '{', + }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'verbose', + }, + }, + 'root': { + 'handlers': ['console'], + 'level': 'INFO', + }, +} diff --git a/config/settings_test.py b/config/settings_test.py new file mode 100644 index 0000000..a804159 --- /dev/null +++ b/config/settings_test.py @@ -0,0 +1,25 @@ +""" +测试配置文件 +使用SQLite运行测试以避免云数据库权限问题 +""" +from config.settings import * + +# 使用SQLite进行测试 +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': BASE_DIR / 'test_db.sqlite3', + } +} + +# 禁用Redis缓存,使用本地内存缓存 +CACHES = { + 'default': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + } +} + +# 加速密码哈希 +PASSWORD_HASHERS = [ + 'django.contrib.auth.hashers.MD5PasswordHasher', +] diff --git a/config/urls.py b/config/urls.py new file mode 100644 index 0000000..9be7bb4 --- /dev/null +++ b/config/urls.py @@ -0,0 +1,43 @@ +""" +URL configuration for RTC_DEMO project. + +路由完全分离: +- /api/v1/ - App端接口(使用AppJWTAuthentication,User模型) +- /api/admin/ - Web管理端接口(使用AdminJWTAuthentication,AdminUser模型) +""" +from django.contrib import admin +from django.urls import path, include +from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView, SpectacularRedocView + +# ============ App端路由 (普通用户,手机一键登录) ============ +app_api_patterns = [ + path('', include('apps.users.urls')), + path('', include('apps.spirits.urls')), + path('', include('apps.devices.urls')), +] + +# ============ Web管理端路由 (管理员,用户名密码登录) ============ +from apps.inventory.urls import admin_urlpatterns as inventory_admin_urls + +admin_api_patterns = [ + # 管理员认证和个人信息 + path('', include('apps.admins.urls')), + # 业务管理接口 + path('', include(inventory_admin_urls)), +] + +urlpatterns = [ + # Django Admin + path('django-admin/', admin.site.urls), + + # Swagger/OpenAPI文档 + path('api/schema/', SpectacularAPIView.as_view(), name='schema'), + path('api/docs/', SpectacularSwaggerView.as_view(url_name='schema'), name='swagger-ui'), + path('api/redoc/', SpectacularRedocView.as_view(url_name='schema'), name='redoc'), + + # App端API - 普通用户 + path('api/v1/', include(app_api_patterns)), + + # Web管理端API - 管理员 + path('api/admin/', include(admin_api_patterns)), +] diff --git a/config/wsgi.py b/config/wsgi.py new file mode 100644 index 0000000..190f99c --- /dev/null +++ b/config/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for config project. + +It exposes the WSGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/6.0/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") + +application = get_wsgi_application() diff --git a/manage.py b/manage.py new file mode 100755 index 0000000..d28672e --- /dev/null +++ b/manage.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +"""Django's command-line utility for administrative tasks.""" +import os +import sys + + +def main(): + """Run administrative tasks.""" + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?" + ) from exc + execute_from_command_line(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f2b0e09 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,28 @@ +aliyun-python-sdk-core==2.16.0 +aliyun-python-sdk-kms==2.16.5 +asgiref==3.11.0 +certifi==2026.1.4 +cffi==2.0.0 +charset-normalizer==3.4.4 +crcmod==1.7 +cryptography==46.0.4 +Django==6.0.1 +django-cors-headers==4.9.0 +djangorestframework==3.16.1 +djangorestframework_simplejwt==5.5.1 +et_xmlfile==2.0.0 +idna==3.11 +jmespath==0.10.0 +openpyxl==3.1.5 +oss2==2.19.1 +pillow==12.1.0 +pycparser==3.0 +pycryptodome==3.23.0 +PyJWT==2.10.1 +PyMySQL==1.1.2 +qrcode==8.2 +redis==7.1.0 +requests==2.32.5 +six==1.17.0 +sqlparse==0.5.5 +urllib3==2.6.3 diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..c9bc745 --- /dev/null +++ b/tests.py @@ -0,0 +1,787 @@ +""" +RTC_DEMO API 完整测试用例 +覆盖所有API接口 +运行方式: python manage.py test tests --verbosity=2 --keepdb +""" +import json +from datetime import date +from django.test import TestCase +from django.urls import reverse +from rest_framework.test import APITestCase, APIClient +from rest_framework import status +from apps.users.models import User +from apps.admins.models import AdminUser +from apps.spirits.models import Spirit +from apps.devices.models import DeviceType, DeviceBatch, Device, UserDevice + + +# ==================== App端测试 ==================== + +class UserAuthTests(APITestCase): + """App端用户认证测试""" + + def test_phone_login_new_user(self): + """测试手机号一键登录 - 新用户""" + url = '/api/v1/auth/phone-login/' + data = {'phone': '13800138001'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertIn('token', response.data['data']) + self.assertTrue(response.data['data']['is_new_user']) + self.assertEqual(response.data['data']['user']['phone'], '13800138001') + + def test_phone_login_existing_user(self): + """测试手机号一键登录 - 已有用户""" + User.objects.create_user(phone='13800138002', nickname='测试用户') + + url = '/api/v1/auth/phone-login/' + data = {'phone': '13800138002'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertFalse(response.data['data']['is_new_user']) + + def test_phone_login_invalid_phone(self): + """测试手机号格式验证""" + url = '/api/v1/auth/phone-login/' + data = {'phone': '1234567'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_phone_login_disabled_user(self): + """测试禁用用户登录""" + User.objects.create_user(phone='13800138003', is_active=False) + + url = '/api/v1/auth/phone-login/' + data = {'phone': '13800138003'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_token_refresh(self): + """测试Token刷新""" + login_url = '/api/v1/auth/phone-login/' + login_response = self.client.post(login_url, {'phone': '13800138004'}, format='json') + refresh_token = login_response.data['data']['token']['refresh'] + + refresh_url = '/api/v1/auth/refresh/' + response = self.client.post(refresh_url, {'refresh': refresh_token}, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertIn('access', response.data['data']) + + def test_token_refresh_invalid(self): + """测试无效Token刷新""" + url = '/api/v1/auth/refresh/' + response = self.client.post(url, {'refresh': 'invalid_token'}, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_token_refresh_empty(self): + """测试空Token刷新""" + url = '/api/v1/auth/refresh/' + response = self.client.post(url, {}, format='json') + + self.assertNotEqual(response.data['code'], 0) + + +class UserProfileTests(APITestCase): + """App端用户信息测试""" + + def setUp(self): + self.user = User.objects.create_user(phone='13800138010', nickname='测试用户') + login_url = '/api/v1/auth/phone-login/' + response = self.client.post(login_url, {'phone': '13800138010'}, format='json') + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + def test_get_user_info(self): + """测试获取用户信息""" + url = '/api/v1/users/me/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(response.data['data']['phone'], '13800138010') + + def test_get_user_info_unauthorized(self): + """测试未登录获取用户信息""" + self.client.credentials() + url = '/api/v1/users/me/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_update_user_info(self): + """测试更新用户信息""" + url = '/api/v1/users/update_me/' + data = {'nickname': '新昵称'} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(response.data['data']['nickname'], '新昵称') + + def test_update_user_avatar(self): + """测试更新用户头像""" + url = '/api/v1/users/update_me/' + data = {'avatar': 'https://example.com/avatar.jpg'} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + +class SpiritTests(APITestCase): + """App端智能体测试""" + + def setUp(self): + self.user = User.objects.create_user(phone='13800138020', nickname='测试用户') + login_url = '/api/v1/auth/phone-login/' + response = self.client.post(login_url, {'phone': '13800138020'}, format='json') + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + def test_create_spirit(self): + """测试创建智能体""" + url = '/api/v1/spirits/' + data = { + 'name': '测试心灵', + 'prompt': '你是一个友好的助手', + 'voice_id': 'voice_001' + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(response.data['data']['name'], '测试心灵') + + def test_create_spirit_minimal(self): + """测试创建智能体 - 最少字段""" + url = '/api/v1/spirits/' + data = {'name': '简单心灵'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + def test_list_spirits(self): + """测试获取智能体列表""" + Spirit.objects.create(user=self.user, name='心灵1') + Spirit.objects.create(user=self.user, name='心灵2') + + url = '/api/v1/spirits/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(len(response.data['data']), 2) + + def test_list_spirits_empty(self): + """测试获取空智能体列表""" + url = '/api/v1/spirits/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data['data']), 0) + + def test_get_spirit_detail(self): + """测试获取智能体详情""" + spirit = Spirit.objects.create(user=self.user, name='详情测试') + + url = f'/api/v1/spirits/{spirit.id}/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['name'], '详情测试') + + def test_update_spirit(self): + """测试更新智能体""" + spirit = Spirit.objects.create(user=self.user, name='原始名称') + + url = f'/api/v1/spirits/{spirit.id}/' + data = {'name': '新名称', 'prompt': '新的提示词'} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['name'], '新名称') + + def test_delete_spirit(self): + """测试删除智能体""" + spirit = Spirit.objects.create(user=self.user, name='待删除') + + url = f'/api/v1/spirits/{spirit.id}/' + response = self.client.delete(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertFalse(Spirit.objects.filter(id=spirit.id).exists()) + + def test_spirit_isolation(self): + """测试智能体用户隔离 - 不能访问其他用户的智能体""" + other_user = User.objects.create_user(phone='13800138021') + other_spirit = Spirit.objects.create(user=other_user, name='其他用户的心灵') + + url = f'/api/v1/spirits/{other_spirit.id}/' + response = self.client.get(url) + + # 应该返回404或权限错误 + self.assertNotEqual(response.status_code, status.HTTP_200_OK) + + +class DeviceTests(APITestCase): + """App端设备测试""" + + def setUp(self): + self.user = User.objects.create_user(phone='13800138050', nickname='测试用户') + login_url = '/api/v1/auth/phone-login/' + response = self.client.post(login_url, {'phone': '13800138050'}, format='json') + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + self.device_type = DeviceType.objects.create( + brand='AL', + product_code='DZBJ-ON', + name='电子吧唧-联网版' + ) + self.device = Device.objects.create( + sn='AL-DZBJ-ON-25W45-A01-00001', + device_type=self.device_type, + mac_address='AA:BB:CC:DD:EE:FF', + status='in_stock' + ) + + def test_query_by_mac(self): + """测试通过MAC地址查询SN码(无需登录)""" + self.client.credentials() # 清除认证 + url = '/api/v1/devices/query-by-mac/?mac=AA:BB:CC:DD:EE:FF' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(response.data['data']['sn'], 'AL-DZBJ-ON-25W45-A01-00001') + + def test_query_by_mac_lowercase(self): + """测试MAC地址查询 - 小写格式""" + self.client.credentials() + url = '/api/v1/devices/query-by-mac/?mac=aa:bb:cc:dd:ee:ff' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + def test_query_by_mac_dash_format(self): + """测试MAC地址查询 - 横杠格式""" + self.client.credentials() + url = '/api/v1/devices/query-by-mac/?mac=AA-BB-CC-DD-EE-FF' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_query_by_mac_not_found(self): + """测试MAC地址查询 - 设备不存在""" + self.client.credentials() + url = '/api/v1/devices/query-by-mac/?mac=11:22:33:44:55:66' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_query_by_mac_empty(self): + """测试MAC地址查询 - 空参数""" + self.client.credentials() + url = '/api/v1/devices/query-by-mac/' + response = self.client.get(url) + + self.assertNotEqual(response.data['code'], 0) + + def test_verify_device(self): + """测试验证设备SN""" + url = '/api/v1/devices/verify/' + data = {'sn': 'AL-DZBJ-ON-25W45-A01-00001'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertTrue(response.data['data']['is_bindable']) + + def test_verify_device_not_found(self): + """测试验证设备SN - 不存在""" + url = '/api/v1/devices/verify/' + data = {'sn': 'NOT-EXIST-SN'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_bind_device(self): + """测试绑定设备""" + spirit = Spirit.objects.create(user=self.user, name='测试心灵') + + url = '/api/v1/devices/bind/' + data = { + 'sn': 'AL-DZBJ-ON-25W45-A01-00001', + 'spirit_id': spirit.id + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + # 验证设备状态 + self.device.refresh_from_db() + self.assertEqual(self.device.status, 'bound') + + def test_bind_device_without_spirit(self): + """测试绑定设备 - 不绑定智能体""" + device2 = Device.objects.create( + sn='AL-DZBJ-ON-25W45-A01-00002', + device_type=self.device_type, + status='in_stock' + ) + + url = '/api/v1/devices/bind/' + data = {'sn': 'AL-DZBJ-ON-25W45-A01-00002'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_my_devices(self): + """测试我的设备列表""" + UserDevice.objects.create(user=self.user, device=self.device, is_active=True) + + url = '/api/v1/devices/my_devices/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(len(response.data['data']), 1) + + def test_my_devices_empty(self): + """测试我的设备列表 - 空列表""" + url = '/api/v1/devices/my_devices/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data['data']), 0) + + def test_unbind_device(self): + """测试解绑设备""" + user_device = UserDevice.objects.create(user=self.user, device=self.device, is_active=True) + self.device.status = 'bound' + self.device.save() + + url = f'/api/v1/devices/{user_device.id}/unbind/' + response = self.client.delete(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + user_device.refresh_from_db() + self.assertFalse(user_device.is_active) + + def test_update_spirit_on_device(self): + """测试更新设备绑定的智能体""" + spirit1 = Spirit.objects.create(user=self.user, name='心灵1') + spirit2 = Spirit.objects.create(user=self.user, name='心灵2') + user_device = UserDevice.objects.create( + user=self.user, device=self.device, spirit=spirit1, is_active=True + ) + + url = f'/api/v1/devices/{user_device.id}/update-spirit/' + data = {'spirit_id': spirit2.id} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + user_device.refresh_from_db() + self.assertEqual(user_device.spirit_id, spirit2.id) + + +# ==================== 管理端测试 ==================== + +class AdminAuthTests(APITestCase): + """管理端认证测试""" + + def setUp(self): + self.admin = AdminUser.objects.create_user( + username='admin', + password='admin123', + role='super_admin' + ) + + def test_admin_login(self): + """测试管理员登录""" + url = '/api/admin/auth/login/' + data = {'username': 'admin', 'password': 'admin123'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertIn('token', response.data['data']) + self.assertEqual(response.data['data']['admin']['username'], 'admin') + + def test_admin_login_wrong_password(self): + """测试管理员登录 - 密码错误""" + url = '/api/admin/auth/login/' + data = {'username': 'admin', 'password': 'wrong'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_admin_login_user_not_exist(self): + """测试管理员登录 - 用户不存在""" + url = '/api/admin/auth/login/' + data = {'username': 'notexist', 'password': 'password'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_admin_login_disabled(self): + """测试管理员登录 - 账户禁用""" + disabled_admin = AdminUser.objects.create_user( + username='disabled', password='pass123', is_active=False + ) + + url = '/api/admin/auth/login/' + data = {'username': 'disabled', 'password': 'pass123'} + response = self.client.post(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + def test_admin_token_refresh(self): + """测试管理员Token刷新""" + login_url = '/api/admin/auth/login/' + login_response = self.client.post( + login_url, {'username': 'admin', 'password': 'admin123'}, format='json' + ) + refresh_token = login_response.data['data']['token']['refresh'] + + refresh_url = '/api/admin/auth/refresh/' + response = self.client.post(refresh_url, {'refresh': refresh_token}, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + +class AdminProfileTests(APITestCase): + """管理端个人信息测试""" + + def setUp(self): + self.admin = AdminUser.objects.create_user( + username='admin', password='admin123', role='admin' + ) + login_url = '/api/admin/auth/login/' + response = self.client.post( + login_url, {'username': 'admin', 'password': 'admin123'}, format='json' + ) + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + def test_get_admin_profile(self): + """测试获取管理员信息""" + url = '/api/admin/profile/me/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['username'], 'admin') + + def test_change_password(self): + """测试修改密码""" + url = '/api/admin/profile/change-password/' + data = {'old_password': 'admin123', 'new_password': 'newpass123'} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # 验证新密码可以登录 + self.client.credentials() + login_url = '/api/admin/auth/login/' + login_response = self.client.post( + login_url, {'username': 'admin', 'password': 'newpass123'}, format='json' + ) + self.assertEqual(login_response.data['code'], 0) + + def test_change_password_wrong_old(self): + """测试修改密码 - 原密码错误""" + url = '/api/admin/profile/change-password/' + data = {'old_password': 'wrongpass', 'new_password': 'newpass123'} + response = self.client.put(url, data, format='json') + + self.assertNotEqual(response.data['code'], 0) + + +class AdminUserManageTests(APITestCase): + """管理员用户管理测试""" + + def setUp(self): + self.super_admin = AdminUser.objects.create_user( + username='superadmin', password='super123', role='super_admin' + ) + login_url = '/api/admin/auth/login/' + response = self.client.post( + login_url, {'username': 'superadmin', 'password': 'super123'}, format='json' + ) + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + def test_list_admins(self): + """测试管理员列表""" + url = '/api/admin/admins/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + def test_create_admin(self): + """测试创建管理员""" + url = '/api/admin/admins/' + data = { + 'username': 'newadmin', + 'password': 'newpass123', + 'name': '新管理员', + 'role': 'operator' + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['username'], 'newadmin') + + def test_get_admin_detail(self): + """测试获取管理员详情""" + admin = AdminUser.objects.create_user(username='testadmin', password='test123') + + url = f'/api/admin/admins/{admin.id}/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['username'], 'testadmin') + + def test_toggle_admin_status(self): + """测试启用/禁用管理员""" + admin = AdminUser.objects.create_user(username='testadmin2', password='test123') + + url = f'/api/admin/admins/{admin.id}/toggle-status/' + response = self.client.post(url, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + admin.refresh_from_db() + self.assertFalse(admin.is_active) + + def test_reset_admin_password(self): + """测试重置管理员密码""" + admin = AdminUser.objects.create_user(username='testadmin3', password='oldpass') + + url = f'/api/admin/admins/{admin.id}/reset-password/' + data = {'new_password': 'resetpass123'} + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + +class AdminDeviceTypeTests(APITestCase): + """管理端设备类型测试""" + + def setUp(self): + self.admin = AdminUser.objects.create_user( + username='admin', password='admin123', role='super_admin' + ) + login_url = '/api/admin/auth/login/' + response = self.client.post( + login_url, {'username': 'admin', 'password': 'admin123'}, format='json' + ) + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + def test_create_device_type_network(self): + """测试创建联网设备类型""" + url = '/api/admin/device-types/' + data = { + 'brand': 'AL', + 'product_code': 'DZBJ-ON', + 'name': '电子吧唧-联网版' + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertTrue(response.data['data']['is_network_required']) + + def test_create_device_type_offline(self): + """测试创建非联网设备类型""" + url = '/api/admin/device-types/' + data = { + 'brand': 'AL', + 'product_code': 'DZBJ-OFF', + 'name': '电子吧唧-离线版' + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertFalse(response.data['data']['is_network_required']) + + def test_list_device_types(self): + """测试设备类型列表""" + DeviceType.objects.create(brand='AL', product_code='TEST-ON', name='测试设备') + + url = '/api/admin/device-types/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + def test_get_device_type_detail(self): + """测试设备类型详情""" + dt = DeviceType.objects.create(brand='AL', product_code='TEST-DT', name='测试类型') + + url = f'/api/admin/device-types/{dt.id}/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['name'], '测试类型') + + def test_update_device_type(self): + """测试更新设备类型""" + dt = DeviceType.objects.create(brand='AL', product_code='TEST-UP', name='原名称') + + url = f'/api/admin/device-types/{dt.id}/' + data = {'name': '新名称'} + response = self.client.put(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['data']['name'], '新名称') + + +class AdminBatchTests(APITestCase): + """管理端批次测试""" + + def setUp(self): + self.admin = AdminUser.objects.create_user( + username='admin', password='admin123', role='super_admin' + ) + login_url = '/api/admin/auth/login/' + response = self.client.post( + login_url, {'username': 'admin', 'password': 'admin123'}, format='json' + ) + self.access_token = response.data['data']['token']['access'] + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.access_token}') + + self.device_type = DeviceType.objects.create( + brand='AL', product_code='DZBJ-ON', name='电子吧唧-联网版' + ) + + def test_create_batch_and_generate_sn(self): + """测试创建批次并生成SN码""" + url = '/api/admin/device-batches/' + data = { + 'device_type': self.device_type.id, + 'batch_no': 'A01', + 'production_date': '2026-01-28', + 'quantity': 10, + 'remark': '测试批次' + } + response = self.client.post(url, data, format='json') + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + self.assertEqual(response.data['data']['quantity'], 10) + self.assertIn('sn_range', response.data['data']) + + # 验证SN码格式 + sn_start = response.data['data']['sn_range']['start'] + self.assertTrue(sn_start.startswith('AL-DZBJ-ON-')) + + # 验证设备数量 + device_count = Device.objects.filter(batch__batch_no='A01').count() + self.assertEqual(device_count, 10) + + def test_list_batches(self): + """测试批次列表""" + url = '/api/admin/device-batches/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['code'], 0) + + def test_get_batch_detail(self): + """测试批次详情""" + batch = DeviceBatch.objects.create( + device_type=self.device_type, + batch_no='B01', + production_date=date(2026, 1, 28), + quantity=5 + ) + + url = f'/api/admin/device-batches/{batch.id}/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('statistics', response.data['data']) + + def test_get_batch_devices(self): + """测试批次设备列表""" + batch = DeviceBatch.objects.create( + device_type=self.device_type, + batch_no='C01', + production_date=date(2026, 1, 28), + quantity=3 + ) + Device.objects.create(sn='TEST-001', batch=batch, device_type=self.device_type) + Device.objects.create(sn='TEST-002', batch=batch, device_type=self.device_type) + + url = f'/api/admin/device-batches/{batch.id}/devices/' + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data['data']['items']), 2) + + +class AuthSeparationTests(APITestCase): + """验证App和Admin认证分离""" + + def setUp(self): + # 创建App用户 + self.app_user = User.objects.create_user(phone='13800138099') + login_url = '/api/v1/auth/phone-login/' + response = self.client.post(login_url, {'phone': '13800138099'}, format='json') + self.app_token = response.data['data']['token']['access'] + + # 创建Admin用户 + self.admin = AdminUser.objects.create_user( + username='admin', password='admin123', role='admin' + ) + login_url = '/api/admin/auth/login/' + response = self.client.post( + login_url, {'username': 'admin', 'password': 'admin123'}, format='json' + ) + self.admin_token = response.data['data']['token']['access'] + + def test_app_token_cannot_access_admin_api(self): + """测试App Token无法访问管理端API""" + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.app_token}') + + url = '/api/admin/device-types/' + response = self.client.get(url) + + self.assertNotEqual(response.data.get('code', 0), 0) + + def test_admin_token_cannot_access_app_api(self): + """测试Admin Token无法访问App端API""" + self.client.credentials(HTTP_AUTHORIZATION=f'Bearer {self.admin_token}') + + url = '/api/v1/users/me/' + response = self.client.get(url) + + self.assertNotEqual(response.data.get('code', 0), 0) + + def test_no_token_cannot_access_protected_api(self): + """测试无Token无法访问受保护API""" + self.client.credentials() + + url = '/api/v1/users/me/' + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + url = '/api/admin/device-types/' + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/exceptions.py b/utils/exceptions.py new file mode 100644 index 0000000..8ba7aaa --- /dev/null +++ b/utils/exceptions.py @@ -0,0 +1,91 @@ +""" +自定义异常处理 +""" +from rest_framework.views import exception_handler +from rest_framework import status +from utils.response import APIResponse + + +class BusinessException(Exception): + """业务异常""" + def __init__(self, code=1, message='业务错误', status_code=status.HTTP_400_BAD_REQUEST): + self.code = code + self.message = message + self.status_code = status_code + super().__init__(message) + + +class ErrorCode: + """错误码定义""" + # 通用错误 1-99 + SUCCESS = 0 + UNKNOWN_ERROR = 1 + PARAM_ERROR = 2 + NOT_FOUND = 3 + PERMISSION_DENIED = 4 + + # 用户模块 100-199 + USER_NOT_FOUND = 100 + USER_DISABLED = 101 + PHONE_INVALID = 102 + TOKEN_EXPIRED = 103 + + # 设备模块 200-299 + DEVICE_NOT_FOUND = 200 + DEVICE_ALREADY_BOUND = 201 + DEVICE_MAC_EXISTS = 202 + DEVICE_SN_INVALID = 203 + + # 智能体模块 300-399 + SPIRIT_NOT_FOUND = 300 + SPIRIT_LIMIT_EXCEEDED = 301 + + # 批次模块 400-499 + BATCH_NOT_FOUND = 400 + BATCH_SN_EXISTS = 401 + DEVICE_TYPE_NOT_FOUND = 402 + + +def custom_exception_handler(exc, context): + """自定义异常处理器""" + # 处理业务异常 + if isinstance(exc, BusinessException): + return APIResponse( + code=exc.code, + message=exc.message, + status_code=exc.status_code + ) + + # 调用DRF默认异常处理 + response = exception_handler(exc, context) + + if response is not None: + # 统一格式化DRF异常 + error_message = '' + if isinstance(response.data, dict): + if 'detail' in response.data: + error_message = str(response.data['detail']) + else: + # 处理字段验证错误 + errors = [] + for field, messages in response.data.items(): + if isinstance(messages, list): + errors.append(f"{field}: {', '.join(str(m) for m in messages)}") + else: + errors.append(f"{field}: {messages}") + error_message = '; '.join(errors) + else: + error_message = str(response.data) + + return APIResponse( + code=ErrorCode.UNKNOWN_ERROR, + message=error_message, + status_code=response.status_code + ) + + # 处理未知异常 + return APIResponse( + code=ErrorCode.UNKNOWN_ERROR, + message=str(exc), + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR + ) diff --git a/utils/oss.py b/utils/oss.py new file mode 100644 index 0000000..a938ecc --- /dev/null +++ b/utils/oss.py @@ -0,0 +1,92 @@ +""" +阿里云OSS工具类 +""" +import os +import uuid +from datetime import datetime +from django.conf import settings + +try: + import oss2 + OSS_AVAILABLE = True +except ImportError: + OSS_AVAILABLE = False + + +class OSSClient: + """阿里云OSS客户端""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + if not OSS_AVAILABLE: + self.bucket = None + self._initialized = True + return + + oss_config = settings.ALIYUN_OSS + if not oss_config.get('ACCESS_KEY_ID'): + self.bucket = None + self._initialized = True + return + + auth = oss2.Auth( + oss_config['ACCESS_KEY_ID'], + oss_config['ACCESS_KEY_SECRET'] + ) + self.bucket = oss2.Bucket( + auth, + oss_config['ENDPOINT'], + oss_config['BUCKET_NAME'] + ) + self.custom_domain = oss_config.get('CUSTOM_DOMAIN', '') + self._initialized = True + + def upload_file(self, file_obj, folder='uploads'): + """ + 上传文件 + :param file_obj: 文件对象 + :param folder: 存储文件夹 + :return: 文件URL + """ + if not self.bucket: + raise Exception('OSS未配置') + + # 生成唯一文件名 + ext = os.path.splitext(file_obj.name)[1] if hasattr(file_obj, 'name') else '' + filename = f"{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}{ext}" + key = f"{folder}/{filename}" + + # 上传文件 + self.bucket.put_object(key, file_obj) + + # 返回URL + if self.custom_domain: + return f"https://{self.custom_domain}/{key}" + return f"https://{settings.ALIYUN_OSS['BUCKET_NAME']}.{settings.ALIYUN_OSS['ENDPOINT']}/{key}" + + def delete_file(self, file_url): + """ + 删除文件 + :param file_url: 文件URL + """ + if not self.bucket: + return + + # 从URL提取key + key = file_url.split('/')[-2] + '/' + file_url.split('/')[-1] + self.bucket.delete_object(key) + + +def get_oss_client(): + """获取OSS客户端单例""" + return OSSClient() diff --git a/utils/permissions.py b/utils/permissions.py new file mode 100644 index 0000000..eb6477b --- /dev/null +++ b/utils/permissions.py @@ -0,0 +1,27 @@ +""" +权限控制 +""" +from rest_framework.permissions import BasePermission + + +class IsAdmin(BasePermission): + """管理员权限""" + def has_permission(self, request, view): + return request.user and request.user.is_authenticated and request.user.is_staff + + +class IsOwner(BasePermission): + """资源所有者权限""" + def has_object_permission(self, request, view, obj): + # 假设对象有user字段 + if hasattr(obj, 'user'): + return obj.user == request.user + if hasattr(obj, 'owner'): + return obj.owner == request.user + return False + + +class AllowAny(BasePermission): + """允许任何访问""" + def has_permission(self, request, view): + return True diff --git a/utils/response.py b/utils/response.py new file mode 100644 index 0000000..90b271b --- /dev/null +++ b/utils/response.py @@ -0,0 +1,52 @@ +""" +统一响应格式工具类 +""" +from rest_framework.response import Response +from rest_framework import status + + +class APIResponse(Response): + """ + 统一API响应格式 + { + "code": 0, # 0表示成功,非0表示错误码 + "message": "success", # 响应消息 + "data": {} # 响应数据 + } + """ + + def __init__(self, data=None, code=0, message='success', + status_code=status.HTTP_200_OK, **kwargs): + response_data = { + 'code': code, + 'message': message, + 'data': data + } + super().__init__(data=response_data, status=status_code, **kwargs) + + +def success(data=None, message='success'): + """成功响应""" + return APIResponse(data=data, code=0, message=message) + + +def error(code=1, message='error', status_code=status.HTTP_400_BAD_REQUEST, data=None): + """错误响应""" + return APIResponse(data=data, code=code, message=message, status_code=status_code) + + +def paginated_response(paginator, serializer_class, queryset, request): + """分页响应""" + page = paginator.paginate_queryset(queryset, request) + if page is not None: + serializer = serializer_class(page, many=True) + return APIResponse( + data={ + 'total': paginator.page.paginator.count, + 'page': paginator.page.number, + 'page_size': paginator.page_size, + 'items': serializer.data + } + ) + serializer = serializer_class(queryset, many=True) + return APIResponse(data={'items': serializer.data})