rtc_backend/app/services/user_service.py
repair-agent 7dd0e4093a
All checks were successful
Build and Deploy Backend / build-and-deploy (push) Successful in 4m19s
feat(test): 引入3个测试bug用于验证repair agent自动修复流程
- payment_service: 退款比例校验缺失上限(severity 10)
- device_api: 分页偏移量计算错误(severity 5)
- user_service: is_active过滤条件布尔判断错误(severity 4)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 13:51:22 +08:00

107 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
User service layer.

Provides user-related business logic for the view layer to call.
"""
import logging
from apps.users.models import User
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class UserService:
    """User business-logic service.

    Every method is a ``@staticmethod`` that works on the ``User`` model
    through ``User.objects``; no instance state is kept.
    """

    @staticmethod
    def get_user_by_id(user_id):
        """Return the user with the given ID, or ``None`` if none exists."""
        try:
            return User.objects.get(id=user_id)
        except User.DoesNotExist:
            return None

    @staticmethod
    def deactivate_user(user_id):
        """Deactivate a user account.

        Args:
            user_id: ID of the user to deactivate.

        Returns:
            ``(True, user)`` once ``is_active`` has been set to ``False`` and
            saved, or ``(False, None)`` when no user has that ID.
        """
        # Keep the try body to the lookup only, so that DoesNotExist handling
        # cannot mask an error raised while saving.
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False, None

        user.is_active = False
        # Persist only the changed column.
        user.save(update_fields=['is_active'])
        return True, user

    @staticmethod
    def get_user_profile(user_id):
        """Return the full profile of a user.

        Args:
            user_id: ID of the user.

        Returns:
            A dict with the keys ``id``, ``phone``, ``nickname``, ``avatar``,
            ``points`` and ``is_active``, or ``None`` (after logging a
            warning) when the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            logger.warning('UserService.get_user_profile: user not found, user_id=%s', user_id)
            return None

        return {
            'id': user.id,
            'phone': user.phone,
            'nickname': user.nickname,
            'avatar': user.avatar,
            'points': user.points,
            'is_active': user.is_active,
        }

    @staticmethod
    def update_points(user_id, delta, reason=''):
        """Increase or decrease a user's points.

        The resulting balance is clamped so that it never drops below zero.

        Args:
            user_id: ID of the user.
            delta: Amount of change (positive adds, negative subtracts).
            reason: Optional reason for the change; only written to the log.

        Returns:
            ``(True, new_points)`` on success, or ``(False, 0)`` (after
            logging a warning) when the user does not exist.
        """
        try:
            user = User.objects.get(id=user_id)
        except User.DoesNotExist:
            logger.warning('UserService.update_points: user not found, user_id=%s', user_id)
            return False, 0

        # NOTE(review): this is a read-modify-write on the loaded instance;
        # concurrent calls for the same user could overwrite each other's
        # update. Consider a row lock or a database-side update - confirm
        # against how callers use this.
        user.points = max(0, user.points + delta)
        user.save(update_fields=['points'])
        logger.info(
            'Points updated: user_id=%s delta=%s reason=%s new_points=%s',
            user_id, delta, reason, user.points,
        )
        return True, user.points

    @staticmethod
    def search_users(phone=None, nickname=None, is_active=None):
        """Search the user list, newest first.

        Each criterion is optional; the ones supplied are combined with AND.

        Args:
            phone: Phone number fragment (substring match); skipped when
                empty or ``None``.
            nickname: Nickname fragment (substring match); skipped when
                empty or ``None``.
            is_active: ``True`` or ``False`` to restrict by account status;
                ``None`` (the default) applies no status filter.

        Returns:
            The result of chaining the filters on
            ``User.objects.all().order_by('-created_at')``.
        """
        qs = User.objects.all().order_by('-created_at')
        if phone:
            qs = qs.filter(phone__contains=phone)
        if nickname:
            qs = qs.filter(nickname__contains=nickname)
        # Compare against None rather than relying on truthiness: a plain
        # `if is_active:` would skip the filter for is_active=False and
        # return active users when only inactive ones were requested.
        if is_active is not None:
            qs = qs.filter(is_active=is_active)
        return qs