seaislee1209 d9a12af078 fix: v0.8.5 安全加固 — CRITICAL/HIGH 漏洞修复
- C1/C2: 移除 settings.py 中硬编码的数据库密码和 SECRET_KEY 默认值
- K8s: DB_PASSWORD/DB_HOST/DB_USER/DJANGO_SECRET_KEY 改为 secretKeyRef
- H1: DEBUG 默认值从 True 改为 False
- H2: 登录接口添加 ScopedRateThrottle (5/min),全局限流 (anon 30/min, user 120/min)
- H4: Django Admin 仅在 DEBUG=True 时注册
- H6: PromptInput innerHTML 使用 DOMPurify 消毒防止 XSS
- H7: ALLOWED_HOSTS 从 "*" 收紧为实际域名
- H9: Nginx 添加安全响应头 + server_tokens off
- M1: 密码策略加强 (min 8 + CommonPassword + NumericPassword)
- M5: Django 生产环境安全头配置
- L1: 登录接口改为 POST-only

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 02:08:50 +08:00

115 lines
3.7 KiB
Python

from rest_framework import status
from rest_framework.decorators import api_view, permission_classes, throttle_classes
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.throttling import ScopedRateThrottle
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth import authenticate, get_user_model
from django.utils import timezone
from django.db.models import Sum
from .serializers import UserSerializer
# Resolve the project's active user model once at import time; the views
# below use it for the e-mail based login lookup.
User = get_user_model()
class LoginRateThrottle(ScopedRateThrottle):
    """Rate limiter for the login endpoint, bound to the ``login`` scope.

    ``ScopedRateThrottle.allow_request`` replaces ``self.scope`` with the
    view's ``throttle_scope`` attribute and lets every request through when
    the view does not define one.  Function views built with ``@api_view``
    carry no such attribute, so a class-level ``scope`` alone would be
    discarded and the throttle would never trigger.  ``allow_request`` is
    therefore overridden to fall back to the class-level scope.

    The rate is looked up under the ``login`` key of the throttle rates
    configured for DRF; ``get_rate`` raises ``ImproperlyConfigured`` when
    that key is missing.
    """

    scope = 'login'

    def allow_request(self, request, view):
        """Return True when the request is within the rate of the resolved scope.

        A ``throttle_scope`` declared on the view takes precedence; otherwise
        the class-level ``scope`` is used.
        """
        self.scope = getattr(view, self.scope_attr, None) or type(self).scope

        # ScopedRateThrottle skips rate resolution in __init__, so resolve it
        # here exactly as the parent implementation does.
        self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

        # Bypass ScopedRateThrottle.allow_request (it would reset the scope)
        # and run the generic SimpleRateThrottle check directly.
        return super(ScopedRateThrottle, self).allow_request(request, view)
@api_view(['POST'])
@permission_classes([AllowAny])
def register_view(request):
    """POST /api/v1/auth/register.

    Public sign-up is switched off (accounts are created by admins), so the
    endpoint always answers 403 with a fixed error payload.
    """
    payload = {
        'error': 'registration_disabled',
        'message': '公开注册已关闭,请联系管理员',
    }
    return Response(payload, status=status.HTTP_403_FORBIDDEN)
@api_view(['POST'])
@permission_classes([AllowAny])
@throttle_classes([LoginRateThrottle])
def login_view(request):
    """POST /api/v1/auth/login

    Authenticate with ``username`` + ``password``; the ``username`` field may
    also hold the account's e-mail address.

    Returns:
        200 with the serialized user and a JWT ``access``/``refresh`` pair,
        or 401 with ``invalid_credentials`` when authentication fails.
    """
    raw_username = request.data.get('username')
    raw_password = request.data.get('password')

    # JSON bodies may carry null / numbers / objects here; treat anything that
    # is not a string as "not provided" instead of crashing on .strip().
    username = raw_username.strip() if isinstance(raw_username, str) else ''
    password = raw_password if isinstance(raw_password, str) else ''

    user = None
    if username:
        # Try the value as a username first, then as an e-mail address.
        user = authenticate(username=username, password=password)
        if user is None:
            try:
                user_by_email = User.objects.get(email=username)
            except (User.DoesNotExist, User.MultipleObjectsReturned):
                # Unknown or ambiguous e-mail: fall through to the generic
                # 401 rather than surfacing a server error.
                user_by_email = None
            if user_by_email is not None:
                user = authenticate(
                    username=user_by_email.username,
                    password=password,
                )

    if user is None:
        return Response(
            {'error': 'invalid_credentials', 'message': '用户名或密码错误'},
            status=status.HTTP_401_UNAUTHORIZED,
        )

    refresh = RefreshToken.for_user(user)
    return Response({
        'user': UserSerializer(user).data,
        'tokens': {
            'access': str(refresh.access_token),
            'refresh': str(refresh),
        },
    })
def _total_seconds_consumed(records):
    """Return the sum of ``seconds_consumed`` over *records*, or 0 if there are none."""
    return records.aggregate(total=Sum('seconds_consumed'))['total'] or 0


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def me_view(request):
    """GET /api/v1/auth/me — returns role, team info, and quota.

    The response is the serialized user extended with:
      * ``quota``: the user's daily/monthly second limits and consumption,
      * ``team``: the team's pool/usage figures, or ``None`` without a team,
      * ``team_disabled``: True only when the user has a team that is inactive.
    """
    user = request.user

    # ``created_at__date`` lookups are evaluated in the current time zone when
    # time-zone support is on, so "today" must be the local date as well;
    # ``timezone.now()`` alone yields the UTC date in that case.  With
    # time-zone support off ``now()`` is naive and already local.
    now = timezone.now()
    if timezone.is_aware(now):
        now = timezone.localtime(now)
    today = now.date()
    first_of_month = today.replace(day=1)

    own_records = user.generation_records
    daily_seconds_used = _total_seconds_consumed(
        own_records.filter(created_at__date=today)
    )
    monthly_seconds_used = _total_seconds_consumed(
        own_records.filter(created_at__date__gte=first_of_month)
    )

    data = UserSerializer(user).data
    data['quota'] = {
        'daily_seconds_limit': user.daily_seconds_limit,
        'daily_seconds_used': daily_seconds_used,
        'monthly_seconds_limit': user.monthly_seconds_limit,
        'monthly_seconds_used': monthly_seconds_used,
    }

    team = user.team
    if not team:
        data['team'] = None
        data['team_disabled'] = False
        return Response(data)

    # Imported inside the function as in the original code — presumably to
    # avoid a circular import between the apps; confirm before hoisting.
    from apps.generation.models import GenerationRecord

    # Consumption of every team member since the first day of this month.
    team_monthly_used = _total_seconds_consumed(
        GenerationRecord.objects.filter(
            user__team=team,
            created_at__date__gte=first_of_month,
        )
    )
    data['team'] = {
        'id': team.id,
        'name': team.name,
        'total_seconds_pool': team.total_seconds_pool,
        'total_seconds_used': team.total_seconds_used,
        'remaining_seconds': team.remaining_seconds,
        'monthly_seconds_limit': team.monthly_seconds_limit,
        'monthly_seconds_used': team_monthly_used,
        'is_active': team.is_active,
    }
    data['team_disabled'] = not team.is_active
    return Response(data)