"""角色管理路由""" from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from database import get_db from models import Role, User, ALL_PERMISSIONS, PERMISSION_KEYS from auth import get_current_user, require_permission router = APIRouter(prefix="/api/roles", tags=["角色管理"]) @router.get("/permissions") def get_all_permissions(current_user: User = Depends(get_current_user)): """获取系统全部权限定义(供前端勾选面板使用)""" groups = {} for key, label, group in ALL_PERMISSIONS: if group not in groups: groups[group] = [] groups[group].append({"key": key, "label": label}) return [{"group": g, "permissions": perms} for g, perms in groups.items()] @router.get("") def list_roles( db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): roles = db.query(Role).order_by(Role.is_system.desc(), Role.id).all() return [ { "id": r.id, "name": r.name, "description": r.description, "permissions": r.permissions or [], "is_system": bool(r.is_system), "user_count": db.query(User).filter(User.role_id == r.id).count(), "created_at": r.created_at, } for r in roles ] @router.post("") def create_role( req: dict, db: Session = Depends(get_db), current_user: User = Depends(require_permission("role:manage")) ): name = req.get("name", "").strip() if not name: raise HTTPException(status_code=400, detail="角色名称不能为空") if db.query(Role).filter(Role.name == name).first(): raise HTTPException(status_code=400, detail="角色名称已存在") perms = [p for p in req.get("permissions", []) if p in PERMISSION_KEYS] role = Role( name=name, description=req.get("description", ""), permissions=perms, is_system=0, ) db.add(role) db.commit() db.refresh(role) return {"id": role.id, "name": role.name, "message": "角色已创建"} @router.put("/{role_id}") def update_role( role_id: int, req: dict, db: Session = Depends(get_db), current_user: User = Depends(require_permission("role:manage")) ): role = db.query(Role).filter(Role.id == role_id).first() if not role: raise HTTPException(status_code=404, detail="角色不存在") name = req.get("name") if name is not None: name = name.strip() existing = db.query(Role).filter(Role.name == name, Role.id != role_id).first() if existing: raise HTTPException(status_code=400, detail="角色名称已存在") role.name = name if "description" in req: role.description = req["description"] if "permissions" in req: role.permissions = [p for p in req["permissions"] if p in PERMISSION_KEYS] db.commit() return {"message": "角色已更新"} @router.delete("/{role_id}") def delete_role( role_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_permission("role:manage")) ): role = db.query(Role).filter(Role.id == role_id).first() if not role: raise HTTPException(status_code=404, detail="角色不存在") if role.is_system: raise HTTPException(status_code=400, detail="系统内置角色不可删除") user_count = db.query(User).filter(User.role_id == role_id).count() if user_count > 0: raise HTTPException(status_code=400, detail=f"该角色下还有 {user_count} 个用户,请先转移用户再删除") db.delete(role) db.commit() return {"message": "角色已删除"}