airlabs-manage/backend/calculations.py
2026-02-12 17:41:27 +08:00

299 lines
11 KiB
Python

"""
计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块
修改计算规则只需改此文件。
"""
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func, and_
from collections import defaultdict
from datetime import date, timedelta
from models import (
User, Project, Submission, AIToolCost, AIToolCostAllocation,
OutsourceCost, CostOverride, OverheadCost, WorkType, CostAllocationType
)
from config import WORKING_DAYS_PER_MONTH
# ──────────────────────────── 人力成本分摊 ────────────────────────────
def calc_labor_cost_for_project(project_id: int, db: Session) -> float:
    """Return the cumulative labor cost charged to a project, rounded to 2 decimals.

    For every user who has at least one submission on the project, and for
    every date on which that user submitted to it:

    - if a ``CostOverride`` row exists for (user, date, project), its
      ``override_amount`` is charged and no apportioning is done;
    - otherwise the user's ``daily_cost`` is multiplied by the project's
      share of that user's submissions on that date (see
      ``_project_share_for_day``).

    Args:
        project_id: id of the project to cost.
        db: SQLAlchemy session used for all lookups.

    Returns:
        Sum of all per-day charges, rounded to 2 decimal places.
    """
    # Everyone who ever submitted to this project.
    submitter_rows = db.query(Submission.user_id).filter(
        Submission.project_id == project_id
    ).distinct().all()

    total_labor = 0.0
    for (uid,) in submitter_rows:
        user = db.query(User).filter(User.id == uid).first()
        if not user:
            # Submission points at a user row that no longer exists; skip it.
            continue
        daily_cost = user.daily_cost

        # Every date on which this user submitted to this project.
        date_rows = db.query(Submission.submit_date).filter(
            Submission.user_id == uid,
            Submission.project_id == project_id,
        ).distinct().all()

        for (d,) in date_rows:
            # A manual override replaces the computed share for that day.
            override = db.query(CostOverride).filter(
                CostOverride.user_id == uid,
                CostOverride.date == d,
                CostOverride.project_id == project_id,
            ).first()
            if override:
                total_labor += override.override_amount
                continue

            # All of this user's submissions on that day, across every
            # project, are needed to compute this project's share.
            day_subs = db.query(Submission).filter(
                Submission.user_id == uid,
                Submission.submit_date == d,
            ).all()
            total_labor += daily_cost * _project_share_for_day(day_subs, project_id)

    return round(total_labor, 2)


def _project_share_for_day(day_subs, project_id: int) -> float:
    """Return the fraction of one person-day attributable to ``project_id``.

    ``day_subs`` is an iterable of objects exposing ``project_id`` and
    ``total_seconds``. If the day's submissions carry any seconds, the share
    is by seconds; otherwise it is by submission count. An empty day yields 0.
    """
    seconds_by_project = defaultdict(float)
    count_by_project = defaultdict(int)
    day_seconds = 0.0
    day_count = 0
    for sub in day_subs:
        # A submission without a duration may hold None instead of 0; treat
        # it as 0 seconds so it still counts as a submission rather than
        # raising TypeError (the SQL filters elsewhere in this module already
        # ignore such rows via ``total_seconds > 0``).
        seconds = sub.total_seconds or 0
        seconds_by_project[sub.project_id] += seconds
        count_by_project[sub.project_id] += 1
        day_seconds += seconds
        day_count += 1

    if day_seconds > 0:
        return seconds_by_project.get(project_id, 0) / day_seconds
    if day_count > 0:
        return count_by_project.get(project_id, 0) / day_count
    return 0
# ──────────────────────────── AI 工具成本 ────────────────────────────
def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float:
    """Return the AI-tool cost attributed to a project, rounded to 2 decimals.

    Three sources are added together:

    1. costs whose allocation type is ``PROJECT`` and that reference this
       project directly;
    2. manual allocation rows for this project, each contributing
       ``percentage`` percent of the cost it references;
    3. costs whose allocation type is ``TEAM``, each multiplied by this
       project's fraction of all positive submitted seconds.
    """
    running = 0.0

    # 1. Costs booked directly on this project.
    direct_query = db.query(sa_func.sum(AIToolCost.amount)).filter(
        AIToolCost.allocation_type == CostAllocationType.PROJECT,
        AIToolCost.project_id == project_id,
    )
    running += direct_query.scalar() or 0

    # 2. Manually allocated percentages of individual costs.
    allocations = db.query(AIToolCostAllocation).filter(
        AIToolCostAllocation.project_id == project_id,
    ).all()
    for allocation in allocations:
        source_cost = db.query(AIToolCost).filter(
            AIToolCost.id == allocation.ai_tool_cost_id
        ).first()
        if not source_cost:
            # Allocation references a cost row that does not exist.
            continue
        running += source_cost.amount * allocation.percentage / 100

    # 3. Team-wide costs, shared by proportion of submitted seconds.
    shared_costs = db.query(AIToolCost).filter(
        AIToolCost.allocation_type == CostAllocationType.TEAM,
    ).all()
    if not shared_costs:
        return round(running, 2)

    # Seconds submitted across all projects.
    overall_seconds = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.total_seconds > 0
    ).scalar() or 0
    # Seconds submitted to this project only.
    project_seconds = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.project_id == project_id,
        Submission.total_seconds > 0,
    ).scalar() or 0

    if overall_seconds > 0:
        share = project_seconds / overall_seconds
        for shared in shared_costs:
            running += shared.amount * share

    return round(running, 2)
# ──────────────────────────── 外包成本 ────────────────────────────
def calc_outsource_cost_for_project(project_id: int, db: Session) -> float:
    """Return the sum of outsourcing cost amounts recorded for a project.

    The sum is rounded to 2 decimals; a project with no outsourcing rows
    yields 0.
    """
    amount_sum = (
        db.query(sa_func.sum(OutsourceCost.amount))
        .filter(OutsourceCost.project_id == project_id)
        .scalar()
    )
    if not amount_sum:
        # SUM over zero rows comes back as None; normalise it to 0.
        amount_sum = 0
    return round(amount_sum, 2)
# ──────────────────────────── 固定开支分摊 ────────────────────────────
def calc_overhead_cost_for_project(project_id: int, db: Session) -> float:
    """Return the share of total overhead cost apportioned to a project.

    The sum of every ``OverheadCost.amount`` is multiplied by the project's
    fraction of all positive submitted seconds and rounded to 2 decimals.
    Returns 0.0 when there is no overhead or no submitted seconds at all.
    """
    overhead_sum = db.query(sa_func.sum(OverheadCost.amount)).scalar()
    if not overhead_sum:
        # Nothing to apportion (no rows, or the amounts sum to zero).
        return 0.0

    # Seconds submitted across all projects.
    seconds_all = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.total_seconds > 0
    ).scalar() or 0
    # Seconds submitted to this project only.
    seconds_project = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.project_id == project_id,
        Submission.total_seconds > 0,
    ).scalar() or 0

    if not seconds_all > 0:
        # No output anywhere, so there is no basis for a split.
        return 0.0
    return round(overhead_sum * (seconds_project / seconds_all), 2)
# ──────────────────────────── 损耗计算 ────────────────────────────
def calc_waste_for_project(project_id: int, db: Session) -> dict:
    """Compute waste figures (in seconds) for a project.

    Args:
        project_id: id of the project to analyse.
        db: SQLAlchemy session used for all lookups.

    Returns:
        ``{}`` if the project does not exist, otherwise a dict with keys:

        - ``target_seconds``: the project's target total seconds
          (0 when the target is unset);
        - ``total_submitted_seconds``: sum of all positive submitted seconds;
        - ``test_waste_seconds``: sum of seconds of ``WorkType.TEST``
          submissions;
        - ``overproduction_waste_seconds``: submitted seconds beyond the
          target, never negative;
        - ``total_waste_seconds``: test waste plus overproduction waste;
        - ``waste_rate``: total waste as a percentage of the target, rounded
          to 1 decimal (0 when the target is not positive).
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return {}

    # An unset target is handled like the (already supported) zero target
    # instead of raising TypeError in the arithmetic below.
    target = project.target_total_seconds or 0

    # Test waste: every second submitted with work type TEST.
    test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.project_id == project_id,
        Submission.work_type == WorkType.TEST,
    ).scalar() or 0

    # Total of all submissions that carry seconds, regardless of work type.
    total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter(
        Submission.project_id == project_id,
        Submission.total_seconds > 0,
    ).scalar() or 0

    # Overproduction waste: whatever was submitted beyond the target.
    # NOTE(review): total_submitted includes TEST seconds, so once the target
    # is exceeded those seconds contribute to both waste components — confirm
    # this is the intended business rule.
    overproduction_waste = max(0, total_submitted - target)
    total_waste = test_waste + overproduction_waste
    waste_rate = round(total_waste / target * 100, 1) if target > 0 else 0

    return {
        "target_seconds": target,
        "total_submitted_seconds": round(total_submitted, 1),
        "test_waste_seconds": round(test_waste, 1),
        "overproduction_waste_seconds": round(overproduction_waste, 1),
        "total_waste_seconds": round(total_waste, 1),
        "waste_rate": waste_rate,
    }
# ──────────────────────────── 团队效率 ────────────────────────────
def calc_team_efficiency(project_id: int, db: Session) -> list:
    """Compare each contributor's output against a per-person baseline.

    The baseline is the project's target seconds divided by the number of
    users with at least one positive-seconds submission on the project.
    Each user's excess is their submitted seconds minus that baseline.

    Args:
        project_id: id of the project to analyse.
        db: SQLAlchemy session used for all lookups.

    Returns:
        ``[]`` if the project does not exist or has no positive-seconds
        submissions; otherwise a list of dicts (keys ``user_id``,
        ``user_name``, ``total_seconds``, ``submission_count``, ``baseline``,
        ``excess_seconds``, ``excess_rate``) sorted by ``total_seconds``
        descending. ``excess_rate`` is a percentage rounded to 1 decimal and
        is 0 when the baseline is not positive.
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return []

    # An unset target is handled like a zero target (baseline 0, rates 0)
    # instead of raising TypeError in the division below.
    target = project.target_total_seconds or 0

    # Per-user totals, counting only submissions that carry seconds.
    per_user = db.query(
        Submission.user_id,
        sa_func.sum(Submission.total_seconds).label("total_secs"),
        sa_func.count(Submission.id).label("count"),
    ).filter(
        Submission.project_id == project_id,
        Submission.total_seconds > 0,
    ).group_by(Submission.user_id).all()
    if not per_user:
        return []

    # per_user is non-empty here, so the division is safe.
    baseline = target / len(per_user)

    # Resolve all contributor names in one query rather than one per row.
    user_ids = [row[0] for row in per_user]
    found_users = db.query(User).filter(User.id.in_(user_ids)).all()
    # Sentinel distinguishes "user row missing" from "user has a None name".
    missing = object()
    name_by_id = {u.id: u.name for u in found_users}

    result = []
    for user_id, total_secs, count in per_user:
        name = name_by_id.get(user_id, missing)
        excess = total_secs - baseline
        excess_rate = round(excess / baseline * 100, 1) if baseline > 0 else 0
        result.append({
            "user_id": user_id,
            "user_name": "未知" if name is missing else name,
            "total_seconds": round(total_secs, 1),
            "submission_count": count,
            "baseline": round(baseline, 1),
            "excess_seconds": round(excess, 1),
            "excess_rate": excess_rate,
        })

    result.sort(key=lambda x: x["total_seconds"], reverse=True)
    return result
# ──────────────────────────── 项目完整结算 ────────────────────────────
def calc_project_settlement(project_id: int, db: Session) -> dict:
    """Build the settlement report for a project.

    Combines the four cost components (labor, AI tools, outsourcing,
    overhead), the waste figures and the team-efficiency list into a single
    dict. Returns ``{}`` if the project does not exist.

    ``contract_amount`` and ``profit_loss`` are filled in only when the
    project type is "客户正式项目" and the contract amount is truthy;
    otherwise both are ``None``.
    """
    project = db.query(Project).filter(Project.id == project_id).first()
    if not project:
        return {}

    # Cost components, in the same order they are summed.
    labor_cost = calc_labor_cost_for_project(project_id, db)
    ai_tool_cost = calc_ai_tool_cost_for_project(project_id, db)
    outsource_cost = calc_outsource_cost_for_project(project_id, db)
    overhead_cost = calc_overhead_cost_for_project(project_id, db)
    total_cost = labor_cost + ai_tool_cost + outsource_cost + overhead_cost

    waste_figures = calc_waste_for_project(project_id, db)
    efficiency_rows = calc_team_efficiency(project_id, db)

    # project_type may be an enum member or a plain value; use its .value
    # when it has one.
    raw_type = project.project_type
    type_label = raw_type.value if hasattr(raw_type, 'value') else raw_type

    report = {
        "project_id": project.id,
        "project_name": project.name,
        "project_type": type_label,
        "labor_cost": labor_cost,
        "ai_tool_cost": ai_tool_cost,
        "outsource_cost": outsource_cost,
        "overhead_cost": overhead_cost,
        "total_cost": round(total_cost, 2),
        **waste_figures,
        "team_efficiency": efficiency_rows,
    }

    # Profit/loss is only reported for formal client projects that have a
    # contract amount; a missing or zero amount yields None for both fields.
    if type_label == "客户正式项目" and project.contract_amount:
        contract_amount = project.contract_amount
        profit_loss = round(project.contract_amount - total_cost, 2)
    else:
        contract_amount = None
        profit_loss = None
    report["contract_amount"] = contract_amount
    report["profit_loss"] = profit_loss
    return report