airlabs-manage/backend/calculations.py
seaislee1209 a5d3739eef
All checks were successful
Build and Deploy Backend / build-and-deploy (push) Successful in 1m19s
Build and Deploy Web / build-and-deploy (push) Successful in 1m9s
feat: 内部事务成本按比例分摊到所有项目 + UI优化
- 内部事务成本池:非管理层用户的 时薪×投入时长,按产出秒数比例分摊
- 管理层用户提交内部事务不重复计算(已通过管理成本分摊)
- 人力成本排除内部事务提交,避免重复
- 项目管理页面隐藏"内部事务"项目
- 阶段列宽度调整适配"内部事务"四字显示

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 11:11:15 +08:00

596 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块
修改计算规则只需改此文件。
"""
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func, and_
from collections import defaultdict
from datetime import date, timedelta
from models import (
User, Project, Submission, AIToolCost, AIToolCostAllocation,
OutsourceCost, CostOverride, OverheadCost, WorkType, PhaseGroup, CostAllocationType
)
from config import WORKING_DAYS_PER_MONTH
# ──────────────────────────── 人力成本分摊 ────────────────────────────
def calc_labor_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目的累计人力成本
规则:
- 有秒数的提交 → 按各项目产出秒数比例分摊日成本
- 无秒数的提交 → 按各项目提交条数比例分摊日成本
- 管理员手动调整优先
"""
# 找出所有给此项目提交过的人
submitters = db.query(Submission.user_id).filter(
Submission.project_id == project_id
).distinct().all()
submitter_ids = [s[0] for s in submitters]
total_labor = 0.0
for uid in submitter_ids:
user = db.query(User).filter(User.id == uid).first()
if not user:
continue
daily_cost = user.daily_cost
# 找这个人在此项目的所有提交日期
dates = db.query(Submission.submit_date).filter(
Submission.user_id == uid,
Submission.project_id == project_id,
).distinct().all()
for (d,) in dates:
# 检查是否有手动调整
override = db.query(CostOverride).filter(
CostOverride.user_id == uid,
CostOverride.date == d,
CostOverride.project_id == project_id,
).first()
if override:
total_labor += override.override_amount
continue
# 这个人这天所有项目的提交(排除内部事务,其成本单独分摊)
day_subs = db.query(Submission).filter(
Submission.user_id == uid,
Submission.submit_date == d,
Submission.project_phase != PhaseGroup.INTERNAL,
).all()
# 计算这天各项目的秒数和条数
project_seconds = defaultdict(float)
project_counts = defaultdict(int)
total_day_seconds = 0.0
total_day_count = 0
for s in day_subs:
project_seconds[s.project_id] += s.total_seconds
project_counts[s.project_id] += 1
total_day_seconds += s.total_seconds
total_day_count += 1
# 分摊
if total_day_seconds > 0:
# 有秒数 → 按秒数比例
ratio = project_seconds.get(project_id, 0) / total_day_seconds
elif total_day_count > 0:
# 无秒数 → 按条数比例
ratio = project_counts.get(project_id, 0) / total_day_count
else:
ratio = 0
total_labor += daily_cost * ratio
return round(total_labor, 2)
# ──────────────────────────── AI 工具成本 ────────────────────────────
def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float:
"""计算某项目的 AI 工具成本"""
total = 0.0
# 1. 直接指定项目的
direct = db.query(sa_func.sum(AIToolCost.amount)).filter(
AIToolCost.allocation_type == CostAllocationType.PROJECT,
AIToolCost.project_id == project_id,
).scalar() or 0
total += direct
# 2. 手动分摊的
manual = db.query(AIToolCostAllocation).filter(
AIToolCostAllocation.project_id == project_id,
).all()
for alloc in manual:
cost = db.query(AIToolCost).filter(AIToolCost.id == alloc.ai_tool_cost_id).first()
if cost:
total += cost.amount * alloc.percentage / 100
# 3. 内容组整体(按产出秒数比例分摊)
team_costs = db.query(AIToolCost).filter(
AIToolCost.allocation_type == CostAllocationType.TEAM,
).all()
if team_costs:
# 所有项目的总秒数
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
# 此项目的秒数
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
ratio = proj_secs / all_secs
for c in team_costs:
total += c.amount * ratio
return round(total, 2)
# ──────────────────────────── 外包成本 ────────────────────────────
def calc_outsource_cost_for_project(project_id: int, db: Session) -> float:
"""计算某项目的外包成本"""
total = db.query(sa_func.sum(OutsourceCost.amount)).filter(
OutsourceCost.project_id == project_id,
).scalar() or 0
return round(total, 2)
# ──────────────────────────── 固定开支分摊 ────────────────────────────
def calc_overhead_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目分摊的固定开支(办公室租金+水电费)
规则:按所有项目的产出秒数比例均摊
"""
total_overhead = db.query(sa_func.sum(OverheadCost.amount)).scalar() or 0
if total_overhead == 0:
return 0.0
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
ratio = proj_secs / all_secs
return round(total_overhead * ratio, 2)
return 0.0
# ──────────────────────────── 管理成本分摊 ────────────────────────────
def calc_management_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目分摊的管理成本(豁免提交角色的人员日薪)
规则:
- 只算豁免提交exempt_submission=1角色下有工资的活跃用户
- 按有提交记录的工作日数 × 每人日薪 计算总池
- 总池按各项目产出秒数比例分摊
"""
from models import Role
# 找出豁免角色 ID
exempt_role_ids = set(
r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
)
if not exempt_role_ids:
return 0.0
# 豁免角色下有工资的活跃用户
exempt_users = db.query(User).filter(
User.is_active == 1,
User.monthly_salary > 0,
User.role_id.in_(exempt_role_ids),
).all()
if not exempt_users:
return 0.0
# 有提交记录的工作日数(代表公司运营天数)
working_days = db.query(Submission.submit_date).distinct().count()
if working_days == 0:
return 0.0
# 总管理成本池 = 每人日薪 × 工作日数
total_pool = sum(u.daily_cost * working_days for u in exempt_users)
# 按产出秒数比例分摊
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
return round(total_pool * proj_secs / all_secs, 2)
return 0.0
# ──────────────────────────── 内部事务成本分摊 ────────────────────────────
def calc_internal_affairs_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目分摊的内部事务成本(培训/招聘面试等)
规则:
- 汇总"内部事务"项目下非管理层用户的 hours_spent × 时薪
- 管理层用户exempt_submission=1的成本已通过管理成本分摊不重复计算
- 总池按各项目产出秒数比例分摊到所有进行中项目
"""
from models import Project, ProjectStatus, Role
# 找到内部事务项目
internal_proj = db.query(Project).filter(Project.name == "内部事务").first()
if not internal_proj:
return 0.0
# 内部事务项目不分摊给自己
if project_id == internal_proj.id:
return 0.0
# 管理层用户 IDexempt_submission=1其成本已在管理成本中计算
exempt_role_ids = set(
r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
)
# 汇总内部事务的总成本:非管理层用户的 hours_spent × 时薪
internal_subs = db.query(Submission).filter(
Submission.project_id == internal_proj.id,
).all()
if not internal_subs:
return 0.0
total_pool = 0.0
user_cache = {}
for s in internal_subs:
if s.user_id not in user_cache:
user_cache[s.user_id] = db.query(User).filter(User.id == s.user_id).first()
user = user_cache[s.user_id]
if not user or user.daily_cost <= 0:
continue
# 跳过管理层用户(已通过管理成本分摊)
if user.role_id in exempt_role_ids:
continue
hourly_cost = user.daily_cost / 8
total_pool += (s.hours_spent or 0) * hourly_cost
if total_pool <= 0:
return 0.0
# 按产出秒数比例分摊(排除内部事务项目本身)
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0,
Submission.project_id != internal_proj.id,
).scalar() or 0
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
return round(total_pool * proj_secs / all_secs, 2)
return 0.0
# ──────────────────────────── 工作日计算工具 ────────────────────────────
def _working_days_between(start_date, end_date) -> int:
"""计算两个日期之间的工作日数(不含周末)"""
if not start_date or not end_date:
return 0
from datetime import timedelta, date as date_type
# 统一为 date 类型
if hasattr(start_date, 'date'):
start_date = start_date.date()
if hasattr(end_date, 'date'):
end_date = end_date.date()
if end_date <= start_date:
return 0
days = 0
current = start_date
while current < end_date:
current += timedelta(days=1)
if current.weekday() < 5: # 周一~周五
days += 1
return days
# ──────────────────────────── 里程碑损耗计算 ────────────────────────────
def _calc_milestone_waste(milestones, today=None) -> tuple:
"""
计算里程碑的工时损耗(预估天数 vs 实际天数)
返回: (waste_hours, details_list)
"""
from datetime import date as date_type, timedelta
if today is None:
today = date_type.today()
waste_hours = 0.0
details = []
for ms in milestones:
if not ms.estimated_days or not ms.start_date:
continue
# 计算实际天数
end = ms.completed_at if ms.is_completed and ms.completed_at else today
actual_days = _working_days_between(ms.start_date, end)
if actual_days > ms.estimated_days:
overrun = actual_days - ms.estimated_days
waste_h = overrun * 8
waste_hours += waste_h
details.append({
"milestone": ms.name,
"estimated_days": ms.estimated_days,
"actual_days": actual_days,
"overrun_days": overrun,
"waste_hours": waste_h,
})
return waste_hours, details
# ──────────────────────────── 损耗计算(三阶段) ────────────────────────────
def calc_waste_for_project(project_id: int, db: Session) -> dict:
"""
三阶段损耗计算:
- 前期:里程碑工时制(预估天数 vs 实际天数)
- 制作:秒数制(产出 vs 目标,含修补镜头)
- 后期:剪辑=工时制,修补镜头=秒数(归入制作),配音/音效=不计
"""
from models import (
ProjectStatus, ProjectMilestone, ContentType, PhaseGroup
)
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
return {}
target = project.target_total_seconds
# 全部有秒数的提交总量
total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
# ── 废弃项目:全部产出记为损耗 ──
if project.status == ProjectStatus.ABANDONED:
return {
"target_seconds": target,
"total_submitted_seconds": round(total_submitted, 1),
"pre_waste": {"waste_hours": 0, "details": []},
"production_waste": {
"test_waste_seconds": 0,
"overproduction_waste_seconds": round(total_submitted, 1),
"total_waste_seconds": round(total_submitted, 1),
},
"post_waste": {"days_waste_hours": 0, "details": []},
"total_waste_seconds": round(total_submitted, 1),
"total_waste_hours": 0,
"waste_rate": 100.0 if total_submitted > 0 else 0.0,
"test_waste_seconds": 0,
"overproduction_waste_seconds": round(total_submitted, 1),
}
# ── 前期损耗(工时制) ──
pre_milestones = db.query(ProjectMilestone).filter(
ProjectMilestone.project_id == project_id,
ProjectMilestone.phase == PhaseGroup.PRE,
).all()
pre_waste_hours, pre_details = _calc_milestone_waste(pre_milestones)
# ── 制作损耗(秒数制) ──
# 制作阶段的提交
production_total_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
Submission.project_phase == PhaseGroup.PRODUCTION,
).scalar() or 0
# 制作阶段的测试损耗
test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.work_type == WorkType.TEST,
Submission.project_phase == PhaseGroup.PRODUCTION,
).scalar() or 0
# 修补镜头(后期秒数,归入制作目标对比)
shot_repair_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.content_type == ContentType.SHOT_REPAIR,
Submission.total_seconds > 0,
).scalar() or 0
# 制作产出 = 制作阶段非测试 + 修补镜头
production_output = (production_total_secs - test_waste) + shot_repair_secs
overproduction_waste = max(0, production_output - target)
production_total_waste = test_waste + overproduction_waste
# ── 后期损耗 ──
post_milestones = db.query(ProjectMilestone).filter(
ProjectMilestone.project_id == project_id,
ProjectMilestone.phase == PhaseGroup.POST,
).all()
# 只计算剪辑里程碑的工时损耗(配音/音效不计,修补镜头已在秒数中)
editing_milestones = [m for m in post_milestones if m.name == "剪辑"]
post_days_waste_hours, post_details = _calc_milestone_waste(editing_milestones)
# ── 汇总 ──
total_waste_seconds = production_total_waste
total_waste_hours = pre_waste_hours + post_days_waste_hours
waste_rate = round(total_waste_seconds / target * 100, 1) if target > 0 else 0
return {
"target_seconds": target,
"total_submitted_seconds": round(total_submitted, 1),
# 分阶段明细
"pre_waste": {
"waste_hours": pre_waste_hours,
"details": pre_details,
},
"production_waste": {
"test_waste_seconds": round(test_waste, 1),
"overproduction_waste_seconds": round(overproduction_waste, 1),
"total_waste_seconds": round(production_total_waste, 1),
},
"post_waste": {
"days_waste_hours": post_days_waste_hours,
"details": post_details,
},
# 汇总
"total_waste_seconds": round(total_waste_seconds, 1),
"total_waste_hours": total_waste_hours,
"waste_rate": waste_rate,
# 兼容旧字段
"test_waste_seconds": round(test_waste, 1),
"overproduction_waste_seconds": round(overproduction_waste, 1),
}
# ──────────────────────────── 团队效率 ────────────────────────────
def calc_team_efficiency(project_id: int, db: Session) -> list:
"""
加权效率算法:
- 加权效率 = (制作秒数 - 修改秒数) / 总工时 (综合速度+质量)
- 通过率 = (制作秒数 - 修改秒数) / 制作秒数 (纯质量指标)
- 日均净产出 = (制作秒数 - 修改秒数) / 活跃天数
- 熟练度等级基于加权效率与团队均值的比值
"""
from sqlalchemy import distinct, case
per_user = db.query(
Submission.user_id,
sa_func.sum(Submission.total_seconds).label("total_secs"),
sa_func.sum(Submission.hours_spent).label("total_hours"),
sa_func.count(distinct(Submission.submit_date)).label("days"),
sa_func.count(Submission.id).label("count"),
sa_func.sum(case(
(Submission.work_type == WorkType.PRODUCTION, Submission.total_seconds),
else_=0,
)).label("production_secs"),
sa_func.sum(case(
(Submission.work_type == WorkType.REVISION, Submission.total_seconds),
else_=0,
)).label("revision_secs"),
).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
Submission.project_phase == PhaseGroup.PRODUCTION, # 只算中期
).group_by(Submission.user_id).all()
if not per_user:
return []
user_data = []
for user_id, total_secs, total_hours, days, count, prod_secs, rev_secs in per_user:
user = db.query(User).filter(User.id == user_id).first()
prod_secs = prod_secs or 0
rev_secs = rev_secs or 0
net_secs = max(prod_secs - rev_secs, 0)
# 原有指标(向后兼容,结算页面依赖)
daily_avg = total_secs / days if days > 0 else 0
hourly_output = total_secs / total_hours if total_hours and total_hours > 0 else 0
# 新指标
first_pass_rate = round(net_secs / prod_secs * 100, 1) if prod_secs > 0 else 0
weighted_efficiency = net_secs / total_hours if total_hours and total_hours > 0 else 0
daily_net_output = net_secs / days if days > 0 else 0
user_data.append({
"user_id": user_id,
"user_name": user.name if user else "未知",
"total_seconds": round(total_secs, 1),
"submission_count": count,
"total_hours": round(total_hours or 0, 1),
"active_days": days,
"daily_avg": round(daily_avg, 1),
"hourly_output": round(hourly_output, 1),
# 新字段
"production_seconds": round(prod_secs, 1),
"revision_seconds": round(rev_secs, 1),
"first_pass_rate": first_pass_rate,
"weighted_efficiency": round(weighted_efficiency, 1),
"daily_net_output": round(daily_net_output, 1),
})
# 熟练度等级:基于加权效率与团队均值的比值
team_weighted_avg = sum(d["weighted_efficiency"] for d in user_data) / len(user_data)
for d in user_data:
d["team_weighted_avg"] = round(team_weighted_avg, 1)
ratio = d["weighted_efficiency"] / team_weighted_avg if team_weighted_avg > 0 else 0
# 效率百分比(与团队均值的偏差)
d["efficiency_rate"] = round((ratio - 1) * 100, 1) if team_weighted_avg > 0 else 0
# 熟练度等级
if ratio >= 1.5:
d["proficiency_grade"] = "S+"
elif ratio >= 1.2:
d["proficiency_grade"] = "S"
elif ratio >= 0.8:
d["proficiency_grade"] = "A"
elif ratio >= 0.5:
d["proficiency_grade"] = "B"
else:
d["proficiency_grade"] = "C"
user_data.sort(key=lambda x: x["weighted_efficiency"], reverse=True)
return user_data
# ──────────────────────────── 项目完整结算 ────────────────────────────
def calc_project_settlement(project_id: int, db: Session) -> dict:
"""生成项目结算报告"""
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
return {}
labor = calc_labor_cost_for_project(project_id, db)
ai_tool = calc_ai_tool_cost_for_project(project_id, db)
outsource = calc_outsource_cost_for_project(project_id, db)
overhead = calc_overhead_cost_for_project(project_id, db)
management = calc_management_cost_for_project(project_id, db)
internal_affairs = calc_internal_affairs_cost_for_project(project_id, db)
total_cost = labor + ai_tool + outsource + overhead + management + internal_affairs
waste = calc_waste_for_project(project_id, db)
efficiency = calc_team_efficiency(project_id, db)
result = {
"project_id": project.id,
"project_name": project.name,
"project_type": project.project_type.value if hasattr(project.project_type, 'value') else project.project_type,
"labor_cost": labor,
"ai_tool_cost": ai_tool,
"outsource_cost": outsource,
"overhead_cost": overhead,
"management_cost": management,
"internal_affairs_cost": internal_affairs,
"total_cost": round(total_cost, 2),
**waste,
"team_efficiency": efficiency,
}
# 客户正式项目计算盈亏
pt = project.project_type.value if hasattr(project.project_type, 'value') else project.project_type
if pt == "客户正式项目" and project.contract_amount:
result["contract_amount"] = project.contract_amount
result["profit_loss"] = round(project.contract_amount - total_cost, 2)
else:
result["contract_amount"] = None
result["profit_loss"] = None
return result