""" 计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块 修改计算规则只需改此文件。 """ from sqlalchemy.orm import Session from sqlalchemy import func as sa_func, and_ from collections import defaultdict from datetime import date, timedelta from models import ( User, Project, Submission, AIToolCost, AIToolCostAllocation, OutsourceCost, CostOverride, OverheadCost, WorkType, CostAllocationType ) from config import WORKING_DAYS_PER_MONTH # ──────────────────────────── 人力成本分摊 ──────────────────────────── def calc_labor_cost_for_project(project_id: int, db: Session) -> float: """ 计算某项目的累计人力成本 规则: - 有秒数的提交 → 按各项目产出秒数比例分摊日成本 - 无秒数的提交 → 按各项目提交条数比例分摊日成本 - 管理员手动调整优先 """ # 找出所有给此项目提交过的人 submitters = db.query(Submission.user_id).filter( Submission.project_id == project_id ).distinct().all() submitter_ids = [s[0] for s in submitters] total_labor = 0.0 for uid in submitter_ids: user = db.query(User).filter(User.id == uid).first() if not user: continue daily_cost = user.daily_cost # 找这个人在此项目的所有提交日期 dates = db.query(Submission.submit_date).filter( Submission.user_id == uid, Submission.project_id == project_id, ).distinct().all() for (d,) in dates: # 检查是否有手动调整 override = db.query(CostOverride).filter( CostOverride.user_id == uid, CostOverride.date == d, CostOverride.project_id == project_id, ).first() if override: total_labor += override.override_amount continue # 这个人这天所有项目的提交 day_subs = db.query(Submission).filter( Submission.user_id == uid, Submission.submit_date == d, ).all() # 计算这天各项目的秒数和条数 project_seconds = defaultdict(float) project_counts = defaultdict(int) total_day_seconds = 0.0 total_day_count = 0 for s in day_subs: project_seconds[s.project_id] += s.total_seconds project_counts[s.project_id] += 1 total_day_seconds += s.total_seconds total_day_count += 1 # 分摊 if total_day_seconds > 0: # 有秒数 → 按秒数比例 ratio = project_seconds.get(project_id, 0) / total_day_seconds elif total_day_count > 0: # 无秒数 → 按条数比例 ratio = project_counts.get(project_id, 0) / total_day_count else: ratio = 0 total_labor += daily_cost * ratio return round(total_labor, 2) # ──────────────────────────── AI 工具成本 ──────────────────────────── def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float: """计算某项目的 AI 工具成本""" total = 0.0 # 1. 直接指定项目的 direct = db.query(sa_func.sum(AIToolCost.amount)).filter( AIToolCost.allocation_type == CostAllocationType.PROJECT, AIToolCost.project_id == project_id, ).scalar() or 0 total += direct # 2. 手动分摊的 manual = db.query(AIToolCostAllocation).filter( AIToolCostAllocation.project_id == project_id, ).all() for alloc in manual: cost = db.query(AIToolCost).filter(AIToolCost.id == alloc.ai_tool_cost_id).first() if cost: total += cost.amount * alloc.percentage / 100 # 3. 内容组整体(按产出秒数比例分摊) team_costs = db.query(AIToolCost).filter( AIToolCost.allocation_type == CostAllocationType.TEAM, ).all() if team_costs: # 所有项目的总秒数 all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.total_seconds > 0 ).scalar() or 0 # 此项目的秒数 proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).scalar() or 0 if all_secs > 0: ratio = proj_secs / all_secs for c in team_costs: total += c.amount * ratio return round(total, 2) # ──────────────────────────── 外包成本 ──────────────────────────── def calc_outsource_cost_for_project(project_id: int, db: Session) -> float: """计算某项目的外包成本""" total = db.query(sa_func.sum(OutsourceCost.amount)).filter( OutsourceCost.project_id == project_id, ).scalar() or 0 return round(total, 2) # ──────────────────────────── 固定开支分摊 ──────────────────────────── def calc_overhead_cost_for_project(project_id: int, db: Session) -> float: """ 计算某项目分摊的固定开支(办公室租金+水电费) 规则:按所有项目的产出秒数比例均摊 """ total_overhead = db.query(sa_func.sum(OverheadCost.amount)).scalar() or 0 if total_overhead == 0: return 0.0 all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.total_seconds > 0 ).scalar() or 0 proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).scalar() or 0 if all_secs > 0: ratio = proj_secs / all_secs return round(total_overhead * ratio, 2) return 0.0 # ──────────────────────────── 工作日计算工具 ──────────────────────────── def _working_days_between(start_date, end_date) -> int: """计算两个日期之间的工作日数(不含周末)""" if not start_date or not end_date: return 0 from datetime import timedelta, date as date_type # 统一为 date 类型 if hasattr(start_date, 'date'): start_date = start_date.date() if hasattr(end_date, 'date'): end_date = end_date.date() if end_date <= start_date: return 0 days = 0 current = start_date while current < end_date: current += timedelta(days=1) if current.weekday() < 5: # 周一~周五 days += 1 return days # ──────────────────────────── 里程碑损耗计算 ──────────────────────────── def _calc_milestone_waste(milestones, today=None) -> tuple: """ 计算里程碑的工时损耗(预估天数 vs 实际天数) 返回: (waste_hours, details_list) """ from datetime import date as date_type, timedelta if today is None: today = date_type.today() waste_hours = 0.0 details = [] for ms in milestones: if not ms.estimated_days or not ms.start_date: continue # 计算实际天数 end = ms.completed_at if ms.is_completed and ms.completed_at else today actual_days = _working_days_between(ms.start_date, end) if actual_days > ms.estimated_days: overrun = actual_days - ms.estimated_days waste_h = overrun * 8 waste_hours += waste_h details.append({ "milestone": ms.name, "estimated_days": ms.estimated_days, "actual_days": actual_days, "overrun_days": overrun, "waste_hours": waste_h, }) return waste_hours, details # ──────────────────────────── 损耗计算(三阶段) ──────────────────────────── def calc_waste_for_project(project_id: int, db: Session) -> dict: """ 三阶段损耗计算: - 前期:里程碑工时制(预估天数 vs 实际天数) - 制作:秒数制(产出 vs 目标,含修补镜头) - 后期:剪辑=工时制,修补镜头=秒数(归入制作),配音/音效=不计 """ from models import ( ProjectStatus, ProjectMilestone, ContentType, PhaseGroup ) project = db.query(Project).filter(Project.id == project_id).first() if not project: return {} target = project.target_total_seconds # 全部有秒数的提交总量 total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).scalar() or 0 # ── 废弃项目:全部产出记为损耗 ── if project.status == ProjectStatus.ABANDONED: return { "target_seconds": target, "total_submitted_seconds": round(total_submitted, 1), "pre_waste": {"waste_hours": 0, "details": []}, "production_waste": { "test_waste_seconds": 0, "overproduction_waste_seconds": round(total_submitted, 1), "total_waste_seconds": round(total_submitted, 1), }, "post_waste": {"days_waste_hours": 0, "details": []}, "total_waste_seconds": round(total_submitted, 1), "total_waste_hours": 0, "waste_rate": 100.0 if total_submitted > 0 else 0.0, "test_waste_seconds": 0, "overproduction_waste_seconds": round(total_submitted, 1), } # ── 前期损耗(工时制) ── pre_milestones = db.query(ProjectMilestone).filter( ProjectMilestone.project_id == project_id, ProjectMilestone.phase == PhaseGroup.PRE, ).all() pre_waste_hours, pre_details = _calc_milestone_waste(pre_milestones) # ── 制作损耗(秒数制) ── # 制作阶段的提交 production_total_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, Submission.project_phase == PhaseGroup.PRODUCTION, ).scalar() or 0 # 制作阶段的测试损耗 test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.work_type == WorkType.TEST, Submission.project_phase == PhaseGroup.PRODUCTION, ).scalar() or 0 # 修补镜头(后期秒数,归入制作目标对比) shot_repair_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.content_type == ContentType.SHOT_REPAIR, Submission.total_seconds > 0, ).scalar() or 0 # 制作产出 = 制作阶段非测试 + 修补镜头 production_output = (production_total_secs - test_waste) + shot_repair_secs overproduction_waste = max(0, production_output - target) production_total_waste = test_waste + overproduction_waste # ── 后期损耗 ── post_milestones = db.query(ProjectMilestone).filter( ProjectMilestone.project_id == project_id, ProjectMilestone.phase == PhaseGroup.POST, ).all() # 只计算剪辑里程碑的工时损耗(配音/音效不计,修补镜头已在秒数中) editing_milestones = [m for m in post_milestones if m.name == "剪辑"] post_days_waste_hours, post_details = _calc_milestone_waste(editing_milestones) # ── 汇总 ── total_waste_seconds = production_total_waste total_waste_hours = pre_waste_hours + post_days_waste_hours waste_rate = round(total_waste_seconds / target * 100, 1) if target > 0 else 0 return { "target_seconds": target, "total_submitted_seconds": round(total_submitted, 1), # 分阶段明细 "pre_waste": { "waste_hours": pre_waste_hours, "details": pre_details, }, "production_waste": { "test_waste_seconds": round(test_waste, 1), "overproduction_waste_seconds": round(overproduction_waste, 1), "total_waste_seconds": round(production_total_waste, 1), }, "post_waste": { "days_waste_hours": post_days_waste_hours, "details": post_details, }, # 汇总 "total_waste_seconds": round(total_waste_seconds, 1), "total_waste_hours": total_waste_hours, "waste_rate": waste_rate, # 兼容旧字段 "test_waste_seconds": round(test_waste, 1), "overproduction_waste_seconds": round(overproduction_waste, 1), } # ──────────────────────────── 团队效率 ──────────────────────────── def calc_team_efficiency(project_id: int, db: Session) -> list: """ 时均产出效率法(展示日均): - 效率对比用时均产出 = 累计秒数 ÷ 累计工时(跨项目公平) - 前端展示日均产出 = 累计秒数 ÷ 提交天数(直观) """ from sqlalchemy import distinct per_user = db.query( Submission.user_id, sa_func.sum(Submission.total_seconds).label("total_secs"), sa_func.sum(Submission.hours_spent).label("total_hours"), sa_func.count(distinct(Submission.submit_date)).label("days"), sa_func.count(Submission.id).label("count"), ).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).group_by(Submission.user_id).all() if not per_user: return [] user_data = [] for user_id, total_secs, total_hours, days, count in per_user: user = db.query(User).filter(User.id == user_id).first() daily_avg = total_secs / days if days > 0 else 0 hourly_output = total_secs / total_hours if total_hours and total_hours > 0 else 0 user_data.append({ "user_id": user_id, "user_name": user.name if user else "未知", "total_seconds": round(total_secs, 1), "submission_count": count, "total_hours": round(total_hours or 0, 1), "active_days": days, "daily_avg": round(daily_avg, 1), "hourly_output": round(hourly_output, 1), }) # 效率对比用时均产出(公平) team_hourly_avg = sum(d["hourly_output"] for d in user_data) / len(user_data) for d in user_data: diff = d["hourly_output"] - team_hourly_avg d["team_hourly_avg"] = round(team_hourly_avg, 1) d["efficiency_rate"] = round(diff / team_hourly_avg * 100, 1) if team_hourly_avg > 0 else 0 user_data.sort(key=lambda x: x["hourly_output"], reverse=True) return user_data # ──────────────────────────── 项目完整结算 ──────────────────────────── def calc_project_settlement(project_id: int, db: Session) -> dict: """生成项目结算报告""" project = db.query(Project).filter(Project.id == project_id).first() if not project: return {} labor = calc_labor_cost_for_project(project_id, db) ai_tool = calc_ai_tool_cost_for_project(project_id, db) outsource = calc_outsource_cost_for_project(project_id, db) overhead = calc_overhead_cost_for_project(project_id, db) total_cost = labor + ai_tool + outsource + overhead waste = calc_waste_for_project(project_id, db) efficiency = calc_team_efficiency(project_id, db) result = { "project_id": project.id, "project_name": project.name, "project_type": project.project_type.value if hasattr(project.project_type, 'value') else project.project_type, "labor_cost": labor, "ai_tool_cost": ai_tool, "outsource_cost": outsource, "overhead_cost": overhead, "total_cost": round(total_cost, 2), **waste, "team_efficiency": efficiency, } # 客户正式项目计算盈亏 pt = project.project_type.value if hasattr(project.project_type, 'value') else project.project_type if pt == "客户正式项目" and project.contract_amount: result["contract_amount"] = project.contract_amount result["profit_loss"] = round(project.contract_amount - total_cost, 2) else: result["contract_amount"] = None result["profit_loss"] = None return result