- C1/C2: 移除 settings.py 中硬编码的数据库密码和 SECRET_KEY 默认值 - K8s: DB_PASSWORD/DB_HOST/DB_USER/DJANGO_SECRET_KEY 改为 secretKeyRef - H1: DEBUG 默认值从 True 改为 False - H2: 登录接口添加 ScopedRateThrottle (5/min),全局限流 (anon 30/min, user 120/min) - H4: Django Admin 仅在 DEBUG=True 时注册 - H6: PromptInput innerHTML 使用 DOMPurify 消毒防止 XSS - H7: ALLOWED_HOSTS 从 "*" 收紧为实际域名 - H9: Nginx 添加安全响应头 + server_tokens off - M1: 密码策略加强 (min 8 + CommonPassword + NumericPassword) - M5: Django 生产环境安全头配置 - L1: 登录接口改为 POST-only Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
115 lines
3.7 KiB
Python
115 lines
3.7 KiB
Python
from rest_framework import status
|
|
from rest_framework.decorators import api_view, permission_classes, throttle_classes
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.throttling import ScopedRateThrottle
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
from django.contrib.auth import authenticate, get_user_model
|
|
from django.utils import timezone
|
|
from django.db.models import Sum
|
|
|
|
from .serializers import UserSerializer
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
class LoginRateThrottle(ScopedRateThrottle):
|
|
scope = 'login'
|
|
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
def register_view(request):
|
|
"""POST /api/v1/auth/register — disabled, all accounts created by admins."""
|
|
return Response(
|
|
{'error': 'registration_disabled', 'message': '公开注册已关闭,请联系管理员'},
|
|
status=status.HTTP_403_FORBIDDEN,
|
|
)
|
|
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([AllowAny])
|
|
@throttle_classes([LoginRateThrottle])
|
|
def login_view(request):
|
|
"""POST /api/v1/auth/login"""
|
|
|
|
username = request.data.get('username', '').strip()
|
|
password = request.data.get('password', '')
|
|
|
|
# Try authenticate with username first, then email
|
|
user = authenticate(username=username, password=password)
|
|
if user is None:
|
|
# Try email login
|
|
try:
|
|
user_by_email = User.objects.get(email=username)
|
|
user = authenticate(username=user_by_email.username, password=password)
|
|
except User.DoesNotExist:
|
|
pass
|
|
|
|
if user is None:
|
|
return Response(
|
|
{'error': 'invalid_credentials', 'message': '用户名或密码错误'},
|
|
status=status.HTTP_401_UNAUTHORIZED
|
|
)
|
|
|
|
refresh = RefreshToken.for_user(user)
|
|
return Response({
|
|
'user': UserSerializer(user).data,
|
|
'tokens': {
|
|
'access': str(refresh.access_token),
|
|
'refresh': str(refresh),
|
|
}
|
|
})
|
|
|
|
|
|
@api_view(['GET'])
|
|
@permission_classes([IsAuthenticated])
|
|
def me_view(request):
|
|
"""GET /api/v1/auth/me — returns role, team info, and quota."""
|
|
user = request.user
|
|
today = timezone.now().date()
|
|
first_of_month = today.replace(day=1)
|
|
|
|
daily_seconds_used = user.generation_records.filter(
|
|
created_at__date=today
|
|
).aggregate(total=Sum('seconds_consumed'))['total'] or 0
|
|
|
|
monthly_seconds_used = user.generation_records.filter(
|
|
created_at__date__gte=first_of_month
|
|
).aggregate(total=Sum('seconds_consumed'))['total'] or 0
|
|
|
|
data = UserSerializer(user).data
|
|
data['quota'] = {
|
|
'daily_seconds_limit': user.daily_seconds_limit,
|
|
'daily_seconds_used': daily_seconds_used,
|
|
'monthly_seconds_limit': user.monthly_seconds_limit,
|
|
'monthly_seconds_used': monthly_seconds_used,
|
|
}
|
|
|
|
# Team info
|
|
team = user.team
|
|
if team:
|
|
# Team monthly consumption
|
|
from apps.generation.models import GenerationRecord
|
|
team_monthly_used = GenerationRecord.objects.filter(
|
|
user__team=team,
|
|
created_at__date__gte=first_of_month,
|
|
).aggregate(total=Sum('seconds_consumed'))['total'] or 0
|
|
|
|
data['team'] = {
|
|
'id': team.id,
|
|
'name': team.name,
|
|
'total_seconds_pool': team.total_seconds_pool,
|
|
'total_seconds_used': team.total_seconds_used,
|
|
'remaining_seconds': team.remaining_seconds,
|
|
'monthly_seconds_limit': team.monthly_seconds_limit,
|
|
'monthly_seconds_used': team_monthly_used,
|
|
'is_active': team.is_active,
|
|
}
|
|
data['team_disabled'] = not team.is_active
|
|
else:
|
|
data['team'] = None
|
|
data['team_disabled'] = False
|
|
|
|
return Response(data)
|