2026-02-12 14:24:05 +08:00

153 lines
5.4 KiB
Python

"""仪表盘 + 结算路由"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func
from datetime import date, timedelta
from database import get_db
from models import (
User, UserRole, Project, Submission, AIToolCost,
ProjectStatus, ProjectType, WorkType
)
from auth import get_current_user, require_role
from calculations import (
calc_project_settlement, calc_waste_for_project,
calc_labor_cost_for_project, calc_ai_tool_cost_for_project,
calc_outsource_cost_for_project, calc_team_efficiency
)
router = APIRouter(prefix="/api", tags=["仪表盘与结算"])
@router.get("/dashboard")
def get_dashboard(
db: Session = Depends(get_db),
current_user: User = Depends(require_role(UserRole.OWNER))
):
"""全局仪表盘数据"""
# 项目概览
active = db.query(Project).filter(Project.status == ProjectStatus.IN_PROGRESS).all()
completed = db.query(Project).filter(Project.status == ProjectStatus.COMPLETED).all()
# 当月日期范围
today = date.today()
month_start = today.replace(day=1)
# 本月人力成本(简化:统计本月所有有提交的人的日成本)
monthly_submitters = db.query(Submission.user_id, Submission.submit_date).filter(
Submission.submit_date >= month_start,
Submission.submit_date <= today,
).distinct().all()
monthly_labor = 0.0
processed_user_dates = set()
for uid, d in monthly_submitters:
key = (uid, d)
if key not in processed_user_dates:
processed_user_dates.add(key)
user = db.query(User).filter(User.id == uid).first()
if user:
monthly_labor += user.daily_cost
# 本月 AI 工具成本
monthly_ai = db.query(sa_func.sum(AIToolCost.amount)).filter(
AIToolCost.record_date >= month_start,
AIToolCost.record_date <= today,
).scalar() or 0
# 本月总产出秒数
monthly_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.submit_date >= month_start,
Submission.submit_date <= today,
Submission.total_seconds > 0,
).scalar() or 0
# 活跃人数
active_users = db.query(Submission.user_id).filter(
Submission.submit_date >= month_start,
).distinct().count()
working_days = max(1, (today - month_start).days + 1)
avg_daily = round(monthly_secs / max(1, active_users) / working_days, 1)
# 各项目摘要
project_summaries = []
for p in active:
waste = calc_waste_for_project(p.id, db)
total_secs = waste.get("total_submitted_seconds", 0)
target = p.target_total_seconds
progress = round(total_secs / target * 100, 1) if target > 0 else 0
is_overdue = (
p.estimated_completion_date and today > p.estimated_completion_date
)
project_summaries.append({
"id": p.id,
"name": p.name,
"project_type": p.project_type.value if hasattr(p.project_type, 'value') else p.project_type,
"progress_percent": progress,
"target_seconds": target,
"submitted_seconds": total_secs,
"waste_rate": waste.get("waste_rate", 0),
"is_overdue": bool(is_overdue),
"estimated_completion_date": str(p.estimated_completion_date) if p.estimated_completion_date else None,
})
# 损耗排行
waste_ranking = []
for p in active + completed:
w = calc_waste_for_project(p.id, db)
if w.get("total_waste_seconds", 0) > 0:
waste_ranking.append({
"project_id": p.id,
"project_name": p.name,
"waste_seconds": w["total_waste_seconds"],
"waste_rate": w["waste_rate"],
})
waste_ranking.sort(key=lambda x: x["waste_rate"], reverse=True)
# 已结算项目
settled = []
for p in completed:
settlement = calc_project_settlement(p.id, db)
settled.append({
"project_id": p.id,
"project_name": p.name,
"project_type": settlement.get("project_type", ""),
"total_cost": settlement.get("total_cost", 0),
"contract_amount": settlement.get("contract_amount"),
"profit_loss": settlement.get("profit_loss"),
})
return {
"active_projects": len(active),
"completed_projects": len(completed),
"monthly_labor_cost": round(monthly_labor, 2),
"monthly_ai_tool_cost": round(monthly_ai, 2),
"monthly_total_seconds": round(monthly_secs, 1),
"avg_daily_seconds_per_person": avg_daily,
"projects": project_summaries,
"waste_ranking": waste_ranking,
"settled_projects": settled,
}
@router.get("/projects/{project_id}/settlement")
def get_settlement(
project_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_role(UserRole.OWNER))
):
"""项目结算报告"""
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
return calc_project_settlement(project_id, db)
@router.get("/projects/{project_id}/efficiency")
def get_efficiency(
project_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(require_role(UserRole.OWNER, UserRole.SUPERVISOR, UserRole.LEADER))
):
"""项目团队效率数据"""
return calc_team_efficiency(project_id, db)