- 内部事务成本池:非管理层用户的 时薪×投入时长,按产出秒数比例分摊 - 管理层用户提交内部事务不重复计算(已通过管理成本分摊) - 人力成本排除内部事务提交,避免重复 - 项目管理页面隐藏"内部事务"项目 - 阶段列宽度调整适配"内部事务"四字显示 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
596 lines
23 KiB
Python
596 lines
23 KiB
Python
"""
|
||
计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块
|
||
修改计算规则只需改此文件。
|
||
"""
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import func as sa_func, and_
|
||
from collections import defaultdict
|
||
from datetime import date, timedelta
|
||
from models import (
|
||
User, Project, Submission, AIToolCost, AIToolCostAllocation,
|
||
OutsourceCost, CostOverride, OverheadCost, WorkType, PhaseGroup, CostAllocationType
|
||
)
|
||
from config import WORKING_DAYS_PER_MONTH
|
||
|
||
|
||
# ──────────────────────────── 人力成本分摊 ────────────────────────────
|
||
|
||
def calc_labor_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""
|
||
计算某项目的累计人力成本
|
||
规则:
|
||
- 有秒数的提交 → 按各项目产出秒数比例分摊日成本
|
||
- 无秒数的提交 → 按各项目提交条数比例分摊日成本
|
||
- 管理员手动调整优先
|
||
"""
|
||
# 找出所有给此项目提交过的人
|
||
submitters = db.query(Submission.user_id).filter(
|
||
Submission.project_id == project_id
|
||
).distinct().all()
|
||
submitter_ids = [s[0] for s in submitters]
|
||
|
||
total_labor = 0.0
|
||
|
||
for uid in submitter_ids:
|
||
user = db.query(User).filter(User.id == uid).first()
|
||
if not user:
|
||
continue
|
||
daily_cost = user.daily_cost
|
||
|
||
# 找这个人在此项目的所有提交日期
|
||
dates = db.query(Submission.submit_date).filter(
|
||
Submission.user_id == uid,
|
||
Submission.project_id == project_id,
|
||
).distinct().all()
|
||
|
||
for (d,) in dates:
|
||
# 检查是否有手动调整
|
||
override = db.query(CostOverride).filter(
|
||
CostOverride.user_id == uid,
|
||
CostOverride.date == d,
|
||
CostOverride.project_id == project_id,
|
||
).first()
|
||
if override:
|
||
total_labor += override.override_amount
|
||
continue
|
||
|
||
# 这个人这天所有项目的提交(排除内部事务,其成本单独分摊)
|
||
day_subs = db.query(Submission).filter(
|
||
Submission.user_id == uid,
|
||
Submission.submit_date == d,
|
||
Submission.project_phase != PhaseGroup.INTERNAL,
|
||
).all()
|
||
|
||
# 计算这天各项目的秒数和条数
|
||
project_seconds = defaultdict(float)
|
||
project_counts = defaultdict(int)
|
||
total_day_seconds = 0.0
|
||
total_day_count = 0
|
||
|
||
for s in day_subs:
|
||
project_seconds[s.project_id] += s.total_seconds
|
||
project_counts[s.project_id] += 1
|
||
total_day_seconds += s.total_seconds
|
||
total_day_count += 1
|
||
|
||
# 分摊
|
||
if total_day_seconds > 0:
|
||
# 有秒数 → 按秒数比例
|
||
ratio = project_seconds.get(project_id, 0) / total_day_seconds
|
||
elif total_day_count > 0:
|
||
# 无秒数 → 按条数比例
|
||
ratio = project_counts.get(project_id, 0) / total_day_count
|
||
else:
|
||
ratio = 0
|
||
|
||
total_labor += daily_cost * ratio
|
||
|
||
return round(total_labor, 2)
|
||
|
||
|
||
# ──────────────────────────── AI 工具成本 ────────────────────────────
|
||
|
||
def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""计算某项目的 AI 工具成本"""
|
||
total = 0.0
|
||
|
||
# 1. 直接指定项目的
|
||
direct = db.query(sa_func.sum(AIToolCost.amount)).filter(
|
||
AIToolCost.allocation_type == CostAllocationType.PROJECT,
|
||
AIToolCost.project_id == project_id,
|
||
).scalar() or 0
|
||
total += direct
|
||
|
||
# 2. 手动分摊的
|
||
manual = db.query(AIToolCostAllocation).filter(
|
||
AIToolCostAllocation.project_id == project_id,
|
||
).all()
|
||
for alloc in manual:
|
||
cost = db.query(AIToolCost).filter(AIToolCost.id == alloc.ai_tool_cost_id).first()
|
||
if cost:
|
||
total += cost.amount * alloc.percentage / 100
|
||
|
||
# 3. 内容组整体(按产出秒数比例分摊)
|
||
team_costs = db.query(AIToolCost).filter(
|
||
AIToolCost.allocation_type == CostAllocationType.TEAM,
|
||
).all()
|
||
if team_costs:
|
||
# 所有项目的总秒数
|
||
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.total_seconds > 0
|
||
).scalar() or 0
|
||
# 此项目的秒数
|
||
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
if all_secs > 0:
|
||
ratio = proj_secs / all_secs
|
||
for c in team_costs:
|
||
total += c.amount * ratio
|
||
|
||
return round(total, 2)
|
||
|
||
|
||
# ──────────────────────────── 外包成本 ────────────────────────────
|
||
|
||
def calc_outsource_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""计算某项目的外包成本"""
|
||
total = db.query(sa_func.sum(OutsourceCost.amount)).filter(
|
||
OutsourceCost.project_id == project_id,
|
||
).scalar() or 0
|
||
return round(total, 2)
|
||
|
||
|
||
# ──────────────────────────── 固定开支分摊 ────────────────────────────
|
||
|
||
def calc_overhead_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""
|
||
计算某项目分摊的固定开支(办公室租金+水电费)
|
||
规则:按所有项目的产出秒数比例均摊
|
||
"""
|
||
total_overhead = db.query(sa_func.sum(OverheadCost.amount)).scalar() or 0
|
||
if total_overhead == 0:
|
||
return 0.0
|
||
|
||
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.total_seconds > 0
|
||
).scalar() or 0
|
||
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
if all_secs > 0:
|
||
ratio = proj_secs / all_secs
|
||
return round(total_overhead * ratio, 2)
|
||
return 0.0
|
||
|
||
|
||
# ──────────────────────────── 管理成本分摊 ────────────────────────────
|
||
|
||
def calc_management_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""
|
||
计算某项目分摊的管理成本(豁免提交角色的人员日薪)
|
||
规则:
|
||
- 只算豁免提交(exempt_submission=1)角色下有工资的活跃用户
|
||
- 按有提交记录的工作日数 × 每人日薪 计算总池
|
||
- 总池按各项目产出秒数比例分摊
|
||
"""
|
||
from models import Role
|
||
|
||
# 找出豁免角色 ID
|
||
exempt_role_ids = set(
|
||
r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
|
||
)
|
||
if not exempt_role_ids:
|
||
return 0.0
|
||
|
||
# 豁免角色下有工资的活跃用户
|
||
exempt_users = db.query(User).filter(
|
||
User.is_active == 1,
|
||
User.monthly_salary > 0,
|
||
User.role_id.in_(exempt_role_ids),
|
||
).all()
|
||
if not exempt_users:
|
||
return 0.0
|
||
|
||
# 有提交记录的工作日数(代表公司运营天数)
|
||
working_days = db.query(Submission.submit_date).distinct().count()
|
||
if working_days == 0:
|
||
return 0.0
|
||
|
||
# 总管理成本池 = 每人日薪 × 工作日数
|
||
total_pool = sum(u.daily_cost * working_days for u in exempt_users)
|
||
|
||
# 按产出秒数比例分摊
|
||
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.total_seconds > 0
|
||
).scalar() or 0
|
||
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
if all_secs > 0:
|
||
return round(total_pool * proj_secs / all_secs, 2)
|
||
return 0.0
|
||
|
||
|
||
# ──────────────────────────── 内部事务成本分摊 ────────────────────────────
|
||
|
||
def calc_internal_affairs_cost_for_project(project_id: int, db: Session) -> float:
|
||
"""
|
||
计算某项目分摊的内部事务成本(培训/招聘面试等)
|
||
规则:
|
||
- 汇总"内部事务"项目下非管理层用户的 hours_spent × 时薪
|
||
- 管理层用户(exempt_submission=1)的成本已通过管理成本分摊,不重复计算
|
||
- 总池按各项目产出秒数比例分摊到所有进行中项目
|
||
"""
|
||
from models import Project, ProjectStatus, Role
|
||
|
||
# 找到内部事务项目
|
||
internal_proj = db.query(Project).filter(Project.name == "内部事务").first()
|
||
if not internal_proj:
|
||
return 0.0
|
||
|
||
# 内部事务项目不分摊给自己
|
||
if project_id == internal_proj.id:
|
||
return 0.0
|
||
|
||
# 管理层用户 ID(exempt_submission=1),其成本已在管理成本中计算
|
||
exempt_role_ids = set(
|
||
r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
|
||
)
|
||
|
||
# 汇总内部事务的总成本:非管理层用户的 hours_spent × 时薪
|
||
internal_subs = db.query(Submission).filter(
|
||
Submission.project_id == internal_proj.id,
|
||
).all()
|
||
if not internal_subs:
|
||
return 0.0
|
||
|
||
total_pool = 0.0
|
||
user_cache = {}
|
||
for s in internal_subs:
|
||
if s.user_id not in user_cache:
|
||
user_cache[s.user_id] = db.query(User).filter(User.id == s.user_id).first()
|
||
user = user_cache[s.user_id]
|
||
if not user or user.daily_cost <= 0:
|
||
continue
|
||
# 跳过管理层用户(已通过管理成本分摊)
|
||
if user.role_id in exempt_role_ids:
|
||
continue
|
||
hourly_cost = user.daily_cost / 8
|
||
total_pool += (s.hours_spent or 0) * hourly_cost
|
||
|
||
if total_pool <= 0:
|
||
return 0.0
|
||
|
||
# 按产出秒数比例分摊(排除内部事务项目本身)
|
||
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.total_seconds > 0,
|
||
Submission.project_id != internal_proj.id,
|
||
).scalar() or 0
|
||
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
if all_secs > 0:
|
||
return round(total_pool * proj_secs / all_secs, 2)
|
||
return 0.0
|
||
|
||
|
||
# ──────────────────────────── 工作日计算工具 ────────────────────────────
|
||
|
||
def _working_days_between(start_date, end_date) -> int:
|
||
"""计算两个日期之间的工作日数(不含周末)"""
|
||
if not start_date or not end_date:
|
||
return 0
|
||
from datetime import timedelta, date as date_type
|
||
# 统一为 date 类型
|
||
if hasattr(start_date, 'date'):
|
||
start_date = start_date.date()
|
||
if hasattr(end_date, 'date'):
|
||
end_date = end_date.date()
|
||
if end_date <= start_date:
|
||
return 0
|
||
days = 0
|
||
current = start_date
|
||
while current < end_date:
|
||
current += timedelta(days=1)
|
||
if current.weekday() < 5: # 周一~周五
|
||
days += 1
|
||
return days
|
||
|
||
|
||
# ──────────────────────────── 里程碑损耗计算 ────────────────────────────
|
||
|
||
def _calc_milestone_waste(milestones, today=None) -> tuple:
|
||
"""
|
||
计算里程碑的工时损耗(预估天数 vs 实际天数)
|
||
返回: (waste_hours, details_list)
|
||
"""
|
||
from datetime import date as date_type, timedelta
|
||
if today is None:
|
||
today = date_type.today()
|
||
|
||
waste_hours = 0.0
|
||
details = []
|
||
for ms in milestones:
|
||
if not ms.estimated_days or not ms.start_date:
|
||
continue
|
||
# 计算实际天数
|
||
end = ms.completed_at if ms.is_completed and ms.completed_at else today
|
||
actual_days = _working_days_between(ms.start_date, end)
|
||
if actual_days > ms.estimated_days:
|
||
overrun = actual_days - ms.estimated_days
|
||
waste_h = overrun * 8
|
||
waste_hours += waste_h
|
||
details.append({
|
||
"milestone": ms.name,
|
||
"estimated_days": ms.estimated_days,
|
||
"actual_days": actual_days,
|
||
"overrun_days": overrun,
|
||
"waste_hours": waste_h,
|
||
})
|
||
return waste_hours, details
|
||
|
||
|
||
# ──────────────────────────── 损耗计算(三阶段) ────────────────────────────
|
||
|
||
def calc_waste_for_project(project_id: int, db: Session) -> dict:
|
||
"""
|
||
三阶段损耗计算:
|
||
- 前期:里程碑工时制(预估天数 vs 实际天数)
|
||
- 制作:秒数制(产出 vs 目标,含修补镜头)
|
||
- 后期:剪辑=工时制,修补镜头=秒数(归入制作),配音/音效=不计
|
||
"""
|
||
from models import (
|
||
ProjectStatus, ProjectMilestone, ContentType, PhaseGroup
|
||
)
|
||
|
||
project = db.query(Project).filter(Project.id == project_id).first()
|
||
if not project:
|
||
return {}
|
||
|
||
target = project.target_total_seconds
|
||
|
||
# 全部有秒数的提交总量
|
||
total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
# ── 废弃项目:全部产出记为损耗 ──
|
||
if project.status == ProjectStatus.ABANDONED:
|
||
return {
|
||
"target_seconds": target,
|
||
"total_submitted_seconds": round(total_submitted, 1),
|
||
"pre_waste": {"waste_hours": 0, "details": []},
|
||
"production_waste": {
|
||
"test_waste_seconds": 0,
|
||
"overproduction_waste_seconds": round(total_submitted, 1),
|
||
"total_waste_seconds": round(total_submitted, 1),
|
||
},
|
||
"post_waste": {"days_waste_hours": 0, "details": []},
|
||
"total_waste_seconds": round(total_submitted, 1),
|
||
"total_waste_hours": 0,
|
||
"waste_rate": 100.0 if total_submitted > 0 else 0.0,
|
||
"test_waste_seconds": 0,
|
||
"overproduction_waste_seconds": round(total_submitted, 1),
|
||
}
|
||
|
||
# ── 前期损耗(工时制) ──
|
||
pre_milestones = db.query(ProjectMilestone).filter(
|
||
ProjectMilestone.project_id == project_id,
|
||
ProjectMilestone.phase == PhaseGroup.PRE,
|
||
).all()
|
||
pre_waste_hours, pre_details = _calc_milestone_waste(pre_milestones)
|
||
|
||
# ── 制作损耗(秒数制) ──
|
||
# 制作阶段的提交
|
||
production_total_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
Submission.project_phase == PhaseGroup.PRODUCTION,
|
||
).scalar() or 0
|
||
|
||
# 制作阶段的测试损耗
|
||
test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.work_type == WorkType.TEST,
|
||
Submission.project_phase == PhaseGroup.PRODUCTION,
|
||
).scalar() or 0
|
||
|
||
# 修补镜头(后期秒数,归入制作目标对比)
|
||
shot_repair_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.content_type == ContentType.SHOT_REPAIR,
|
||
Submission.total_seconds > 0,
|
||
).scalar() or 0
|
||
|
||
# 制作产出 = 制作阶段非测试 + 修补镜头
|
||
production_output = (production_total_secs - test_waste) + shot_repair_secs
|
||
overproduction_waste = max(0, production_output - target)
|
||
production_total_waste = test_waste + overproduction_waste
|
||
|
||
# ── 后期损耗 ──
|
||
post_milestones = db.query(ProjectMilestone).filter(
|
||
ProjectMilestone.project_id == project_id,
|
||
ProjectMilestone.phase == PhaseGroup.POST,
|
||
).all()
|
||
# 只计算剪辑里程碑的工时损耗(配音/音效不计,修补镜头已在秒数中)
|
||
editing_milestones = [m for m in post_milestones if m.name == "剪辑"]
|
||
post_days_waste_hours, post_details = _calc_milestone_waste(editing_milestones)
|
||
|
||
# ── 汇总 ──
|
||
total_waste_seconds = production_total_waste
|
||
total_waste_hours = pre_waste_hours + post_days_waste_hours
|
||
waste_rate = round(total_waste_seconds / target * 100, 1) if target > 0 else 0
|
||
|
||
return {
|
||
"target_seconds": target,
|
||
"total_submitted_seconds": round(total_submitted, 1),
|
||
# 分阶段明细
|
||
"pre_waste": {
|
||
"waste_hours": pre_waste_hours,
|
||
"details": pre_details,
|
||
},
|
||
"production_waste": {
|
||
"test_waste_seconds": round(test_waste, 1),
|
||
"overproduction_waste_seconds": round(overproduction_waste, 1),
|
||
"total_waste_seconds": round(production_total_waste, 1),
|
||
},
|
||
"post_waste": {
|
||
"days_waste_hours": post_days_waste_hours,
|
||
"details": post_details,
|
||
},
|
||
# 汇总
|
||
"total_waste_seconds": round(total_waste_seconds, 1),
|
||
"total_waste_hours": total_waste_hours,
|
||
"waste_rate": waste_rate,
|
||
# 兼容旧字段
|
||
"test_waste_seconds": round(test_waste, 1),
|
||
"overproduction_waste_seconds": round(overproduction_waste, 1),
|
||
}
|
||
|
||
|
||
# ──────────────────────────── 团队效率 ────────────────────────────
|
||
|
||
def calc_team_efficiency(project_id: int, db: Session) -> list:
|
||
"""
|
||
加权效率算法:
|
||
- 加权效率 = (制作秒数 - 修改秒数) / 总工时 (综合速度+质量)
|
||
- 通过率 = (制作秒数 - 修改秒数) / 制作秒数 (纯质量指标)
|
||
- 日均净产出 = (制作秒数 - 修改秒数) / 活跃天数
|
||
- 熟练度等级基于加权效率与团队均值的比值
|
||
"""
|
||
from sqlalchemy import distinct, case
|
||
|
||
per_user = db.query(
|
||
Submission.user_id,
|
||
sa_func.sum(Submission.total_seconds).label("total_secs"),
|
||
sa_func.sum(Submission.hours_spent).label("total_hours"),
|
||
sa_func.count(distinct(Submission.submit_date)).label("days"),
|
||
sa_func.count(Submission.id).label("count"),
|
||
sa_func.sum(case(
|
||
(Submission.work_type == WorkType.PRODUCTION, Submission.total_seconds),
|
||
else_=0,
|
||
)).label("production_secs"),
|
||
sa_func.sum(case(
|
||
(Submission.work_type == WorkType.REVISION, Submission.total_seconds),
|
||
else_=0,
|
||
)).label("revision_secs"),
|
||
).filter(
|
||
Submission.project_id == project_id,
|
||
Submission.total_seconds > 0,
|
||
Submission.project_phase == PhaseGroup.PRODUCTION, # 只算中期
|
||
).group_by(Submission.user_id).all()
|
||
|
||
if not per_user:
|
||
return []
|
||
|
||
user_data = []
|
||
for user_id, total_secs, total_hours, days, count, prod_secs, rev_secs in per_user:
|
||
user = db.query(User).filter(User.id == user_id).first()
|
||
prod_secs = prod_secs or 0
|
||
rev_secs = rev_secs or 0
|
||
net_secs = max(prod_secs - rev_secs, 0)
|
||
|
||
# 原有指标(向后兼容,结算页面依赖)
|
||
daily_avg = total_secs / days if days > 0 else 0
|
||
hourly_output = total_secs / total_hours if total_hours and total_hours > 0 else 0
|
||
|
||
# 新指标
|
||
first_pass_rate = round(net_secs / prod_secs * 100, 1) if prod_secs > 0 else 0
|
||
weighted_efficiency = net_secs / total_hours if total_hours and total_hours > 0 else 0
|
||
daily_net_output = net_secs / days if days > 0 else 0
|
||
|
||
user_data.append({
|
||
"user_id": user_id,
|
||
"user_name": user.name if user else "未知",
|
||
"total_seconds": round(total_secs, 1),
|
||
"submission_count": count,
|
||
"total_hours": round(total_hours or 0, 1),
|
||
"active_days": days,
|
||
"daily_avg": round(daily_avg, 1),
|
||
"hourly_output": round(hourly_output, 1),
|
||
# 新字段
|
||
"production_seconds": round(prod_secs, 1),
|
||
"revision_seconds": round(rev_secs, 1),
|
||
"first_pass_rate": first_pass_rate,
|
||
"weighted_efficiency": round(weighted_efficiency, 1),
|
||
"daily_net_output": round(daily_net_output, 1),
|
||
})
|
||
|
||
# 熟练度等级:基于加权效率与团队均值的比值
|
||
team_weighted_avg = sum(d["weighted_efficiency"] for d in user_data) / len(user_data)
|
||
|
||
for d in user_data:
|
||
d["team_weighted_avg"] = round(team_weighted_avg, 1)
|
||
ratio = d["weighted_efficiency"] / team_weighted_avg if team_weighted_avg > 0 else 0
|
||
# 效率百分比(与团队均值的偏差)
|
||
d["efficiency_rate"] = round((ratio - 1) * 100, 1) if team_weighted_avg > 0 else 0
|
||
# 熟练度等级
|
||
if ratio >= 1.5:
|
||
d["proficiency_grade"] = "S+"
|
||
elif ratio >= 1.2:
|
||
d["proficiency_grade"] = "S"
|
||
elif ratio >= 0.8:
|
||
d["proficiency_grade"] = "A"
|
||
elif ratio >= 0.5:
|
||
d["proficiency_grade"] = "B"
|
||
else:
|
||
d["proficiency_grade"] = "C"
|
||
|
||
user_data.sort(key=lambda x: x["weighted_efficiency"], reverse=True)
|
||
return user_data
|
||
|
||
|
||
# ──────────────────────────── 项目完整结算 ────────────────────────────
|
||
|
||
def calc_project_settlement(project_id: int, db: Session) -> dict:
|
||
"""生成项目结算报告"""
|
||
project = db.query(Project).filter(Project.id == project_id).first()
|
||
if not project:
|
||
return {}
|
||
|
||
labor = calc_labor_cost_for_project(project_id, db)
|
||
ai_tool = calc_ai_tool_cost_for_project(project_id, db)
|
||
outsource = calc_outsource_cost_for_project(project_id, db)
|
||
overhead = calc_overhead_cost_for_project(project_id, db)
|
||
management = calc_management_cost_for_project(project_id, db)
|
||
internal_affairs = calc_internal_affairs_cost_for_project(project_id, db)
|
||
total_cost = labor + ai_tool + outsource + overhead + management + internal_affairs
|
||
waste = calc_waste_for_project(project_id, db)
|
||
efficiency = calc_team_efficiency(project_id, db)
|
||
|
||
result = {
|
||
"project_id": project.id,
|
||
"project_name": project.name,
|
||
"project_type": project.project_type.value if hasattr(project.project_type, 'value') else project.project_type,
|
||
"labor_cost": labor,
|
||
"ai_tool_cost": ai_tool,
|
||
"outsource_cost": outsource,
|
||
"overhead_cost": overhead,
|
||
"management_cost": management,
|
||
"internal_affairs_cost": internal_affairs,
|
||
"total_cost": round(total_cost, 2),
|
||
**waste,
|
||
"team_efficiency": efficiency,
|
||
}
|
||
|
||
# 客户正式项目计算盈亏
|
||
pt = project.project_type.value if hasattr(project.project_type, 'value') else project.project_type
|
||
if pt == "客户正式项目" and project.contract_amount:
|
||
result["contract_amount"] = project.contract_amount
|
||
result["profit_loss"] = round(project.contract_amount - total_cost, 2)
|
||
else:
|
||
result["contract_amount"] = None
|
||
result["profit_loss"] = None
|
||
|
||
return result
|