seaislee1209 aad9bd683b feat(core/backend): account settings endpoints (profile PATCH / change password / avatar upload)
§4 settings-save backend (no schema change; User already has phone/avatar_url):
- me/ now GET+PATCH (update name/phone/email)
- POST me/password/ — verify old password, set new (>=8), reissue token
- POST me/avatar/ — multipart -> TOS upload -> presigned avatar_url
Verified: profile PATCH 200, password change round-trip 200, original login restored.
Note: notification/theme prefs have no User storage field -> will persist client-side (no migrate per rules).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 15:41:05 +08:00

226 lines
9.2 KiB
Python

import uuid
from pathlib import Path
from django.contrib.auth import authenticate
from django.db import transaction
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.decorators import api_view, parser_classes, permission_classes
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from apps.common.api import get_current_team
from .models import TeamMember, User
from .serializers import LoginSerializer, RegisterSerializer, TeamMemberSerializer, TeamSerializer, UserSerializer
def auth_payload(user, team, token):
    """Assemble the response body returned after a successful register/login.

    The result carries the token key under ``"token"`` and the serialized
    user and team under ``"user"`` and ``"team"``.
    """
    payload = {"token": token.key}
    payload["user"] = UserSerializer(user).data
    payload["team"] = TeamSerializer(team).data
    return payload
@api_view(["POST"])
@permission_classes([])
def register(request):
    """Register a new account and respond with its auth payload (HTTP 201).

    The request body is validated by ``RegisterSerializer``; validation
    failures propagate because ``raise_exception=True`` is used. The
    serializer's ``save()`` result is read as a mapping with ``"user"`` and
    ``"team"`` entries.
    """
    form = RegisterSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    created = form.save()
    account = created["user"]
    token = Token.objects.get_or_create(user=account)[0]
    body = auth_payload(account, created["team"], token)
    return Response(body, status=status.HTTP_201_CREATED)
@api_view(["POST"])
@permission_classes([])
def login(request):
    """Authenticate by username/password and respond with the auth payload.

    Responds 400 with ``{"detail": "invalid credentials"}`` when
    ``authenticate`` returns ``None`` or the matched user's ``is_disabled``
    attribute is truthy. Otherwise the user's current team is resolved via
    ``get_current_team`` and a token is fetched or created.
    """
    form = LoginSerializer(data=request.data)
    form.is_valid(raise_exception=True)
    credentials = form.validated_data
    account = authenticate(
        request,
        username=credentials["username"],
        password=credentials["password"],
    )
    rejected = account is None or account.is_disabled
    if rejected:
        return Response({"detail": "invalid credentials"}, status=status.HTTP_400_BAD_REQUEST)
    current_team = get_current_team(account)
    token = Token.objects.get_or_create(user=account)[0]
    return Response(auth_payload(account, current_team, token))
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def logout(request):
    """Delete every token row belonging to the requesting user; respond 204."""
    own_tokens = Token.objects.filter(user=request.user)
    own_tokens.delete()
    return Response(status=status.HTTP_204_NO_CONTENT)
@api_view(["GET", "PATCH"])
@permission_classes([IsAuthenticated])
def me(request):
    """Return the requesting user's profile; on PATCH, update it first.

    PATCH reads three optional keys from the request body:

    * ``name``  -- stored (stripped) in ``first_name``; may be cleared.
    * ``phone`` -- stored stripped and cut to 32 characters; may be cleared.
    * ``email`` -- applied only when non-empty. A value that differs from the
      stored one must pass Django's ``validate_email``; otherwise the request
      is rejected with HTTP 400 and nothing is saved.

    Both methods respond with ``{"user": ..., "team": ...}`` where the team is
    resolved through ``get_current_team``.
    """
    user = request.user
    if request.method == "PATCH":
        # Function-scope imports keep this block self-contained.
        from django.core.exceptions import ValidationError
        from django.core.validators import validate_email

        changed_fields = []

        # Validate the email before touching any attribute so that a rejected
        # request leaves the user object exactly as it was.
        email = str(request.data.get("email") or "").strip()
        if email and email != user.email:
            try:
                validate_email(email)
            except ValidationError:
                return Response(
                    {"email": ["Enter a valid email address."]},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            user.email = email
            changed_fields.append("email")

        if "name" in request.data:
            user.first_name = str(request.data.get("name") or "").strip()
            changed_fields.append("first_name")
        if "phone" in request.data:
            user.phone = str(request.data.get("phone") or "").strip()[:32]
            changed_fields.append("phone")

        # Skip the write entirely when the body carried nothing to update.
        if changed_fields:
            user.save(update_fields=changed_fields)
    team = get_current_team(user)
    return Response(
        {
            "user": UserSerializer(user).data,
            "team": TeamSerializer(team).data,
        }
    )
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Change the requesting user's password and rotate their auth token.

    Reads ``old_password`` and ``new_password`` from the request body.
    Responds 400 when the old password does not match or when the stripped
    new password is shorter than 8 characters. On success the existing
    token rows are deleted and ``{"token": <new key>}`` is returned, so the
    caller must switch to the new token.
    """
    user = request.user
    old_password = str(request.data.get("old_password") or "")
    new_password = str(request.data.get("new_password") or "").strip()
    if not user.check_password(old_password):
        return Response({"old_password": ["原密码不正确"]}, status=status.HTTP_400_BAD_REQUEST)
    if len(new_password) < 8:
        return Response({"new_password": ["新密码至少 8 位"]}, status=status.HTTP_400_BAD_REQUEST)
    # Password write and token rotation succeed or fail together: a failure
    # after the password is saved must not leave the old token usable, and a
    # failure while issuing the new token must not strand the caller.
    with transaction.atomic():
        user.set_password(new_password)
        user.save(update_fields=["password"])
        Token.objects.filter(user=user).delete()
        token, _ = Token.objects.get_or_create(user=user)
    return Response({"token": token.key})
# File extensions accepted for avatars, mapped to the content type used when
# the client does not declare one. SVG is intentionally absent because it can
# carry script.
_AVATAR_CONTENT_TYPES = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
}


@api_view(["POST"])
@parser_classes([MultiPartParser, FormParser])
@permission_classes([IsAuthenticated])
def update_avatar(request):
    """Upload a new avatar image for the requesting user.

    Expects a multipart body with the image under the ``file`` field.
    Responds 400 when the field is missing, or when the file extension or
    declared content type is not in the raster-image allow-list. On success
    the file is uploaded under ``users/<user id>/avatar/<uuid><suffix>``,
    a presigned GET URL is stored in ``avatar_url`` and the serialized user
    is returned.
    """
    from apps.assets.storage import TosStorage

    upload = request.FILES.get("file")
    if upload is None:
        return Response({"detail": "no file"}, status=status.HTTP_400_BAD_REQUEST)

    # Both the filename and the content type come from the client, so neither
    # is forwarded to storage until it has been checked against the allow-list.
    suffix = Path(upload.name or "").suffix.lower() or ".png"
    declared_type = (upload.content_type or "").split(";", 1)[0].strip().lower()
    content_type = declared_type or _AVATAR_CONTENT_TYPES.get(suffix, "")
    if suffix not in _AVATAR_CONTENT_TYPES or content_type not in _AVATAR_CONTENT_TYPES.values():
        return Response({"detail": "unsupported image type"}, status=status.HTTP_400_BAD_REQUEST)

    user = request.user
    object_key = f"users/{user.id}/avatar/{uuid.uuid4()}{suffix}"
    storage = TosStorage()
    storage.upload_fileobj(
        fileobj=upload.file,
        object_key=object_key,
        content_type=content_type,
    )
    # The avatar is stored as a directly accessible presigned URL with a long
    # validity window (7 days); to make it permanent, switch to signing the
    # URL at read time instead.
    user.avatar_url = storage.presigned_get_url(object_key=object_key, expires_in=7 * 24 * 3600)
    user.save(update_fields=["avatar_url"])
    return Response(UserSerializer(user).data)
def normalize_member_role(role):
    """Map a client-supplied role value onto a ``TeamMember.Role`` value.

    ``"super"`` is accepted as an alias for ``TeamMember.Role.OWNER``. A value
    equal to one of the four known roles is returned unchanged. Anything else
    (including ``None`` and non-string JSON values) yields
    ``TeamMember.Role.MEMBER``.
    """
    if role == "super":
        return TeamMember.Role.OWNER
    known_roles = (
        TeamMember.Role.OWNER,
        TeamMember.Role.ADMIN,
        TeamMember.Role.MEMBER,
        TeamMember.Role.VIEWER,
    )
    # Tuple membership compares with ``==`` and never hashes ``role``, so an
    # unhashable request value (a JSON list or object) falls through to the
    # default instead of raising TypeError.
    if role in known_roles:
        return role
    return TeamMember.Role.MEMBER
def can_manage_team(user, team):
    """Tell whether ``user`` may administer ``team``.

    Looks up the first ACTIVE ``TeamMember`` row for the pair and returns
    True only when that row's role is OWNER or ADMIN; False when no such
    row exists.
    """
    membership = TeamMember.objects.filter(
        team=team,
        user=user,
        status=TeamMember.Status.ACTIVE,
    ).first()
    if not membership:
        return False
    managing_roles = {TeamMember.Role.OWNER, TeamMember.Role.ADMIN}
    return membership.role in managing_roles
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def team_members(request):
    """List the members of the caller's current team, or add a new one.

    GET returns every ``TeamMember`` of the team ordered by ``created_at``.

    POST requires the caller to pass ``can_manage_team`` (403 otherwise),
    then creates a user account plus its membership in one transaction.
    ``username`` is mandatory, ``password`` must be at least 8 characters
    after stripping, and an existing username is rejected (all HTTP 400).
    A missing email defaults to ``<username>@airshelf.local``; a requested
    OWNER role is downgraded to ADMIN. Responds 201 with the new member.
    """
    team = get_current_team(request.user)

    if request.method == "GET":
        roster = TeamMember.objects.filter(team=team).select_related("user").order_by("created_at")
        return Response(TeamMemberSerializer(roster, many=True).data)

    if not can_manage_team(request.user, team):
        return Response({"detail": "permission denied"}, status=status.HTTP_403_FORBIDDEN)

    payload = request.data
    username = str(payload.get("username") or "").strip()
    password = str(payload.get("password") or "").strip()

    # Field-level validation, reported in the same shape serializers use.
    if not username:
        return Response({"username": ["This field is required."]}, status=status.HTTP_400_BAD_REQUEST)
    if len(password) < 8:
        return Response({"password": ["Ensure this field has at least 8 characters."]}, status=status.HTTP_400_BAD_REQUEST)
    if User.objects.filter(username=username).exists():
        return Response({"username": ["username already exists"]}, status=status.HTTP_400_BAD_REQUEST)

    email = str(payload.get("email") or "").strip()
    if not email:
        email = f"{username}@airshelf.local"

    # Ownership cannot be granted through this endpoint.
    role = normalize_member_role(payload.get("role"))
    if role == TeamMember.Role.OWNER:
        role = TeamMember.Role.ADMIN

    display_name = str(payload.get("name") or "").strip()
    # ``monthly`` is accepted as an alternative key for the credit limit.
    credit_limit = payload.get("monthly_credit_limit") or payload.get("monthly") or 0

    with transaction.atomic():
        account = User.objects.create_user(username=username, password=password, email=email)
        account.first_name = display_name
        account.save(update_fields=["first_name"])
        membership = TeamMember.objects.create(
            team=team,
            user=account,
            role=role,
            monthly_credit_limit=credit_limit,
        )
    return Response(TeamMemberSerializer(membership).data, status=status.HTTP_201_CREATED)
@api_view(["PATCH", "DELETE"])
@permission_classes([IsAuthenticated])
def team_member_detail(request, member_id):
    """Update or remove one member of the caller's current team.

    The caller must pass ``can_manage_team`` (403 otherwise). The member is
    looked up by ``member_id`` within the team (404 when absent), and the
    team owner's membership cannot be changed here (400).

    DELETE removes the membership. When the user then has no membership
    left, the account is marked DISABLED and its tokens are deleted.
    Responds 204.

    PATCH optionally updates ``role`` (OWNER is downgraded to ADMIN),
    ``monthly_credit_limit`` (alias ``monthly``; falsy values become 0) and
    the user's ``name``. Responds with the serialized member.
    """
    team = get_current_team(request.user)
    if not can_manage_team(request.user, team):
        return Response({"detail": "permission denied"}, status=status.HTTP_403_FORBIDDEN)
    member = TeamMember.objects.select_related("user").filter(team=team, id=member_id).first()
    if member is None:
        return Response({"detail": "not found"}, status=status.HTTP_404_NOT_FOUND)
    if member.user_id == team.owner_id:
        return Response({"detail": "team owner cannot be changed"}, status=status.HTTP_400_BAD_REQUEST)

    if request.method == "DELETE":
        user = member.user
        # Removing the membership, disabling the orphaned account and revoking
        # its tokens must succeed or fail together; otherwise a failure midway
        # could leave a membership-less account active with a live token.
        with transaction.atomic():
            member.delete()
            if not TeamMember.objects.filter(user=user).exists():
                user.status = User.Status.DISABLED
                user.save(update_fields=["status"])
                Token.objects.filter(user=user).delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    role = request.data.get("role")
    if role:
        member.role = normalize_member_role(role)
        # Ownership cannot be granted through this endpoint.
        if member.role == TeamMember.Role.OWNER:
            member.role = TeamMember.Role.ADMIN
    if "monthly_credit_limit" in request.data or "monthly" in request.data:
        member.monthly_credit_limit = request.data.get("monthly_credit_limit", request.data.get("monthly")) or 0
    name = str(request.data.get("name") or "").strip()

    # The user row and the membership row are written as one unit so that a
    # failing membership save does not leave a half-applied rename behind.
    with transaction.atomic():
        if name:
            member.user.first_name = name
            member.user.save(update_fields=["first_name"])
        member.save(update_fields=["role", "monthly_credit_limit", "updated_at"])
    return Response(TeamMemberSerializer(member).data)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def team_member_password(request, member_id):
    """Reset the password of one member of the caller's current team.

    The caller must pass ``can_manage_team`` (403 otherwise). The member is
    looked up by ``member_id`` within the team (404 when absent); the team
    owner's password cannot be reset through this endpoint (400). The new
    ``password`` must be at least 8 characters after stripping (400).
    On success the member's tokens are deleted, forcing a fresh login, and
    the response is 204.
    """
    team = get_current_team(request.user)
    if not can_manage_team(request.user, team):
        return Response({"detail": "permission denied"}, status=status.HTTP_403_FORBIDDEN)
    member = TeamMember.objects.select_related("user").filter(team=team, id=member_id).first()
    if member is None:
        return Response({"detail": "not found"}, status=status.HTTP_404_NOT_FOUND)
    if member.user_id == team.owner_id:
        return Response({"detail": "team owner password cannot be reset here"}, status=status.HTTP_400_BAD_REQUEST)
    password = str(request.data.get("password") or "").strip()
    if len(password) < 8:
        return Response({"password": ["Ensure this field has at least 8 characters."]}, status=status.HTTP_400_BAD_REQUEST)
    # Write the new password and revoke existing tokens as one unit, so a
    # failure in between cannot leave old sessions valid after a reset.
    with transaction.atomic():
        member.user.set_password(password)
        member.user.save(update_fields=["password"])
        Token.objects.filter(user=member.user).delete()
    return Response(status=status.HTTP_204_NO_CONTENT)