"""报告生成服务 —— 汇总数据库数据 + 调用 AI 生成摘要""" import logging from datetime import date, timedelta from sqlalchemy.orm import Session from sqlalchemy import func as sa_func from models import ( User, Project, Submission, AIToolCost, Role, ProjectStatus, WorkType, PhaseGroup, ) from calculations import ( calc_waste_for_project, calc_labor_cost_for_project, calc_ai_tool_cost_for_project, calc_outsource_cost_for_project, calc_overhead_cost_for_project, calc_team_efficiency, calc_project_settlement, ) from services.ai_service import generate_report_summary logger = logging.getLogger(__name__) def _fmt_seconds(secs: float) -> str: """秒数格式化为 Xh Xm 或 Xm Xs""" if secs <= 0: return "0s" h = int(secs) // 3600 m = (int(secs) % 3600) // 60 s = int(secs) % 60 if h > 0: return f"{h}h {m}m" if m > 0 else f"{h}h" if m > 0: return f"{m}m {s}s" if s > 0 else f"{m}m" return f"{s}s" def _fmt_money(amount: float) -> str: """金额格式化""" if amount >= 10000: return f"¥{amount/10000:.1f}万" return f"¥{amount:,.0f}" def _is_weekend(d: date) -> bool: """判断是否为周末(周六=5, 周日=6)""" return d.weekday() >= 5 # ──────────────────────────── 日报 ──────────────────────────── def generate_daily_report(db: Session) -> dict: """ 生成日报,返回结构化数据供飞书卡片使用 """ today = date.today() is_weekend = _is_weekend(today) weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] weekday_label = weekday_names[today.weekday()] # 今日提交 today_subs = db.query(Submission).filter( Submission.submit_date == today ).all() today_submitter_ids = set(s.user_id for s in today_subs) # 产出只算中期制作阶段的动画秒数 today_total_secs = sum( s.total_seconds for s in today_subs if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION ) # 未提交人员(仅工作日统计,且排除豁免角色) not_submitted = [] if not is_weekend: exempt_role_ids = set( r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all() ) all_active_user_ids = set( uid for (uid,) in db.query(Submission.user_id).distinct().all() ) for uid in all_active_user_ids: if uid not in today_submitter_ids: user = db.query(User).filter(User.id == uid).first() if user and user.is_active and user.role_id not in exempt_role_ids: not_submitted.append(user.name) # 进行中项目 active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() projects_data = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) total_secs = waste.get("total_submitted_seconds", 0) target = p.target_total_seconds progress = round(total_secs / target * 100, 1) if target > 0 else 0 proj_today_secs = sum( s.total_seconds for s in today_subs if s.project_id == p.id and s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION ) projects_data.append({ "name": p.name, "progress": progress, "today_output": _fmt_seconds(proj_today_secs), }) # 风险检测(使用完整风险引擎) full_risks = analyze_project_risks(db) risks_data = [ { "name": r["project_name"], "level": r["risk_level"], "detail": ";".join(r["risk_factors"]), } for r in full_risks ] # AI 数据上下文 project_lines = [ f"- {p['name']}:进度 {p['progress']}%,今日产出 {p['today_output']}" for p in projects_data ] risk_lines = [f"- {r['name']}:{r['detail']}" for r in risks_data] data_context = ( f"日期:{today}({weekday_label}{',周末' if is_weekend else ''})\n" f"进行中项目:{len(active_projects)} 个\n" f"今日提交人次:{len(today_submitter_ids)}\n" f"今日总产出:{_fmt_seconds(today_total_secs)}\n" ) if not is_weekend: data_context += f"今日未提交人员:{', '.join(not_submitted) if not_submitted else '无'}\n" else: data_context += "今天是周末,不统计未提交\n" data_context += ( f"各项目情况:\n" + "\n".join(project_lines) + "\n" f"风险项目:\n" + ("\n".join(risk_lines) if risk_lines else "无") ) ai_summary = generate_report_summary(data_context, "daily") title = f"AirLabs 日报 — {today}({weekday_label})" return { "title": title, "report_type": "daily", "card_data": { "is_weekend": is_weekend, "weekday_label": weekday_label, "active_project_count": len(active_projects), "submitter_count": len(today_submitter_ids), "total_output": _fmt_seconds(today_total_secs), "not_submitted": not_submitted, "projects": projects_data, "risks": risks_data, "ai_summary": ai_summary, }, } # ──────────────────────────── 周报 ──────────────────────────── def generate_weekly_report(db: Session) -> dict: """生成周报(本周一到当天的数据),返回结构化数据""" today = date.today() monday = today - timedelta(days=today.weekday()) week_subs = db.query(Submission).filter( Submission.submit_date >= monday, Submission.submit_date <= today, ).all() week_submitter_ids = set(s.user_id for s in week_subs) # 产出只算中期制作阶段的动画秒数 week_total_secs = sum( s.total_seconds for s in week_subs if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION ) working_days = min((today - monday).days + 1, 5) avg_daily = round(week_total_secs / max(1, len(week_submitter_ids)) / max(1, working_days), 1) active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() projects_data = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) total_secs = waste.get("total_submitted_seconds", 0) target = p.target_total_seconds progress = round(total_secs / target * 100, 1) if target > 0 else 0 proj_week_secs = sum( s.total_seconds for s in week_subs if s.project_id == p.id and s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION ) projects_data.append({ "name": p.name, "progress": progress, "week_output": _fmt_seconds(proj_week_secs), }) # 本周成本 week_labor = 0.0 processed = set() for s in week_subs: key = (s.user_id, s.submit_date) if key not in processed: processed.add(key) user = db.query(User).filter(User.id == s.user_id).first() if user: week_labor += user.daily_cost week_ai_cost = db.query(sa_func.sum(AIToolCost.amount)).filter( AIToolCost.record_date >= monday, AIToolCost.record_date <= today, ).scalar() or 0 # 损耗排行 waste_ranking = [] for p in active_projects: w = calc_waste_for_project(p.id, db) if w.get("total_waste_seconds", 0) > 0: waste_ranking.append({"name": p.name, "rate": w["waste_rate"]}) waste_ranking.sort(key=lambda x: x["rate"], reverse=True) # 效率之星 —— 跨项目加权通过率(有效产出 / 总产出,扣除返工) from collections import defaultdict user_prod_secs = defaultdict(float) # 制作秒数 user_rev_secs = defaultdict(float) # 返工秒数 for s in week_subs: if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION: if s.work_type == WorkType.PRODUCTION: user_prod_secs[s.user_id] += s.total_seconds elif s.work_type == WorkType.REVISION: user_rev_secs[s.user_id] += s.total_seconds top_producer = None best_rate = -1 best_uid = None for uid, prod in user_prod_secs.items(): if prod < 60: # 至少 1 分钟制作产出才参与排名 continue rev = user_rev_secs.get(uid, 0) rate = (prod - rev) / prod if prod > 0 else 0 if rate > best_rate: best_rate = rate best_uid = uid if best_uid is not None and best_rate >= 0: top_user = db.query(User).filter(User.id == best_uid).first() if top_user: top_producer = f"{top_user.name}(通过率 {round(best_rate * 100, 1)}%)" # AI 数据上下文 project_lines = [ f"- {p['name']}:当前进度 {p['progress']}%,本周产出 {p['week_output']}" for p in projects_data ] # 风险检测 full_risks = analyze_project_risks(db) risk_lines = [ f"- {r['project_name']}:{';'.join(r['risk_factors'])}" for r in full_risks ] data_context = ( f"周期:{monday} ~ {today}\n" f"进行中项目:{len(active_projects)} 个\n" f"本周总产出:{_fmt_seconds(week_total_secs)}\n" f"人均日产出:{_fmt_seconds(avg_daily)}\n" f"效率最高:{top_producer or '暂无'}\n" f"本周人力成本:{_fmt_money(week_labor)}\n" f"本周AI工具成本:{_fmt_money(week_ai_cost)}\n" f"各项目:\n" + "\n".join(project_lines) + "\n" f"损耗排行:\n" + ("\n".join( f"- {w['name']}:{w['rate']}%" for w in waste_ranking[:5] ) if waste_ranking else "无") + "\n" f"风险项目:\n" + ("\n".join(risk_lines) if risk_lines else "无") ) ai_summary = generate_report_summary(data_context, "weekly") week_num = today.isocalendar()[1] title = f"AirLabs 周报 — 第{week_num}周({monday} ~ {today})" return { "title": title, "report_type": "weekly", "card_data": { "week_range": f"{monday} ~ {today}", "week_num": week_num, "total_output": _fmt_seconds(week_total_secs), "avg_daily_output": _fmt_seconds(avg_daily), "top_producer": top_producer, "projects": projects_data, "labor_cost": _fmt_money(week_labor), "ai_tool_cost": _fmt_money(week_ai_cost), "waste_ranking": waste_ranking[:5], "ai_summary": ai_summary, }, } # ──────────────────────────── 月报 ──────────────────────────── def generate_monthly_report(db: Session) -> dict: """生成月报(上月完整数据),返回结构化数据""" today = date.today() first_of_this_month = today.replace(day=1) last_of_prev_month = first_of_this_month - timedelta(days=1) first_of_prev_month = last_of_prev_month.replace(day=1) month_label = f"{last_of_prev_month.year}年{last_of_prev_month.month}月" month_subs = db.query(Submission).filter( Submission.submit_date >= first_of_prev_month, Submission.submit_date <= last_of_prev_month, ).all() # 产出只算中期制作阶段的动画秒数 month_total_secs = sum( s.total_seconds for s in month_subs if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION ) month_submitters = set(s.user_id for s in month_subs) all_projects = db.query(Project).filter( Project.status.in_([ProjectStatus.IN_PROGRESS, ProjectStatus.COMPLETED]) ).all() completed_this_month = [ p for p in all_projects if p.status == ProjectStatus.COMPLETED and p.actual_completion_date and first_of_prev_month <= p.actual_completion_date <= last_of_prev_month ] active_projects = [p for p in all_projects if p.status == ProjectStatus.IN_PROGRESS] # 各项目成本 project_costs = [] total_all_cost = 0.0 for p in active_projects + completed_this_month: labor = calc_labor_cost_for_project(p.id, db) ai_tool = calc_ai_tool_cost_for_project(p.id, db) outsource = calc_outsource_cost_for_project(p.id, db) overhead = calc_overhead_cost_for_project(p.id, db) total = labor + ai_tool + outsource + overhead total_all_cost += total project_costs.append({ "name": p.name, "labor": _fmt_money(labor), "ai_tool": _fmt_money(ai_tool), "outsource": _fmt_money(outsource), "overhead": _fmt_money(overhead), "total": _fmt_money(total), }) # 盈亏 profit_items = [] total_profit = 0.0 total_contract = 0.0 for p in completed_this_month: settlement = calc_project_settlement(p.id, db) if settlement.get("contract_amount"): pl = settlement.get("profit_loss", 0) total_profit += pl total_contract += settlement["contract_amount"] profit_items.append({ "name": p.name, "contract": _fmt_money(settlement["contract_amount"]), "cost": _fmt_money(settlement["total_cost"]), "profit": _fmt_money(pl), "is_positive": pl >= 0, }) profit_rate = round(total_profit / total_contract * 100, 1) if total_contract > 0 else None # 损耗汇总 total_waste_secs = 0.0 total_target_secs = 0.0 for p in active_projects + completed_this_month: w = calc_waste_for_project(p.id, db) total_waste_secs += w.get("total_waste_seconds", 0) total_target_secs += p.target_total_seconds or 0 waste_rate = round(total_waste_secs / total_target_secs * 100, 1) if total_target_secs > 0 else 0 avg_per_person = round(month_total_secs / max(1, len(month_submitters)), 1) # AI 数据上下文 project_cost_lines = [ f"- {pc['name']}:人力 {pc['labor']} / AI工具 {pc['ai_tool']} / " f"外包 {pc['outsource']} / 固定 {pc['overhead']} → 总计 {pc['total']}" for pc in project_costs ] profit_lines = [ f"- {pi['name']}:回款 {pi['contract']},成本 {pi['cost']},利润 {'+'if pi['is_positive'] else ''}{pi['profit']}" for pi in profit_items ] # 风险检测 monthly_risks = analyze_project_risks(db) monthly_risk_lines = [ f"- {r['project_name']}:{';'.join(r['risk_factors'])}" for r in monthly_risks ] data_context = ( f"月份:{month_label}\n" f"进行中项目:{len(active_projects)} 个\n" f"本月完成项目:{len(completed_this_month)} 个\n" f"月度总产出:{_fmt_seconds(month_total_secs)}\n" f"月度总成本:{_fmt_money(total_all_cost)}\n" f"总损耗率:{waste_rate}%\n" f"参与人数:{len(month_submitters)}\n" f"人均产出:{_fmt_seconds(avg_per_person)}\n" f"各项目成本:\n" + "\n".join(project_cost_lines) + "\n" f"盈亏:\n" + ("\n".join(profit_lines) if profit_lines else "本月无结算项目") + "\n" f"风险项目:\n" + ("\n".join(monthly_risk_lines) if monthly_risk_lines else "无") ) ai_summary = generate_report_summary(data_context, "monthly") title = f"AirLabs 月报 — {month_label}" return { "title": title, "report_type": "monthly", "card_data": { "month_label": month_label, "active_count": len(active_projects), "completed_count": len(completed_this_month), "total_output": _fmt_seconds(month_total_secs), "total_cost": _fmt_money(total_all_cost), "project_costs": project_costs, "profit_items": profit_items, "profit_rate": profit_rate, "waste_total": _fmt_seconds(total_waste_secs), "waste_rate": waste_rate, "participant_count": len(month_submitters), "avg_per_person": _fmt_seconds(avg_per_person), "ai_summary": ai_summary, }, } # ──────────────────────────── 风险预警 ──────────────────────────── def analyze_project_risks(db: Session) -> list: """ 分析所有进行中项目的风险,返回风险列表 纯规则引擎判断,不依赖 AI """ today = date.today() active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() risks = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) # 进度只算中期制作产出,不把后期算进去 production_secs = db.query(sa_func.sum(Submission.total_seconds)).filter( Submission.project_id == p.id, Submission.total_seconds > 0, Submission.project_phase == PhaseGroup.PRODUCTION, ).scalar() or 0 target = p.target_total_seconds progress = round(production_secs / target * 100, 1) if target > 0 else 0 risk_factors = [] risk_level = "low" # 1. 超期检测 if p.estimated_completion_date: days_left = (p.estimated_completion_date - today).days if days_left < 0: risk_factors.append(f"已超期 {-days_left} 天") risk_level = "high" elif days_left <= 7 and progress < 80: risk_factors.append(f"距截止仅剩 {days_left} 天,进度仅 {progress}%") risk_level = "high" elif days_left <= 14 and progress < 60: risk_factors.append(f"距截止 {days_left} 天,进度 {progress}% 偏低") risk_level = "medium" # 进度落后于时间线 if p.estimated_completion_date and days_left > 0: if hasattr(p, 'created_at') and p.created_at: created = p.created_at.date() if hasattr(p.created_at, 'date') else p.created_at total_days = (p.estimated_completion_date - created).days elapsed_days = (today - created).days if total_days > 0: expected_progress = round(elapsed_days / total_days * 100, 1) if progress < expected_progress * 0.7: risk_factors.append( f"预期进度 {expected_progress}%,实际 {progress}%,严重落后" ) risk_level = "high" elif progress < expected_progress * 0.85: risk_factors.append( f"预期进度 {expected_progress}%,实际 {progress}%" ) if risk_level != "high": risk_level = "medium" # 2. 损耗率检测 waste_rate = waste.get("waste_rate", 0) if waste_rate > 80: risk_factors.append(f"损耗率 {waste_rate}%,严重偏高") risk_level = "high" elif waste_rate > 50: risk_factors.append(f"损耗率 {waste_rate}%,偏高") if risk_level != "high": risk_level = "medium" # 3. 近7天无提交 week_ago = today - timedelta(days=7) recent_subs = db.query(Submission).filter( Submission.project_id == p.id, Submission.submit_date >= week_ago, ).count() if recent_subs == 0 and production_secs > 0: risk_factors.append("近 7 天无提交,产出停滞") if risk_level != "high": risk_level = "medium" if risk_factors: risks.append({ "project_id": p.id, "project_name": p.name, "risk_level": risk_level, "progress": progress, "risk_factors": risk_factors, }) # 高风险排前面 level_order = {"high": 0, "medium": 1, "low": 2} risks.sort(key=lambda x: level_order.get(x["risk_level"], 99)) return risks