"""报告生成服务 —— 汇总数据库数据 + 调用 AI 生成摘要""" import logging from datetime import date, timedelta from sqlalchemy.orm import Session from sqlalchemy import func as sa_func from models import ( User, Project, Submission, AIToolCost, ProjectStatus, WorkType, ) from calculations import ( calc_waste_for_project, calc_labor_cost_for_project, calc_ai_tool_cost_for_project, calc_outsource_cost_for_project, calc_overhead_cost_for_project, calc_team_efficiency, calc_project_settlement, ) from services.ai_service import generate_report_summary logger = logging.getLogger(__name__) def _fmt_seconds(secs: float) -> str: """秒数格式化为 Xm Xs""" if secs <= 0: return "0s" m = int(secs) // 60 s = int(secs) % 60 if m > 0: return f"{m}m {s}s" if s > 0 else f"{m}m" return f"{s}s" def _fmt_money(amount: float) -> str: """金额格式化""" if amount >= 10000: return f"¥{amount/10000:.1f}万" return f"¥{amount:,.0f}" # ──────────────────────────── 日报 ──────────────────────────── def generate_daily_report(db: Session) -> dict: """ 生成日报 返回 {"title": str, "content": str, "data": dict} """ today = date.today() # 今日提交 today_subs = db.query(Submission).filter( Submission.submit_date == today ).all() today_submitter_ids = set(s.user_id for s in today_subs) today_total_secs = sum(s.total_seconds for s in today_subs if s.total_seconds > 0) # 所有活跃用户(有提交记录的) all_active_user_ids = set( uid for (uid,) in db.query(Submission.user_id).distinct().all() ) not_submitted = [] for uid in all_active_user_ids: if uid not in today_submitter_ids: user = db.query(User).filter(User.id == uid).first() if user and user.is_active: not_submitted.append(user.name) # 进行中项目 active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() project_lines = [] risk_lines = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) total_secs = waste.get("total_submitted_seconds", 0) target = p.target_total_seconds progress = round(total_secs / target * 100, 1) if target > 0 else 0 # 今日该项目产出 proj_today_secs = sum( s.total_seconds for s in today_subs if s.project_id == p.id and s.total_seconds > 0 ) project_lines.append( f"- **{p.name}**:进度 {progress}%,今日产出 {_fmt_seconds(proj_today_secs)}" ) # 风险检测 if p.estimated_completion_date: days_left = (p.estimated_completion_date - today).days if days_left < 0: risk_lines.append(f"- **{p.name}**:已超期 {-days_left} 天,进度 {progress}%") elif days_left <= 7 and progress < 80: risk_lines.append(f"- **{p.name}**:距截止仅剩 {days_left} 天,进度仅 {progress}%") # 组装数据上下文(供 AI 使用) data_context = ( f"日期:{today}\n" f"进行中项目:{len(active_projects)} 个\n" f"今日提交人次:{len(today_submitter_ids)}\n" f"今日总产出:{_fmt_seconds(today_total_secs)}\n" f"今日未提交人员:{', '.join(not_submitted) if not_submitted else '无'}\n" f"各项目情况:\n" + "\n".join(project_lines) + "\n" f"风险项目:\n" + ("\n".join(risk_lines) if risk_lines else "无") ) # 调用 AI 生成摘要 ai_summary = generate_report_summary(data_context, "daily") # 组装飞书 markdown 内容 title = f"AirLabs 日报 — {today}" lines = [ f"**【今日概览】**", f"- 进行中项目:{len(active_projects)} 个", f"- 今日提交:{len(today_submitter_ids)} 人次,总产出 {_fmt_seconds(today_total_secs)}", ] if not_submitted: lines.append(f"- 今日未提交:{', '.join(not_submitted)}") lines.append("") lines.append("**【各项目进展】**") lines.extend(project_lines if project_lines else ["- 暂无进行中项目"]) if risk_lines: lines.append("") lines.append("**【风险提醒】**") lines.extend(risk_lines) if ai_summary: lines.append("") lines.append("**【AI 点评】**") lines.append(ai_summary) content = "\n".join(lines) return {"title": title, "content": content, "data": {"date": str(today)}} # ──────────────────────────── 周报 ──────────────────────────── def generate_weekly_report(db: Session) -> dict: """生成周报(本周一到当天的数据)""" today = date.today() # 本周一 monday = today - timedelta(days=today.weekday()) # 本周提交 week_subs = db.query(Submission).filter( Submission.submit_date >= monday, Submission.submit_date <= today, ).all() week_submitter_ids = set(s.user_id for s in week_subs) week_total_secs = sum(s.total_seconds for s in week_subs if s.total_seconds > 0) working_days = min((today - monday).days + 1, 5) avg_daily = round(week_total_secs / max(1, len(week_submitter_ids)) / max(1, working_days), 1) # 进行中项目 active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() # 各项目周报数据 project_lines = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) total_secs = waste.get("total_submitted_seconds", 0) target = p.target_total_seconds progress = round(total_secs / target * 100, 1) if target > 0 else 0 proj_week_secs = sum( s.total_seconds for s in week_subs if s.project_id == p.id and s.total_seconds > 0 ) project_lines.append( f"- **{p.name}**:当前进度 {progress}%,本周产出 {_fmt_seconds(proj_week_secs)}" ) # 本周成本(简化:统计提交人的日成本) week_labor = 0.0 processed = set() for s in week_subs: key = (s.user_id, s.submit_date) if key not in processed: processed.add(key) user = db.query(User).filter(User.id == s.user_id).first() if user: week_labor += user.daily_cost week_ai_cost = db.query(sa_func.sum(AIToolCost.amount)).filter( AIToolCost.record_date >= monday, AIToolCost.record_date <= today, ).scalar() or 0 # 损耗排行 waste_ranking = [] for p in active_projects: w = calc_waste_for_project(p.id, db) if w.get("total_waste_seconds", 0) > 0: waste_ranking.append({ "name": p.name, "rate": w["waste_rate"], }) waste_ranking.sort(key=lambda x: x["rate"], reverse=True) # 效率排行(找产出最高的人) user_week_secs = {} for s in week_subs: if s.total_seconds > 0: user_week_secs[s.user_id] = user_week_secs.get(s.user_id, 0) + s.total_seconds top_producer = None if user_week_secs: top_uid = max(user_week_secs, key=user_week_secs.get) top_user = db.query(User).filter(User.id == top_uid).first() if top_user: top_daily = round(user_week_secs[top_uid] / max(1, working_days), 1) top_producer = f"{top_user.name}(日均 {_fmt_seconds(top_daily)})" # AI 数据上下文 data_context = ( f"周期:{monday} ~ {today}\n" f"进行中项目:{len(active_projects)} 个\n" f"本周总产出:{_fmt_seconds(week_total_secs)}\n" f"人均日产出:{_fmt_seconds(avg_daily)}\n" f"效率最高:{top_producer or '暂无'}\n" f"本周人力成本:{_fmt_money(week_labor)}\n" f"本周AI工具成本:{_fmt_money(week_ai_cost)}\n" f"各项目:\n" + "\n".join(project_lines) + "\n" f"损耗排行:\n" + "\n".join( f"- {w['name']}:{w['rate']}%" for w in waste_ranking[:5] ) if waste_ranking else "损耗排行:无" ) ai_summary = generate_report_summary(data_context, "weekly") # 组装内容 title = f"AirLabs 周报 — 第{today.isocalendar()[1]}周({monday} ~ {today})" lines = [ "**【项目进展】**", ] lines.extend(project_lines if project_lines else ["- 暂无进行中项目"]) lines.append("") lines.append("**【团队产出】**") lines.append(f"- 本周总产出:{_fmt_seconds(week_total_secs)}") lines.append(f"- 人均日产出:{_fmt_seconds(avg_daily)}") if top_producer: lines.append(f"- 效率最高:{top_producer}") lines.append("") lines.append("**【成本概览】**") lines.append(f"- 本周人力成本:{_fmt_money(week_labor)}") lines.append(f"- 本周 AI 工具支出:{_fmt_money(week_ai_cost)}") if waste_ranking: lines.append("") lines.append("**【损耗排行】**") for w in waste_ranking[:5]: lines.append(f"- {w['name']}:损耗率 {w['rate']}%") if ai_summary: lines.append("") lines.append("**【AI 分析与建议】**") lines.append(ai_summary) content = "\n".join(lines) return {"title": title, "content": content, "data": {"week_start": str(monday), "week_end": str(today)}} # ──────────────────────────── 月报 ──────────────────────────── def generate_monthly_report(db: Session) -> dict: """生成月报(上月完整数据,在每月1号调用)""" today = date.today() # 上月日期范围 first_of_this_month = today.replace(day=1) last_of_prev_month = first_of_this_month - timedelta(days=1) first_of_prev_month = last_of_prev_month.replace(day=1) month_label = f"{last_of_prev_month.year}年{last_of_prev_month.month}月" # 上月提交 month_subs = db.query(Submission).filter( Submission.submit_date >= first_of_prev_month, Submission.submit_date <= last_of_prev_month, ).all() month_total_secs = sum(s.total_seconds for s in month_subs if s.total_seconds > 0) month_submitters = set(s.user_id for s in month_subs) # 所有项目(含进行中和上月完成的) all_projects = db.query(Project).filter( Project.status.in_([ProjectStatus.IN_PROGRESS, ProjectStatus.COMPLETED]) ).all() # 上月完成的项目 completed_this_month = [ p for p in all_projects if p.status == ProjectStatus.COMPLETED and p.actual_completion_date and first_of_prev_month <= p.actual_completion_date <= last_of_prev_month ] active_projects = [p for p in all_projects if p.status == ProjectStatus.IN_PROGRESS] # 各项目成本 project_cost_lines = [] total_all_cost = 0.0 for p in active_projects + completed_this_month: labor = calc_labor_cost_for_project(p.id, db) ai_tool = calc_ai_tool_cost_for_project(p.id, db) outsource = calc_outsource_cost_for_project(p.id, db) overhead = calc_overhead_cost_for_project(p.id, db) total = labor + ai_tool + outsource + overhead total_all_cost += total project_cost_lines.append( f"- **{p.name}**:人力 {_fmt_money(labor)} / AI工具 {_fmt_money(ai_tool)} / " f"外包 {_fmt_money(outsource)} / 固定 {_fmt_money(overhead)} → 总计 {_fmt_money(total)}" ) # 盈亏概览(已结算的客户正式项目) profit_lines = [] total_profit = 0.0 total_contract = 0.0 for p in completed_this_month: settlement = calc_project_settlement(p.id, db) if settlement.get("contract_amount"): pl = settlement.get("profit_loss", 0) total_profit += pl total_contract += settlement["contract_amount"] sign = "+" if pl >= 0 else "" profit_lines.append( f"- **{p.name}**:回款 {_fmt_money(settlement['contract_amount'])}," f"成本 {_fmt_money(settlement['total_cost'])},利润 {sign}{_fmt_money(pl)}" ) # 损耗汇总 total_waste_secs = 0.0 total_target_secs = 0.0 for p in active_projects + completed_this_month: w = calc_waste_for_project(p.id, db) total_waste_secs += w.get("total_waste_seconds", 0) total_target_secs += p.target_total_seconds or 0 waste_rate = round(total_waste_secs / total_target_secs * 100, 1) if total_target_secs > 0 else 0 # 人均产出 working_days_month = 22 avg_per_person = round(month_total_secs / max(1, len(month_submitters)), 1) # AI 数据上下文 data_context = ( f"月份:{month_label}\n" f"进行中项目:{len(active_projects)} 个\n" f"本月完成项目:{len(completed_this_month)} 个\n" f"月度总产出:{_fmt_seconds(month_total_secs)}\n" f"月度总成本:{_fmt_money(total_all_cost)}\n" f"总损耗率:{waste_rate}%\n" f"参与人数:{len(month_submitters)}\n" f"人均产出:{_fmt_seconds(avg_per_person)}\n" f"各项目成本:\n" + "\n".join(project_cost_lines) + "\n" f"盈亏:\n" + ("\n".join(profit_lines) if profit_lines else "本月无结算项目") ) ai_summary = generate_report_summary(data_context, "monthly") # 组装内容 title = f"AirLabs 月报 — {month_label}" lines = [ "**【月度总览】**", f"- 进行中项目:{len(active_projects)} 个", f"- 本月完成项目:{len(completed_this_month)} 个", f"- 月度总产出:{_fmt_seconds(month_total_secs)}", f"- 月度总成本:{_fmt_money(total_all_cost)}", ] if project_cost_lines: lines.append("") lines.append("**【各项目成本明细】**") lines.extend(project_cost_lines) if profit_lines: lines.append("") lines.append("**【盈亏概览】**") lines.extend(profit_lines) if total_contract > 0: profit_rate = round(total_profit / total_contract * 100, 1) lines.append(f"- 总利润率:{profit_rate}%") lines.append("") lines.append("**【月度损耗】**") lines.append(f"- 总损耗:{_fmt_seconds(total_waste_secs)}(损耗率 {waste_rate}%)") lines.append("") lines.append("**【人均产出】**") lines.append(f"- 参与人数:{len(month_submitters)} 人") lines.append(f"- 月度人均产出:{_fmt_seconds(avg_per_person)}") if ai_summary: lines.append("") lines.append("**【AI 深度分析】**") lines.append(ai_summary) content = "\n".join(lines) return { "title": title, "content": content, "data": {"month": month_label, "start": str(first_of_prev_month), "end": str(last_of_prev_month)}, } # ──────────────────────────── 风险预警 ──────────────────────────── def analyze_project_risks(db: Session) -> list: """ 分析所有进行中项目的风险,返回风险列表 纯规则引擎判断,不依赖 AI """ today = date.today() active_projects = db.query(Project).filter( Project.status == ProjectStatus.IN_PROGRESS ).all() risks = [] for p in active_projects: waste = calc_waste_for_project(p.id, db) total_secs = waste.get("total_submitted_seconds", 0) target = p.target_total_seconds progress = round(total_secs / target * 100, 1) if target > 0 else 0 risk_factors = [] risk_level = "low" # 1. 超期检测 if p.estimated_completion_date: days_left = (p.estimated_completion_date - today).days if days_left < 0: risk_factors.append(f"已超期 {-days_left} 天") risk_level = "high" elif days_left <= 7 and progress < 80: risk_factors.append(f"距截止仅剩 {days_left} 天,进度仅 {progress}%") risk_level = "high" elif days_left <= 14 and progress < 60: risk_factors.append(f"距截止 {days_left} 天,进度 {progress}% 偏低") risk_level = "medium" # 进度落后于时间线 if p.estimated_completion_date and days_left > 0: # 估算已用天数 if hasattr(p, 'created_at') and p.created_at: created = p.created_at.date() if hasattr(p.created_at, 'date') else p.created_at total_days = (p.estimated_completion_date - created).days elapsed_days = (today - created).days if total_days > 0: expected_progress = round(elapsed_days / total_days * 100, 1) if progress < expected_progress * 0.7: risk_factors.append( f"预期进度 {expected_progress}%,实际 {progress}%,严重落后" ) risk_level = "high" elif progress < expected_progress * 0.85: risk_factors.append( f"预期进度 {expected_progress}%,实际 {progress}%" ) if risk_level != "high": risk_level = "medium" # 2. 损耗率检测 waste_rate = waste.get("waste_rate", 0) if waste_rate > 80: risk_factors.append(f"损耗率 {waste_rate}%,严重偏高") risk_level = "high" elif waste_rate > 50: risk_factors.append(f"损耗率 {waste_rate}%,偏高") if risk_level != "high": risk_level = "medium" # 3. 近7天无提交 week_ago = today - timedelta(days=7) recent_subs = db.query(Submission).filter( Submission.project_id == p.id, Submission.submit_date >= week_ago, ).count() if recent_subs == 0 and total_secs > 0: risk_factors.append("近 7 天无提交,产出停滞") if risk_level != "high": risk_level = "medium" if risk_factors: risks.append({ "project_id": p.id, "project_name": p.name, "risk_level": risk_level, "progress": progress, "risk_factors": risk_factors, }) # 高风险排前面 level_order = {"high": 0, "medium": 1, "low": 2} risks.sort(key=lambda x: level_order.get(x["risk_level"], 99)) return risks