""" 计算引擎 —— 所有成本分摊、损耗、效率计算逻辑集中在此模块 修改计算规则只需改此文件。 """ from sqlalchemy.orm import Session from sqlalchemy import func as sa_func, and_ from collections import defaultdict from datetime import date, timedelta from models import ( User, Project, Submission, AIToolCost, AIToolCostAllocation, OutsourceCost, CostOverride, WorkType, CostAllocationType ) from config import WORKING_DAYS_PER_MONTH # ──────────────────────────── 人力成本分摊 ──────────────────────────── def calc_labor_cost_for_project(project_id: int, db: Session) -> float: """ 计算某项目的累计人力成本 规则: - 有秒数的提交 → 按各项目产出秒数比例分摊日成本 - 无秒数的提交 → 按各项目提交条数比例分摊日成本 - 管理员手动调整优先 """ # 找出所有给此项目提交过的人 submitters = db.query(Submission.user_id).filter( Submission.project_id == project_id ).distinct().all() submitter_ids = [s[0] for s in submitters] total_labor = 0.0 for uid in submitter_ids: user = db.query(User).filter(User.id == uid).first() if not user: continue daily_cost = user.daily_cost # 找这个人在此项目的所有提交日期 dates = db.query(Submission.submit_date).filter( Submission.user_id == uid, Submission.project_id == project_id, ).distinct().all() for (d,) in dates: # 检查是否有手动调整 override = db.query(CostOverride).filter( CostOverride.user_id == uid, CostOverride.date == d, CostOverride.project_id == project_id, ).first() if override: total_labor += override.override_amount continue # 这个人这天所有项目的提交 day_subs = db.query(Submission).filter( Submission.user_id == uid, Submission.submit_date == d, ).all() # 计算这天各项目的秒数和条数 project_seconds = defaultdict(float) project_counts = defaultdict(int) total_day_seconds = 0.0 total_day_count = 0 for s in day_subs: project_seconds[s.project_id] += s.total_seconds project_counts[s.project_id] += 1 total_day_seconds += s.total_seconds total_day_count += 1 # 分摊 if total_day_seconds > 0: # 有秒数 → 按秒数比例 ratio = project_seconds.get(project_id, 0) / total_day_seconds elif total_day_count > 0: # 无秒数 → 按条数比例 ratio = project_counts.get(project_id, 0) / total_day_count else: ratio = 0 total_labor += daily_cost * ratio return round(total_labor, 2) # ──────────────────────────── AI 工具成本 ──────────────────────────── def calc_ai_tool_cost_for_project(project_id: int, db: Session) -> float: """计算某项目的 AI 工具成本""" total = 0.0 # 1. 直接指定项目的 direct = db.query(sa_func.sum(AIToolCost.amount)).filter( AIToolCost.allocation_type == CostAllocationType.PROJECT, AIToolCost.project_id == project_id, ).scalar() or 0 total += direct # 2. 手动分摊的 manual = db.query(AIToolCostAllocation).filter( AIToolCostAllocation.project_id == project_id, ).all() for alloc in manual: cost = db.query(AIToolCost).filter(AIToolCost.id == alloc.ai_tool_cost_id).first() if cost: total += cost.amount * alloc.percentage / 100 # 3. 内容组整体(按产出秒数比例分摊) team_costs = db.query(AIToolCost).filter( AIToolCost.allocation_type == CostAllocationType.TEAM, ).all() if team_costs: # 所有项目的总秒数 all_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.total_seconds > 0 ).scalar() or 0 # 此项目的秒数 proj_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).scalar() or 0 if all_secs > 0: ratio = proj_secs / all_secs for c in team_costs: total += c.amount * ratio return round(total, 2) # ──────────────────────────── 外包成本 ──────────────────────────── def calc_outsource_cost_for_project(project_id: int, db: Session) -> float: """计算某项目的外包成本""" total = db.query(sa_func.sum(OutsourceCost.amount)).filter( OutsourceCost.project_id == project_id, ).scalar() or 0 return round(total, 2) # ──────────────────────────── 损耗计算 ──────────────────────────── def calc_waste_for_project(project_id: int, db: Session) -> dict: """ 计算项目损耗 返回: {test_waste, overproduction_waste, total_waste, waste_rate, target_seconds} """ project = db.query(Project).filter(Project.id == project_id).first() if not project: return {} target = project.target_total_seconds # 测试损耗:工作类型为"测试"的全部秒数 test_waste = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.work_type == WorkType.TEST, ).scalar() or 0 # 全部有秒数的提交总量 total_submitted = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).scalar() or 0 # 超产损耗 overproduction_waste = max(0, total_submitted - target) total_waste = test_waste + overproduction_waste waste_rate = round(total_waste / target * 100, 1) if target > 0 else 0 return { "target_seconds": target, "total_submitted_seconds": round(total_submitted, 1), "test_waste_seconds": round(test_waste, 1), "overproduction_waste_seconds": round(overproduction_waste, 1), "total_waste_seconds": round(total_waste, 1), "waste_rate": waste_rate, } # ──────────────────────────── 团队效率 ──────────────────────────── def calc_team_efficiency(project_id: int, db: Session) -> list: """ 人均基准对比法: - 人均基准 = 目标秒数 ÷ 参与制作人数 - 每人超出比例 = (个人提交 - 人均基准) / 人均基准 """ project = db.query(Project).filter(Project.id == project_id).first() if not project: return [] target = project.target_total_seconds # 获取每个人的提交总秒数(仅有秒数的提交) per_user = db.query( Submission.user_id, sa_func.sum(Submission.total_seconds).label("total_secs"), sa_func.count(Submission.id).label("count"), ).filter( Submission.project_id == project_id, Submission.total_seconds > 0, ).group_by(Submission.user_id).all() if not per_user: return [] num_people = len(per_user) baseline = target / num_people if num_people > 0 else 0 result = [] for user_id, total_secs, count in per_user: user = db.query(User).filter(User.id == user_id).first() excess = total_secs - baseline excess_rate = round(excess / baseline * 100, 1) if baseline > 0 else 0 result.append({ "user_id": user_id, "user_name": user.name if user else "未知", "total_seconds": round(total_secs, 1), "submission_count": count, "baseline": round(baseline, 1), "excess_seconds": round(excess, 1), "excess_rate": excess_rate, }) result.sort(key=lambda x: x["total_seconds"], reverse=True) return result # ──────────────────────────── 项目完整结算 ──────────────────────────── def calc_project_settlement(project_id: int, db: Session) -> dict: """生成项目结算报告""" project = db.query(Project).filter(Project.id == project_id).first() if not project: return {} labor = calc_labor_cost_for_project(project_id, db) ai_tool = calc_ai_tool_cost_for_project(project_id, db) outsource = calc_outsource_cost_for_project(project_id, db) total_cost = labor + ai_tool + outsource waste = calc_waste_for_project(project_id, db) efficiency = calc_team_efficiency(project_id, db) result = { "project_id": project.id, "project_name": project.name, "project_type": project.project_type.value if hasattr(project.project_type, 'value') else project.project_type, "labor_cost": labor, "ai_tool_cost": ai_tool, "outsource_cost": outsource, "total_cost": round(total_cost, 2), **waste, "team_efficiency": efficiency, } # 客户正式项目计算盈亏 pt = project.project_type.value if hasattr(project.project_type, 'value') else project.project_type if pt == "客户正式项目" and project.contract_amount: result["contract_amount"] = project.contract_amount result["profit_loss"] = round(project.contract_amount - total_cost, 2) else: result["contract_amount"] = None result["profit_loss"] = None return result