airlabs-manage/backend/calculations.py
seaislee1209 530f02a66a
Some checks failed
Build and Deploy Web / build-and-deploy (push) Has been cancelled
Build and Deploy Backend / build-and-deploy (push) Has been cancelled
feat: 飞书报告卡片化 + 报告权限系统 + 产出过滤优化
- 日报/周报/月报改为结构化卡片推送(column_set布局)
- 新增 report:daily/weekly/monthly 权限到角色管理
- 产出统计只算中期制作阶段动画秒数
- 效率之星改为跨项目加权通过率
- AI点评补充风险数据源
- 禁用多余admin账号,股东角色加报告权限

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 20:43:35 +08:00

528 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块
修改计算规则只需改此文件。
"""
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func, and_
from collections import defaultdict
from datetime import date, timedelta
from models import (
User, Project, Submission, AIToolCost, AIToolCostAllocation,
OutsourceCost, CostOverride, OverheadCost, WorkType, PhaseGroup, CostAllocationType
)
from config import WORKING_DAYS_PER_MONTH
# ──────────────────────────── 人力成本分摊 ────────────────────────────
def calc_labor_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目的累计人力成本
规则:
- 有秒数的提交 → 按各项目产出秒数比例分摊日成本
- 无秒数的提交 → 按各项目提交条数比例分摊日成本
- 管理员手动调整优先
"""
# 找出所有给此项目提交过的人
submitters = db.query(Submission.user_id).filter(
Submission.project_id == project_id
).distinct().all()
submitter_ids = [s[0] for s in submitters]
total_labor = 0.0
for uid in submitter_ids:
user = db.query(User).filter(User.id == uid).first()
if not user:
continue
daily_cost = user.daily_cost
# 找这个人在此项目的所有提交日期
dates = db.query(Submission.submit_date).filter(
Submission.user_id == uid,
Submission.project_id == project_id,
).distinct().all()
for (d,) in dates:
# 检查是否有手动调整
override = db.query(CostOverride).filter(
CostOverride.user_id == uid,
CostOverride.date == d,
CostOverride.project_id == project_id,
).first()
if override:
total_labor += override.override_amount
continue
# 这个人这天所有项目的提交
day_subs = db.query(Submission).filter(
Submission.user_id == uid,
Submission.submit_date == d,
).all()
# 计算这天各项目的秒数和条数
project_seconds = defaultdict(float)
project_counts = defaultdict(int)
total_day_seconds = 0.0
total_day_count = 0
for s in day_subs:
project_seconds[s.project_id] += s.total_seconds
project_counts[s.project_id] += 1
total_day_seconds += s.total_seconds
total_day_count += 1
# 分摊
if total_day_seconds > 0:
# 有秒数 → 按秒数比例
ratio = project_seconds.get(project_id, 0) / total_day_seconds
elif total_day_count > 0:
# 无秒数 → 按条数比例
ratio = project_counts.get(project_id, 0) / total_day_count
else:
ratio = 0
total_labor += daily_cost * ratio
return round(total_labor, 2)
# ──────────────────────────── AI 工具成本 ────────────────────────────
def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float:
"""计算某项目的 AI 工具成本"""
total = 0.0
# 1. 直接指定项目的
direct = db.query(sa_func.sum(AIToolCost.amount)).filter(
AIToolCost.allocation_type == CostAllocationType.PROJECT,
AIToolCost.project_id == project_id,
).scalar() or 0
total += direct
# 2. 手动分摊的
manual = db.query(AIToolCostAllocation).filter(
AIToolCostAllocation.project_id == project_id,
).all()
for alloc in manual:
cost = db.query(AIToolCost).filter(AIToolCost.id == alloc.ai_tool_cost_id).first()
if cost:
total += cost.amount * alloc.percentage / 100
# 3. 内容组整体(按产出秒数比例分摊)
team_costs = db.query(AIToolCost).filter(
AIToolCost.allocation_type == CostAllocationType.TEAM,
).all()
if team_costs:
# 所有项目的总秒数
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
# 此项目的秒数
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
ratio = proj_secs / all_secs
for c in team_costs:
total += c.amount * ratio
return round(total, 2)
# ──────────────────────────── 外包成本 ────────────────────────────
def calc_outsource_cost_for_project(project_id: int, db: Session) -> float:
"""计算某项目的外包成本"""
total = db.query(sa_func.sum(OutsourceCost.amount)).filter(
OutsourceCost.project_id == project_id,
).scalar() or 0
return round(total, 2)
# ──────────────────────────── 固定开支分摊 ────────────────────────────
def calc_overhead_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目分摊的固定开支(办公室租金+水电费)
规则:按所有项目的产出秒数比例均摊
"""
total_overhead = db.query(sa_func.sum(OverheadCost.amount)).scalar() or 0
if total_overhead == 0:
return 0.0
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
ratio = proj_secs / all_secs
return round(total_overhead * ratio, 2)
return 0.0
# ──────────────────────────── 管理成本分摊 ────────────────────────────
def calc_management_cost_for_project(project_id: int, db: Session) -> float:
"""
计算某项目分摊的管理成本(豁免提交角色的人员日薪)
规则:
- 只算豁免提交exempt_submission=1角色下有工资的活跃用户
- 按有提交记录的工作日数 × 每人日薪 计算总池
- 总池按各项目产出秒数比例分摊
"""
from models import Role
# 找出豁免角色 ID
exempt_role_ids = set(
r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
)
if not exempt_role_ids:
return 0.0
# 豁免角色下有工资的活跃用户
exempt_users = db.query(User).filter(
User.is_active == 1,
User.monthly_salary > 0,
User.role_id.in_(exempt_role_ids),
).all()
if not exempt_users:
return 0.0
# 有提交记录的工作日数(代表公司运营天数)
working_days = db.query(Submission.submit_date).distinct().count()
if working_days == 0:
return 0.0
# 总管理成本池 = 每人日薪 × 工作日数
total_pool = sum(u.daily_cost * working_days for u in exempt_users)
# 按产出秒数比例分摊
all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.total_seconds > 0
).scalar() or 0
proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
if all_secs > 0:
return round(total_pool * proj_secs / all_secs, 2)
return 0.0
# ──────────────────────────── 工作日计算工具 ────────────────────────────
def _working_days_between(start_date, end_date) -> int:
"""计算两个日期之间的工作日数(不含周末)"""
if not start_date or not end_date:
return 0
from datetime import timedelta, date as date_type
# 统一为 date 类型
if hasattr(start_date, 'date'):
start_date = start_date.date()
if hasattr(end_date, 'date'):
end_date = end_date.date()
if end_date <= start_date:
return 0
days = 0
current = start_date
while current < end_date:
current += timedelta(days=1)
if current.weekday() < 5: # 周一~周五
days += 1
return days
# ──────────────────────────── 里程碑损耗计算 ────────────────────────────
def _calc_milestone_waste(milestones, today=None) -> tuple:
"""
计算里程碑的工时损耗(预估天数 vs 实际天数)
返回: (waste_hours, details_list)
"""
from datetime import date as date_type, timedelta
if today is None:
today = date_type.today()
waste_hours = 0.0
details = []
for ms in milestones:
if not ms.estimated_days or not ms.start_date:
continue
# 计算实际天数
end = ms.completed_at if ms.is_completed and ms.completed_at else today
actual_days = _working_days_between(ms.start_date, end)
if actual_days > ms.estimated_days:
overrun = actual_days - ms.estimated_days
waste_h = overrun * 8
waste_hours += waste_h
details.append({
"milestone": ms.name,
"estimated_days": ms.estimated_days,
"actual_days": actual_days,
"overrun_days": overrun,
"waste_hours": waste_h,
})
return waste_hours, details
# ──────────────────────────── 损耗计算(三阶段) ────────────────────────────
def calc_waste_for_project(project_id: int, db: Session) -> dict:
"""
三阶段损耗计算:
- 前期:里程碑工时制(预估天数 vs 实际天数)
- 制作:秒数制(产出 vs 目标,含修补镜头)
- 后期:剪辑=工时制,修补镜头=秒数(归入制作),配音/音效=不计
"""
from models import (
ProjectStatus, ProjectMilestone, ContentType, PhaseGroup
)
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
return {}
target = project.target_total_seconds
# 全部有秒数的提交总量
total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
).scalar() or 0
# ── 废弃项目:全部产出记为损耗 ──
if project.status == ProjectStatus.ABANDONED:
return {
"target_seconds": target,
"total_submitted_seconds": round(total_submitted, 1),
"pre_waste": {"waste_hours": 0, "details": []},
"production_waste": {
"test_waste_seconds": 0,
"overproduction_waste_seconds": round(total_submitted, 1),
"total_waste_seconds": round(total_submitted, 1),
},
"post_waste": {"days_waste_hours": 0, "details": []},
"total_waste_seconds": round(total_submitted, 1),
"total_waste_hours": 0,
"waste_rate": 100.0 if total_submitted > 0 else 0.0,
"test_waste_seconds": 0,
"overproduction_waste_seconds": round(total_submitted, 1),
}
# ── 前期损耗(工时制) ──
pre_milestones = db.query(ProjectMilestone).filter(
ProjectMilestone.project_id == project_id,
ProjectMilestone.phase == PhaseGroup.PRE,
).all()
pre_waste_hours, pre_details = _calc_milestone_waste(pre_milestones)
# ── 制作损耗(秒数制) ──
# 制作阶段的提交
production_total_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
Submission.project_phase == PhaseGroup.PRODUCTION,
).scalar() or 0
# 制作阶段的测试损耗
test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.work_type == WorkType.TEST,
Submission.project_phase == PhaseGroup.PRODUCTION,
).scalar() or 0
# 修补镜头(后期秒数,归入制作目标对比)
shot_repair_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
Submission.project_id == project_id,
Submission.content_type == ContentType.SHOT_REPAIR,
Submission.total_seconds > 0,
).scalar() or 0
# 制作产出 = 制作阶段非测试 + 修补镜头
production_output = (production_total_secs - test_waste) + shot_repair_secs
overproduction_waste = max(0, production_output - target)
production_total_waste = test_waste + overproduction_waste
# ── 后期损耗 ──
post_milestones = db.query(ProjectMilestone).filter(
ProjectMilestone.project_id == project_id,
ProjectMilestone.phase == PhaseGroup.POST,
).all()
# 只计算剪辑里程碑的工时损耗(配音/音效不计,修补镜头已在秒数中)
editing_milestones = [m for m in post_milestones if m.name == "剪辑"]
post_days_waste_hours, post_details = _calc_milestone_waste(editing_milestones)
# ── 汇总 ──
total_waste_seconds = production_total_waste
total_waste_hours = pre_waste_hours + post_days_waste_hours
waste_rate = round(total_waste_seconds / target * 100, 1) if target > 0 else 0
return {
"target_seconds": target,
"total_submitted_seconds": round(total_submitted, 1),
# 分阶段明细
"pre_waste": {
"waste_hours": pre_waste_hours,
"details": pre_details,
},
"production_waste": {
"test_waste_seconds": round(test_waste, 1),
"overproduction_waste_seconds": round(overproduction_waste, 1),
"total_waste_seconds": round(production_total_waste, 1),
},
"post_waste": {
"days_waste_hours": post_days_waste_hours,
"details": post_details,
},
# 汇总
"total_waste_seconds": round(total_waste_seconds, 1),
"total_waste_hours": total_waste_hours,
"waste_rate": waste_rate,
# 兼容旧字段
"test_waste_seconds": round(test_waste, 1),
"overproduction_waste_seconds": round(overproduction_waste, 1),
}
# ──────────────────────────── 团队效率 ────────────────────────────
def calc_team_efficiency(project_id: int, db: Session) -> list:
"""
加权效率算法:
- 加权效率 = (制作秒数 - 修改秒数) / 总工时 (综合速度+质量)
- 通过率 = (制作秒数 - 修改秒数) / 制作秒数 (纯质量指标)
- 日均净产出 = (制作秒数 - 修改秒数) / 活跃天数
- 熟练度等级基于加权效率与团队均值的比值
"""
from sqlalchemy import distinct, case
per_user = db.query(
Submission.user_id,
sa_func.sum(Submission.total_seconds).label("total_secs"),
sa_func.sum(Submission.hours_spent).label("total_hours"),
sa_func.count(distinct(Submission.submit_date)).label("days"),
sa_func.count(Submission.id).label("count"),
sa_func.sum(case(
(Submission.work_type == WorkType.PRODUCTION, Submission.total_seconds),
else_=0,
)).label("production_secs"),
sa_func.sum(case(
(Submission.work_type == WorkType.REVISION, Submission.total_seconds),
else_=0,
)).label("revision_secs"),
).filter(
Submission.project_id == project_id,
Submission.total_seconds > 0,
Submission.project_phase == PhaseGroup.PRODUCTION, # 只算中期
).group_by(Submission.user_id).all()
if not per_user:
return []
user_data = []
for user_id, total_secs, total_hours, days, count, prod_secs, rev_secs in per_user:
user = db.query(User).filter(User.id == user_id).first()
prod_secs = prod_secs or 0
rev_secs = rev_secs or 0
net_secs = max(prod_secs - rev_secs, 0)
# 原有指标(向后兼容,结算页面依赖)
daily_avg = total_secs / days if days > 0 else 0
hourly_output = total_secs / total_hours if total_hours and total_hours > 0 else 0
# 新指标
first_pass_rate = round(net_secs / prod_secs * 100, 1) if prod_secs > 0 else 0
weighted_efficiency = net_secs / total_hours if total_hours and total_hours > 0 else 0
daily_net_output = net_secs / days if days > 0 else 0
user_data.append({
"user_id": user_id,
"user_name": user.name if user else "未知",
"total_seconds": round(total_secs, 1),
"submission_count": count,
"total_hours": round(total_hours or 0, 1),
"active_days": days,
"daily_avg": round(daily_avg, 1),
"hourly_output": round(hourly_output, 1),
# 新字段
"production_seconds": round(prod_secs, 1),
"revision_seconds": round(rev_secs, 1),
"first_pass_rate": first_pass_rate,
"weighted_efficiency": round(weighted_efficiency, 1),
"daily_net_output": round(daily_net_output, 1),
})
# 熟练度等级:基于加权效率与团队均值的比值
team_weighted_avg = sum(d["weighted_efficiency"] for d in user_data) / len(user_data)
for d in user_data:
d["team_weighted_avg"] = round(team_weighted_avg, 1)
ratio = d["weighted_efficiency"] / team_weighted_avg if team_weighted_avg > 0 else 0
# 效率百分比(与团队均值的偏差)
d["efficiency_rate"] = round((ratio - 1) * 100, 1) if team_weighted_avg > 0 else 0
# 熟练度等级
if ratio >= 1.5:
d["proficiency_grade"] = "S+"
elif ratio >= 1.2:
d["proficiency_grade"] = "S"
elif ratio >= 0.8:
d["proficiency_grade"] = "A"
elif ratio >= 0.5:
d["proficiency_grade"] = "B"
else:
d["proficiency_grade"] = "C"
user_data.sort(key=lambda x: x["weighted_efficiency"], reverse=True)
return user_data
# ──────────────────────────── 项目完整结算 ────────────────────────────
def calc_project_settlement(project_id: int, db: Session) -> dict:
"""生成项目结算报告"""
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
return {}
labor = calc_labor_cost_for_project(project_id, db)
ai_tool = calc_ai_tool_cost_for_project(project_id, db)
outsource = calc_outsource_cost_for_project(project_id, db)
overhead = calc_overhead_cost_for_project(project_id, db)
management = calc_management_cost_for_project(project_id, db)
total_cost = labor + ai_tool + outsource + overhead + management
waste = calc_waste_for_project(project_id, db)
efficiency = calc_team_efficiency(project_id, db)
result = {
"project_id": project.id,
"project_name": project.name,
"project_type": project.project_type.value if hasattr(project.project_type, 'value') else project.project_type,
"labor_cost": labor,
"ai_tool_cost": ai_tool,
"outsource_cost": outsource,
"overhead_cost": overhead,
"management_cost": management,
"total_cost": round(total_cost, 2),
**waste,
"team_efficiency": efficiency,
}
# 客户正式项目计算盈亏
pt = project.project_type.value if hasattr(project.project_type, 'value') else project.project_type
if pt == "客户正式项目" and project.contract_amount:
result["contract_amount"] = project.contract_amount
result["profit_loss"] = round(project.contract_amount - total_cost, 2)
else:
result["contract_amount"] = None
result["profit_loss"] = None
return result