"""用户管理路由""" from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from database import get_db from models import User, Role, PhaseGroup from schemas import UserCreate, UserUpdate, UserOut, ChangePasswordRequest from auth import get_current_user, hash_password, verify_password, require_permission router = APIRouter(prefix="/api/users", tags=["用户管理"]) def user_to_out(u: User, hide_cost: bool = False) -> UserOut: return UserOut( id=u.id, username=u.username, name=u.name, phase_group=u.phase_group.value if hasattr(u.phase_group, 'value') else u.phase_group, role_id=u.role_id, role_name=u.role_name, permissions=u.permissions, monthly_salary=0 if hide_cost else u.monthly_salary, bonus=0 if hide_cost else (u.bonus or 0), social_insurance=0 if hide_cost else (u.social_insurance or 0), monthly_total_cost=0 if hide_cost else u.monthly_total_cost, daily_cost=0 if hide_cost else u.daily_cost, is_active=u.is_active, created_at=u.created_at, ) def _can_view_cost(user: User) -> bool: return "user:view_cost" in (user.permissions or []) @router.get("", response_model=List[UserOut]) def list_users( db: Session = Depends(get_db), current_user: User = Depends(require_permission("user:view")) ): hide = not _can_view_cost(current_user) users = db.query(User).order_by(User.created_at.desc()).all() return [user_to_out(u, hide_cost=hide) for u in users] @router.post("", response_model=UserOut) def create_user( req: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(require_permission("user:manage")) ): if db.query(User).filter(User.username == req.username).first(): raise HTTPException(status_code=400, detail="用户名已存在") user = User( username=req.username, password_hash=hash_password(req.password), name=req.name, phase_group=PhaseGroup(req.phase_group), role_id=req.role_id, monthly_salary=req.monthly_salary, bonus=req.bonus, social_insurance=req.social_insurance, ) db.add(user) db.commit() db.refresh(user) return user_to_out(user) @router.put("/{user_id}", response_model=UserOut) def update_user( user_id: int, req: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_permission("user:manage")) ): user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="用户不存在") if req.name is not None: user.name = req.name if req.phase_group is not None: user.phase_group = PhaseGroup(req.phase_group) if req.role_id is not None: user.role_id = req.role_id if req.monthly_salary is not None: user.monthly_salary = req.monthly_salary if req.bonus is not None: user.bonus = req.bonus if req.social_insurance is not None: user.social_insurance = req.social_insurance if req.is_active is not None: user.is_active = req.is_active if req.password: user.password_hash = hash_password(req.password) db.commit() db.refresh(user) return user_to_out(user) @router.post("/change-password") def change_password( req: ChangePasswordRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): if not verify_password(req.old_password, current_user.password_hash): raise HTTPException(status_code=400, detail="原密码错误") if len(req.new_password) < 4: raise HTTPException(status_code=400, detail="新密码至少4位") current_user.password_hash = hash_password(req.new_password) db.commit() return {"message": "密码修改成功"} @router.get("/{user_id}", response_model=UserOut) def get_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="用户不存在") hide = not _can_view_cost(current_user) return user_to_out(user, hide_cost=hide)