lty/qy_lty/card/views.py
pmc c0fe1f502b
All checks were successful
Build and Deploy LTY / build-and-deploy (push) Successful in 1h5m35s
feat: update card models, admin pages, and add migrations
- Update card models, serializers, views and URLs
- Update dances, songs, users admin pages and API modules
- Add card migrations (merge furniture into decoration)
- Update middleware and settings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 16:38:48 +08:00

1088 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.shortcuts import render, get_object_or_404
from django.utils import timezone
from django.db import transaction
from django.http import HttpResponse, JsonResponse
import json
from rest_framework import viewsets, status, generics, permissions
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
import uuid
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
import io
import logging
from django.core.files.base import ContentFile
import secrets
import string
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework import serializers
from common.swagger_utils import swagger_schema
from userapp.authentication import RedisTokenAuthentication
import csv
from .models import CardTemplate, Card, CardBatch, CardUsageLog
from .serializers import (
CardTemplateSerializer, CardTemplateDetailSerializer,
CardSerializer, CardDetailSerializer, CardBatchSerializer, CardUsageLogSerializer,
CardScanSerializer, CardUseSerializer, CardBatchGenerateSerializer,
CardTemplatePublishSerializer, CardBatchPublishSerializer, CardBatchManufactureSerializer,
CategoryTemplateSerializer, CategoryCardDetailSerializer,
MobileProductSerializer,
ClothingAttributesSerializer, PropAttributesSerializer, SongAttributesSerializer,
DanceAttributesSerializer, DecorationAttributesSerializer
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Swagger schema definitions


# Documented response body made of a result message and a serialized card
# template. Referenced from `openapi.Response(...)` declarations in this module.
class CardTemplateResponseSchema(serializers.Serializer):
    # help_text: "operation result message"
    message = serializers.CharField(help_text="操作结果信息")
    # help_text: "card template information"
    template = CardTemplateSerializer(help_text="卡片模板信息")
# Documented response body for the card scan endpoints: a result message plus
# the detailed card representation.
class CardScanResponseSchema(serializers.Serializer):
    # help_text: "operation result message"
    message = serializers.CharField(help_text="操作结果信息")
    # help_text: "card information"
    card = CardDetailSerializer(help_text="卡片信息")
# Documented response body for the card "use" endpoint: a result message, the
# detailed card representation and an optional free-form unlock payload.
class CardUseResponseSchema(serializers.Serializer):
    # help_text: "operation result message"
    message = serializers.CharField(help_text="操作结果信息")
    # help_text: "card information"
    card = CardDetailSerializer(help_text="卡片信息")
    # help_text: "unlock information"; declared optional (required=False)
    unlock_info = serializers.JSONField(help_text="解锁信息", required=False)
# Documented response body for batch generation: a result message, the
# serialized batch and the number of cards that were created.
class CardBatchGenerateResponseSchema(serializers.Serializer):
    # help_text: "operation result message"
    message = serializers.CharField(help_text="操作结果信息")
    # help_text: "card batch information"
    batch = CardBatchSerializer(help_text="卡片批次信息")
    # help_text: "number of generated cards"
    cards_count = serializers.IntegerField(help_text="生成的卡片数量")
# Documented paginated list envelope (count / next / previous / results) whose
# results are serialized cards.
class CardListResponseSchema(serializers.Serializer):
    # help_text: "total number of cards"
    count = serializers.IntegerField(help_text="卡片总数")
    # help_text: "next page URL"; may be null
    next = serializers.URLField(help_text="下一页URL", allow_null=True)
    # help_text: "previous page URL"; may be null
    previous = serializers.URLField(help_text="上一页URL", allow_null=True)
    # help_text: "card list"
    results = CardSerializer(many=True, help_text="卡片列表")
# Documented error body: a single `error` string.
class ErrorResponseSchema(serializers.Serializer):
    # help_text: "error message"
    error = serializers.CharField(help_text="错误信息")

    class Meta:
        # Explicit schema reference name; presumably chosen to avoid clashing
        # with same-named error schemas in other apps -- TODO confirm.
        ref_name = "CardErrorResponse"
# Query-string filter parameters reused by the Swagger declarations below.
def _optional_query_param(name, description, value_type):
    """Build an optional query-string parameter description for the API docs."""
    return openapi.Parameter(
        name,
        openapi.IN_QUERY,
        description=description,
        type=value_type,
        required=False,
    )


# Filter by category.
category_param = _optional_query_param('category', "按分类筛选", openapi.TYPE_STRING)
# Filter by status.
status_param = _optional_query_param('status', "按状态筛选", openapi.TYPE_STRING)
# Filter by template ID.
template_param = _optional_query_param('template', "按模板ID筛选", openapi.TYPE_INTEGER)
# Filter by batch ID.
batch_param = _optional_query_param('batch', "按批次ID筛选", openapi.TYPE_INTEGER)
# Filter by user ID.
user_param = _optional_query_param('user', "按用户ID筛选", openapi.TYPE_INTEGER)
# Create your views here.
class IsAdminOrReadOnly(permissions.BasePermission):
    """
    Custom permission to allow admin users to have full access and others to read only.
    """

    def has_permission(self, request, view):
        """Allow every safe (read-only) method; allow other methods only for staff.

        Always returns a real ``bool``. The previous expression
        ``request.user and request.user.is_staff`` could hand back ``None``
        (or whatever falsy object ``request.user`` was) instead of ``False``.
        """
        if request.method in permissions.SAFE_METHODS:
            return True
        return bool(request.user and request.user.is_staff)
class CardTemplateViewSet(viewsets.ModelViewSet):
    """
    Card template management API.

    Provides create, read, update and delete operations for card templates.
    Supports managing different kinds of card templates, including clothing,
    props, songs, etc.
    """
    queryset = CardTemplate.objects.all()
    serializer_class = CardTemplateSerializer
    permission_classes = [IsAdminOrReadOnly]
    authentication_classes = [RedisTokenAuthentication]
    tags = ['card']

    # Query-string parameters applied as exact-match filters in get_queryset().
    _FILTER_FIELDS = ('category', 'status', 'rarity', 'card_type')

    # Template category -> (request key holding the category-specific
    # attributes, serializer class used to validate and store them).
    _ATTRIBUTE_SERIALIZERS = {
        'clothing': ('clothing_attributes', ClothingAttributesSerializer),
        'prop': ('prop_attributes', PropAttributesSerializer),
        'song': ('song_attributes', SongAttributesSerializer),
        'dance': ('dance_attributes', DanceAttributesSerializer),
        'decoration': ('decoration_attributes', DecorationAttributesSerializer),
    }

    def get_serializer_class(self):
        """
        Return the serializer matching the current action.

        ``retrieve`` uses the detail serializer; every other action uses the
        default ``serializer_class``.
        """
        if self.action == 'retrieve':
            return CardTemplateDetailSerializer
        return self.serializer_class

    def get_queryset(self):
        """Return the templates, narrowed by any supplied query-string filters.

        Each of ``category``, ``status``, ``rarity`` and ``card_type`` is
        applied as an exact match when the parameter is present.
        """
        queryset = super().get_queryset()
        params = self.request.query_params
        for field in self._FILTER_FIELDS:
            value = params.get(field)
            if value is not None:
                queryset = queryset.filter(**{field: value})
        return queryset

    @swagger_schema(
        responses={
            200: openapi.Response('发布成功', CardTemplateResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema)
        },
        operation_description="将卡片模板发布,使其可用于卡片批次生成",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=True, methods=['post'], permission_classes=[IsAdminUser], authentication_classes=[RedisTokenAuthentication])
    def publish(self, request, pk=None):
        """
        Publish a card template.

        Publishes the template so that it can be used for card batch
        generation. Responds with 400 when the template is already published.
        """
        template = self.get_object()
        if template.status == 'published':
            return Response({
                'error': 'This template is already published.'
            }, status=status.HTTP_400_BAD_REQUEST)

        template.publish()
        return Response({
            'message': 'Card template successfully published.',
            'template': CardTemplateSerializer(template).data
        })

    @swagger_schema(
        responses={
            200: openapi.Response('归档成功', CardTemplateResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema)
        },
        operation_description="将不再使用的卡片模板归档",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=True, methods=['post'], permission_classes=[IsAdminUser], authentication_classes=[RedisTokenAuthentication])
    def archive(self, request, pk=None):
        """
        Archive a card template.

        Archives a template that is no longer in use. Responds with 400 when
        the template is already archived.
        """
        template = self.get_object()
        if template.status == 'archived':
            return Response({
                'error': 'This template is already archived.'
            }, status=status.HTTP_400_BAD_REQUEST)

        template.archive()
        return Response({
            'message': 'Card template successfully archived.',
            'template': CardTemplateSerializer(template).data
        })

    def create(self, request, *args, **kwargs):
        """Create a template together with its category-specific attributes.

        The template row is saved first; if the request carries the attribute
        payload matching the template's category (for example
        ``clothing_attributes`` for ``clothing``), that payload is validated
        and saved against the new template.

        Both writes run in one transaction: when the attribute payload fails
        validation the 400 response no longer leaves an orphaned template
        behind.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        with transaction.atomic():
            # Save the main template row first.
            template = serializer.save()

            # Then store the category-specific attributes, if supplied.
            category = serializer.validated_data.get('category')
            attribute_spec = self._ATTRIBUTE_SERIALIZERS.get(category)
            if attribute_spec is not None:
                data_key, attribute_serializer_class = attribute_spec
                if data_key in request.data:
                    attribute_serializer = attribute_serializer_class(
                        data=request.data[data_key]
                    )
                    # A validation error propagates out of the atomic block,
                    # rolling back the template insert as well.
                    attribute_serializer.is_valid(raise_exception=True)
                    attribute_serializer.save(template=template)

        headers = self.get_success_headers(serializer.data)
        return Response(
            self.get_serializer(template).data,
            status=status.HTTP_201_CREATED,
            headers=headers
        )
class CardViewSet(viewsets.ModelViewSet):
    """
    Card management API.

    Provides create, read, update and delete operations for cards, plus
    scanning and using cards.
    Supports card lifecycle management.
    """
    queryset = Card.objects.all()
    serializer_class = CardSerializer
    # NOTE(review): IsAdminOrReadOnly allows every caller to list and retrieve
    # cards. If CardSerializer exposes unique_id, that makes redeemable
    # identifiers enumerable -- confirm this is intended.
    permission_classes = [IsAdminOrReadOnly]
    authentication_classes = [RedisTokenAuthentication]
    tags = ['card']

    # (query-string parameter, model lookup) pairs applied as exact-match
    # filters in get_queryset().
    _QUERY_FILTERS = (
        ('category', 'category'),
        ('template', 'template_id'),
        ('batch', 'batch_id'),
        ('status', 'status'),
        ('user', 'user_id'),
    )

    # Messages returned by scan_public for cards that are not 'active'.
    _UNAVAILABLE_MESSAGES = {
        'inactive': '此卡片尚未激活,暂时不可用。',
        'used': '此卡片已被使用。',
        'void': '此卡片已作废。'
    }

    def get_serializer_class(self):
        """
        Return the serializer matching the current action.

        ``retrieve`` uses the category-aware detail serializer; every other
        action uses the default ``serializer_class``.
        """
        if self.action == 'retrieve':
            return CategoryCardDetailSerializer
        return self.serializer_class

    @swagger_auto_schema(
        manual_parameters=[category_param, template_param, batch_param, status_param, user_param],
        responses={
            200: openapi.Response('查询成功', CardListResponseSchema)
        },
        security=[{'Bearer': []}]
    )
    def list(self, request, *args, **kwargs):
        """List cards; overridden only to attach the Swagger declaration."""
        return super().list(request, *args, **kwargs)

    @swagger_schema(
        responses={
            200: openapi.Response('获取成功', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                description='卡片详情,包含类别专有属性'
            )),
            404: openapi.Response('卡片不存在', ErrorResponseSchema)
        },
        operation_description="获取单个卡片的详细信息,包含对应卡片类别的专有属性",
        security=[{'Bearer': []}]
    )
    def retrieve(self, request, *args, **kwargs):
        """Return a single card's details, including category-specific attributes."""
        return super().retrieve(request, *args, **kwargs)

    def get_queryset(self):
        """Return the cards, narrowed by any supplied query-string filters.

        ``category``, ``template``, ``batch``, ``status`` and ``user`` are
        exact-match filters. ``manufactured`` is parsed as a boolean: only the
        (case-insensitive) string ``true`` means True.
        """
        queryset = super().get_queryset()
        params = self.request.query_params

        for param_name, lookup in self._QUERY_FILTERS:
            value = params.get(param_name)
            if value is not None:
                queryset = queryset.filter(**{lookup: value})

        manufactured = params.get('manufactured')
        if manufactured is not None:
            queryset = queryset.filter(manufactured=manufactured.lower() == 'true')

        return queryset

    def get_client_ip(self, request):
        """Return the caller's IP address as recorded in usage logs.

        Uses the first entry of ``X-Forwarded-For`` when the header is present
        (stripped of surrounding whitespace), otherwise ``REMOTE_ADDR``.
        The header is supplied by the client or proxies, so the value is not
        authenticated.
        """
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            return forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    @swagger_schema(
        request_schema=CardScanSerializer,
        responses={
            200: openapi.Response('扫描成功', CardScanResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('卡片不存在', ErrorResponseSchema)
        },
        operation_description="扫描卡片,获取卡片详细信息",
        tags=['卡片使用'],
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated], authentication_classes=[RedisTokenAuthentication])
    def scan(self, request):
        """
        Scan a card.

        Looks the card up by ``unique_id``, records a 'scan' usage log entry
        and returns the card details. The card's status is not changed.
        """
        serializer = CardScanSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        unique_id = serializer.validated_data.get('unique_id')
        if not unique_id:
            return Response({
                'error': 'unique_id must be provided.'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            card = Card.objects.get(unique_id=unique_id)
        except Card.DoesNotExist:
            return Response({
                'error': 'Card not found.'
            }, status=status.HTTP_404_NOT_FOUND)

        # Record the scan. The log is linked to the scanned card, matching
        # what `use` does; previously the card reference was omitted.
        CardUsageLog.objects.create(
            card=card,
            user=request.user,
            action='scan',
            old_status=card.status,
            new_status=card.status,
            ip_address=self.get_client_ip(request)
        )
        return Response({
            'message': 'Card scanned successfully.',
            'card': CardDetailSerializer(card).data
        })

    @swagger_schema(
        request_schema=CardScanSerializer,
        responses={
            200: openapi.Response('扫描成功', CardScanResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('卡片不存在', ErrorResponseSchema)
        },
        operation_description="公开扫描卡片API无需鉴权通过unique_id获取卡片详细信息",
        tags=['公开接口']
    )
    @action(detail=False, methods=['post'], permission_classes=[], authentication_classes=[])
    def scan_public(self, request):
        """
        Public card scan (no authentication required).

        Looks the card up by ``unique_id`` without authentication. Card
        details are returned only when the card's status is 'active';
        otherwise a 400 with a status-specific message is returned. No usage
        log entry is written.
        """
        serializer = CardScanSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        unique_id = serializer.validated_data.get('unique_id')
        try:
            card = Card.objects.get(unique_id=unique_id)
        except Card.DoesNotExist:
            return Response({
                'error': '卡片不存在。'
            }, status=status.HTTP_404_NOT_FOUND)

        # Only 'active' cards are shown; other statuses get a specific message.
        if card.status != 'active':
            message = self._UNAVAILABLE_MESSAGES.get(card.status, '此卡片当前不可用。')
            return Response({
                'error': message
            }, status=status.HTTP_400_BAD_REQUEST)

        return Response({
            'message': '卡片扫描成功。',
            'card': CardDetailSerializer(card).data
        })

    @swagger_schema(
        request_schema=CardUseSerializer,
        responses={
            200: openapi.Response('使用成功', CardUseResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('卡片不存在', ErrorResponseSchema)
        },
        operation_description="使用卡片,激活卡片功能",
        tags=['卡片使用'],
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated], authentication_classes=[RedisTokenAuthentication])
    def use(self, request):
        """
        Use (redeem) a card.

        Assigns an unowned, 'active' card to the current user, marks it
        'used', stamps ``used_at`` and records a 'use' usage log entry.

        Responds with 400 when the card already belongs to the caller or to
        another user, or when its status is not 'active'; 404 when the card
        does not exist. Any other failure is logged and reported as a 400
        carrying the exception text.
        """
        serializer = CardUseSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        unique_id = serializer.validated_data.get('unique_id')
        if not unique_id:
            return Response({
                'error': 'unique_id must be provided.'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            with transaction.atomic():
                # Lock the row so concurrent requests for the same card are
                # serialized.
                card = Card.objects.select_for_update().get(unique_id=unique_id)

                # The caller already owns this card.
                if card.user == request.user:
                    return Response({
                        'error': '您已经拥有此卡片,无需再次使用。'
                    }, status=status.HTTP_400_BAD_REQUEST)

                # The card is owned by somebody else.
                if card.user and card.user != request.user:
                    return Response({
                        'error': 'This card belongs to another user.'
                    }, status=status.HTTP_400_BAD_REQUEST)

                # Only 'active' cards can be used.
                if card.status != 'active':
                    return Response({
                        'error': f'Card cannot be used. Current status: {card.get_status_display()}'
                    }, status=status.HTTP_400_BAD_REQUEST)

                old_status = card.status

                # The ownership checks above guarantee the card has no owner
                # here, so it is assigned to the caller unconditionally.
                card.user = request.user
                card.status = 'used'
                card.used_at = timezone.now()
                card.save()

                CardUsageLog.objects.create(
                    card=card,
                    user=request.user,
                    action='use',
                    old_status=old_status,
                    new_status=card.status,
                    ip_address=self.get_client_ip(request)
                )

                return Response({
                    'message': 'Card used successfully.',
                    'card': CardDetailSerializer(card).data
                })
        except Card.DoesNotExist:
            return Response({
                'error': 'Card not found.'
            }, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error using card: %s", e)
            return Response({
                'error': str(e)
            }, status=status.HTTP_400_BAD_REQUEST)
class CardBatchViewSet(viewsets.ModelViewSet):
    """
    Card batch management API.

    Provides create, read, update and delete operations for card batches.
    Supports generating, exporting and publishing card batches.
    """
    queryset = CardBatch.objects.all()
    serializer_class = CardBatchSerializer
    permission_classes = [IsAdminUser]
    authentication_classes = [RedisTokenAuthentication]
    tags = ['card']

    # Column titles of the exported sheet / CSV file.
    _EXPORT_HEADERS = ['Unique ID', 'Status', 'Template', 'Category', 'Rarity', 'Type']

    def get_queryset(self):
        """Return the batches, narrowed by any supplied query-string filters.

        ``template`` and ``status`` are exact-match filters. ``published`` and
        ``manufactured`` are parsed as booleans: only the (case-insensitive)
        string ``true`` means True.
        """
        queryset = super().get_queryset()
        params = self.request.query_params

        template_id = params.get('template')
        if template_id is not None:
            queryset = queryset.filter(template_id=template_id)

        status_value = params.get('status')
        if status_value is not None:
            queryset = queryset.filter(status=status_value)

        for flag in ('published', 'manufactured'):
            raw = params.get(flag)
            if raw is not None:
                queryset = queryset.filter(**{flag: raw.lower() == 'true'})

        return queryset

    @swagger_schema(
        request_schema=CardBatchGenerateSerializer,
        responses={
            201: openapi.Response('生成成功', CardBatchGenerateResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('模板不存在', ErrorResponseSchema)
        },
        operation_description="根据模板生成卡片批次",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=False, methods=['post'], authentication_classes=[RedisTokenAuthentication])
    def generate(self, request):
        """
        Generate a card batch.

        Creates a new 'draft' batch from a published template and stores up to
        10 sample cards for it (not the full ``quantity``). ``start_id`` and
        ``end_id`` on the batch still describe the full ``quantity`` range.

        The batch and its sample cards are written in one transaction, so a
        failure part-way through does not leave a partially populated batch.
        """
        serializer = CardBatchGenerateSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        template_id = serializer.validated_data['template'].id
        quantity = serializer.validated_data['quantity']
        description = serializer.validated_data.get('description', '')

        try:
            template = CardTemplate.objects.get(id=template_id)

            # Only published templates may be used to generate cards.
            if template.status != 'published':
                return Response({
                    'error': 'Template is not published. Only published templates can be used to generate cards.'
                }, status=status.HTTP_400_BAD_REQUEST)

            with transaction.atomic():
                batch_number = self.generate_batch_number(template)
                batch = CardBatch.objects.create(
                    template=template,
                    batch_number=batch_number,
                    quantity=quantity,
                    description=description,
                    category=template.category,
                    status='draft',
                )

                # Only a small sample is stored: at most 10 cards.
                sample_count = min(10, quantity)
                for index in range(1, sample_count + 1):
                    Card.objects.create(
                        batch=batch,
                        template=template,
                        unique_id=f"{batch_number}-{index:03d}",
                        name=template.name,
                        category=template.category,
                        price=template.price,
                        status='inactive',  # cards start out inactive
                    )

                if sample_count > 0:
                    # start_id is the first card; end_id is derived from the
                    # full quantity, not from the number of sample cards.
                    batch.start_id = f"{batch_number}-001"
                    batch.end_id = f"{batch_number}-{quantity:03d}"
                    batch.save(update_fields=["start_id", "end_id"])

            return Response({
                'message': 'Batch and cards generated successfully.',
                'batch': CardBatchSerializer(batch).data,
                'cards_count': sample_count
            }, status=status.HTTP_201_CREATED)
        except CardTemplate.DoesNotExist:
            return Response({
                'error': 'Template not found.'
            }, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.exception("Error generating batch: %s", e)
            return Response({
                'error': f'An error occurred: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def generate_batch_number(self, template):
        """Build a batch number from the category, a timestamp and the template.

        Format: ``<CATEGORY>-<yymmddHHMM>-<template id><NN>`` where ``NN`` is
        the number of existing batches for the template plus one.

        NOTE(review): the counter is read without locking, so two concurrent
        requests within the same minute could produce the same number.
        """
        prefix = template.category.upper()
        timestamp = timezone.now().strftime('%y%m%d%H%M')
        # The template id is part of the suffix, followed by a per-template counter.
        batch_count = CardBatch.objects.filter(template=template).count() + 1
        return f"{prefix}-{timestamp}-{template.id}{batch_count:02d}"

    def _export_rows(self, batch):
        """Return one export row per card slot ``1..batch.quantity``.

        Slots that have a stored card use that card's values; the remaining
        slots get a placeholder row built from the batch and its template.
        """
        stored_cards = (
            Card.objects.filter(batch=batch)
            .select_related('template')  # avoids one template query per card
            .order_by('id')
        )
        cards_by_unique_id = {card.unique_id: card for card in stored_cards}

        batch_template = batch.template
        # The placeholder columns are identical for every missing slot.
        placeholder_tail = [
            '未激活',  # "inactive" label used for slots without a stored card
            batch_template.name if batch_template else '',
            batch.get_category_display(),
            batch_template.get_rarity_display() if batch_template else '',
            batch_template.get_card_type_display() if batch_template else '',
        ]

        rows = []
        for index in range(1, batch.quantity + 1):
            unique_id = f"{batch.batch_number}-{index:03d}"
            card = cards_by_unique_id.get(unique_id)
            if card is None:
                rows.append([unique_id, *placeholder_tail])
                continue
            card_template = card.template
            rows.append([
                card.unique_id,
                card.get_status_display(),
                card_template.name if card_template else '',
                card.get_category_display(),
                card_template.get_rarity_display() if card_template else '',
                card_template.get_card_type_display() if card_template else '',
            ])
        return rows

    def _export_csv(self, batch, rows):
        """Return the rows as a CSV attachment response."""
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = f'attachment; filename=card_batch_{batch.id}.csv'
        writer = csv.writer(response)
        writer.writerow(self._EXPORT_HEADERS)
        writer.writerows(rows)
        return response

    def _export_xlsx(self, batch, rows):
        """Return the rows as an XLSX attachment response with a styled header."""
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = f"Batch {batch.id}"

        # Header row: bold, centred, light grey fill.
        for col_num, column_title in enumerate(self._EXPORT_HEADERS, 1):
            cell = sheet.cell(row=1, column=col_num)
            cell.value = column_title
            cell.font = Font(bold=True)
            cell.alignment = Alignment(horizontal='center')
            cell.fill = PatternFill(start_color='E6E6E6', end_color='E6E6E6', fill_type='solid')

        # Data starts on row 2, directly below the header.
        for row_num, row in enumerate(rows, start=2):
            for col_num, value in enumerate(row, start=1):
                sheet.cell(row=row_num, column=col_num).value = value

        # Size each column to its longest value.
        for column in sheet.columns:
            column_letter = openpyxl.utils.get_column_letter(column[0].column)
            longest = max(len(str(cell.value)) for cell in column)
            sheet.column_dimensions[column_letter].width = (longest + 2) * 1.2

        buffer = io.BytesIO()
        workbook.save(buffer)
        response = HttpResponse(
            buffer.getvalue(),
            content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
        response['Content-Disposition'] = f'attachment; filename=card_batch_{batch.id}.xlsx'
        return response

    @swagger_schema(
        responses={
            200: openapi.Response('导出成功'),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('批次不存在', ErrorResponseSchema)
        },
        operation_description="导出卡片批次为Excel或CSV文件",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=True, methods=['get'], url_path='export(?:/(?P<export_format>[^/.]+))?', authentication_classes=[RedisTokenAuthentication])
    def export(self, request, pk=None, export_format=None):
        """
        Export a card batch.

        Exports the batch as an Excel or CSV file with one row per card slot
        in the batch. The format can be given in the URL path
        (``/export/xlsx`` or ``/export/csv``) or as a ``format`` query
        parameter; matching is case-insensitive and anything other than
        ``csv`` produces xlsx, which is also the default.

        NOTE(review): ``format`` is also the name DRF uses by default for its
        format-override query parameter, so ``?format=csv`` may be handled by
        content negotiation before this method runs -- confirm; the URL path
        form does not have this problem.
        """
        # Resolved outside the try block so a missing batch yields the
        # documented 404 instead of being converted into a 500 below.
        batch = self.get_object()

        try:
            requested_format = export_format or request.query_params.get('format', 'xlsx')
            rows = self._export_rows(batch)
            if requested_format.lower() == 'csv':
                return self._export_csv(batch, rows)
            return self._export_xlsx(batch, rows)
        except Exception as e:
            logger.exception("Error exporting batch: %s", e)
            return Response({
                'error': f'An error occurred: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    # NOTE(review): the 200 response is documented with
    # CardTemplateResponseSchema, but the body returned below carries a
    # 'batch' key rather than 'template'.
    @swagger_schema(
        responses={
            200: openapi.Response('标记成功', CardTemplateResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('批次不存在', ErrorResponseSchema)
        },
        operation_description="将批次标记为已制造",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=True, methods=['post'], authentication_classes=[RedisTokenAuthentication])
    def mark_produced(self, request, pk=None):
        """
        Mark a batch as manufactured.

        Marks the batch as produced, meaning the physical cards have been
        manufactured, and updates the batch's cards accordingly.
        """
        # Resolved outside the try block so a missing batch yields 404, not 500.
        batch = self.get_object()

        if batch.status == 'produced':
            return Response({
                'error': 'This batch is already marked as produced.'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            with transaction.atomic():
                batch.status = 'produced'
                # `publish` refuses batches whose `manufactured` flag is
                # false, so the flag is set here together with the status.
                batch.manufactured = True
                batch.manufactured_at = timezone.now()
                batch.save()

                # NOTE(review): this moves the cards to status 'produced',
                # while `publish` only activates cards whose status is
                # 'inactive' -- confirm the intended card status flow
                # against the Card model.
                Card.objects.filter(batch=batch).update(status='produced')

            return Response({
                'message': 'Batch successfully marked as produced.',
                'batch': CardBatchSerializer(batch).data
            })
        except Exception as e:
            logger.exception("Error marking batch as manufactured: %s", e)
            return Response({
                'error': f'An error occurred: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    # NOTE(review): the 200 response is documented with
    # CardTemplateResponseSchema, but the body returned below carries a
    # 'batch' key rather than 'template'.
    @swagger_schema(
        responses={
            200: openapi.Response('发布成功', CardTemplateResponseSchema),
            400: openapi.Response('请求错误', ErrorResponseSchema),
            404: openapi.Response('批次不存在', ErrorResponseSchema)
        },
        operation_description="发布卡片批次,激活其中的卡片",
        tags=['卡片管理'],
        security=[{'Bearer': []}]
    )
    @action(detail=True, methods=['post'], authentication_classes=[RedisTokenAuthentication])
    def publish(self, request, pk=None):
        """
        Publish a card batch.

        Publishes the batch and activates its inactive cards. The batch must
        have been marked as manufactured first and must not already be
        published; both cases respond with 400.
        """
        # Resolved outside the try block so a missing batch yields 404, not 500.
        batch = self.get_object()

        if batch.published:
            return Response({
                'error': 'This batch is already published.'
            }, status=status.HTTP_400_BAD_REQUEST)

        if not batch.manufactured:
            return Response({
                'error': 'Batch must be marked as manufactured before publishing.'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            with transaction.atomic():
                # One timestamp for the batch and its cards.
                now = timezone.now()
                batch.published = True
                batch.published_at = now
                batch.save()

                # NOTE(review): cards are set to 'activated' here, whereas the
                # card scan/use endpoints in this module accept only status
                # 'active' -- confirm which value the Card model defines.
                Card.objects.filter(batch=batch, status='inactive').update(
                    status='activated',
                    activated_at=now
                )

            return Response({
                'message': 'Batch successfully published.',
                'batch': CardBatchSerializer(batch).data
            })
        except Exception as e:
            logger.exception("Error publishing batch: %s", e)
            return Response({
                'error': f'An error occurred: {str(e)}'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class UserCardListView(generics.ListAPIView):
    """
    User card list API.

    Returns all cards owned by the current user.
    """
    serializer_class = CardSerializer
    permission_classes = [IsAuthenticated]
    authentication_classes = [RedisTokenAuthentication]
    tags = ['card']

    def get_queryset(self):
        """Return the cards owned by the current user."""
        return Card.objects.filter(user=self.request.user)

    # The Swagger declaration belongs on `list` (the request handler); it was
    # previously attached to `get_queryset`. The documented 200 body is a
    # plain array of cards because `list` below does not paginate.
    @swagger_schema(
        manual_parameters=[category_param, status_param],
        responses={
            200: openapi.Response('查询成功', CardSerializer(many=True))
        },
        operation_description="获取当前用户的所有卡片",
        tags=['卡片使用'],
        security=[{'Bearer': []}]
    )
    def list(self, request, *args, **kwargs):
        """List the current user's cards, optionally filtered.

        ``category`` and ``status`` query parameters are applied as
        exact-match filters. The result is returned as an unpaginated array.
        """
        queryset = self.get_queryset()
        params = self.request.query_params

        for field in ('category', 'status'):
            value = params.get(field)
            if value is not None:
                queryset = queryset.filter(**{field: value})

        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)
class CategoryCardTemplateListView(generics.ListAPIView):
    """
    Per-category card template list API.

    Returns the card templates of one category.
    Supports paginated queries.
    Category-specific attribute data is included.
    """
    serializer_class = CategoryTemplateSerializer
    permission_classes = [IsAdminOrReadOnly]
    authentication_classes = [RedisTokenAuthentication]
    tags = ['card']

    def get_queryset(self):
        """Return the templates of the category named in the URL.

        ``status``, ``rarity`` and ``card_type`` query parameters are applied
        as exact-match filters when present.
        """
        templates = CardTemplate.objects.filter(category=self.kwargs.get('category'))
        query = self.request.query_params

        for field_name in ('status', 'rarity', 'card_type'):
            wanted = query.get(field_name, None)
            if wanted is None:
                continue
            templates = templates.filter(**{field_name: wanted})

        return templates

    @swagger_schema(
        responses={
            200: openapi.Response('查询成功', schema=openapi.Schema(
                type=openapi.TYPE_OBJECT,
                properties={
                    'count': openapi.Schema(type=openapi.TYPE_INTEGER, description='总数'),
                    'next': openapi.Schema(type=openapi.TYPE_STRING, description='下一页URL', nullable=True),
                    'previous': openapi.Schema(type=openapi.TYPE_STRING, description='上一页URL', nullable=True),
                    'results': openapi.Schema(
                        type=openapi.TYPE_ARRAY,
                        items=openapi.Schema(type=openapi.TYPE_OBJECT, description='卡片模板'),
                    ),
                },
            )),
        },
        operation_description="获取特定分类的卡片模板列表,包含该分类特有的属性",
        tags=['卡片查询'],
        manual_parameters=[
            openapi.Parameter('status', openapi.IN_QUERY, description="按状态筛选", type=openapi.TYPE_STRING, required=False),
            openapi.Parameter('rarity', openapi.IN_QUERY, description="按稀有度筛选", type=openapi.TYPE_STRING, required=False),
            openapi.Parameter('card_type', openapi.IN_QUERY, description="按卡片类型筛选", type=openapi.TYPE_STRING, required=False),
        ],
        security=[{'Bearer': []}]
    )
    def list(self, request, *args, **kwargs):
        """Delegate to the generic list implementation; overridden to attach the Swagger declaration."""
        return super().list(request, *args, **kwargs)
class MobileProductsDownloadView(generics.GenericAPIView):
    """
    Mobile product data download API.

    Bundles all published products (clothing, props, songs, dances,
    decorations, food) into a single JSON file for the mobile client to
    download. No authentication or permission checks are applied.
    """
    permission_classes = []
    authentication_classes = []

    # Card-template categories included in the download, in output order,
    # mapped to their display labels.
    _CATEGORY_LABELS = {
        'clothing': '服装',
        'prop': '道具',
        'song': '歌曲',
        'dance': '舞蹈',
        'decoration': '家居装饰',
    }

    @staticmethod
    def _normalize_image_url(items):
        """Normalize the image fields of serialized items in place.

        Every item ends up with an ``image_url`` key and no ``image`` key:
        an existing ``image_url`` wins, otherwise ``image`` is used, and an
        empty or missing value becomes ``None``. Returns ``items``.
        """
        for item in items:
            # food_app items carry `image` rather than `image_url`.
            if 'image_url' not in item and 'image' in item:
                item['image_url'] = item['image']
            # Drop the original `image` key; only `image_url` is kept.
            item.pop('image', None)
            # Empty values are exposed as null.
            if not item.get('image_url'):
                item['image_url'] = None
        return items

    def get(self, request):
        """Return ``products.json`` as an attachment.

        The payload maps each non-empty category (and ``food``) to its label,
        item count and serialized items. Each group is fetched once; the count
        is taken from the serialized items instead of issuing separate
        ``exists()`` and ``count()`` queries.
        """
        # Imported here rather than at module level, as in the original code.
        from food_app.models import Food
        from food_app.serializers import FoodSerializer

        templates = CardTemplate.objects.filter(
            status='published',
        ).order_by('category', '-published_at')

        data = {}
        for category, label in self._CATEGORY_LABELS.items():
            serializer = MobileProductSerializer(
                templates.filter(category=category), many=True
            )
            items = self._normalize_image_url(serializer.data)
            if items:
                data[category] = {
                    'label': label,
                    'count': len(items),
                    'items': items,
                }

        # Food comes from the separate food_app module.
        published_foods = Food.objects.filter(status='published').order_by('-published_at')
        food_items = self._normalize_image_url(
            FoodSerializer(published_foods, many=True).data
        )
        if food_items:
            data['food'] = {
                'label': '食物',
                'count': len(food_items),
                'items': food_items,
            }

        content = json.dumps(data, ensure_ascii=False, indent=2)
        response = HttpResponse(content, content_type='application/json; charset=utf-8')
        response['Content-Disposition'] = 'attachment; filename="products.json"'
        return response