airlabs-manage/backend/services/report_service.py
seaislee1209 90707005ed
All checks were successful
Build and Deploy Backend / build-and-deploy (push) Successful in 1m27s
Build and Deploy Web / build-and-deploy (push) Successful in 54s
feat: V2功能增强 — 里程碑系统+圆环进度图+损耗修复+AI服务+报告系统
- 项目详情页三阶段里程碑管理(前期/制作/后期)
- 制作卡片改用180px ECharts圆环进度图+右侧数据列表
- 修复损耗率双重计算bug(测试秒数不再重复计入超产)
- 新增飞书推送服务、豆包AI风险分析、APScheduler定时报告
- 项目列表页增强(筛选/排序/批量操作/废弃功能)
- 成员详情页产出时间轴+效率对比
- 成本页固定开支管理

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 18:36:44 +08:00

508 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""报告生成服务 —— 汇总数据库数据 + 调用 AI 生成摘要"""
import logging
from datetime import date, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func
from models import (
User, Project, Submission, AIToolCost,
ProjectStatus, WorkType,
)
from calculations import (
calc_waste_for_project,
calc_labor_cost_for_project,
calc_ai_tool_cost_for_project,
calc_outsource_cost_for_project,
calc_overhead_cost_for_project,
calc_team_efficiency,
calc_project_settlement,
)
from services.ai_service import generate_report_summary
logger = logging.getLogger(__name__)
def _fmt_seconds(secs: float) -> str:
"""秒数格式化为 Xm Xs"""
if secs <= 0:
return "0s"
m = int(secs) // 60
s = int(secs) % 60
if m > 0:
return f"{m}m {s}s" if s > 0 else f"{m}m"
return f"{s}s"
def _fmt_money(amount: float) -> str:
"""金额格式化"""
if amount >= 10000:
return f"¥{amount/10000:.1f}"
return f"¥{amount:,.0f}"
# ──────────────────────────── 日报 ────────────────────────────
def generate_daily_report(db: Session) -> dict:
    """Build the daily report for today.

    Steps:
      1. Load today's submissions and total their positive output.
      2. List active users who have submitted at some point but not today.
      3. For each in-progress project, report overall progress and today's
         output, and flag deadline risks (overdue, or <= 7 days left with
         progress below 80%).
      4. Ask the AI service for a summary of the collected figures.
      5. Assemble the markdown body (the original comment says it is meant
         for Feishu).

    Args:
        db: SQLAlchemy session used for all queries.

    Returns:
        ``{"title": str, "content": str, "data": {"date": str}}``.
    """
    today = date.today()

    # Today's submissions.
    today_subs = db.query(Submission).filter(
        Submission.submit_date == today
    ).all()
    today_submitter_ids = {s.user_id for s in today_subs}
    today_total_secs = sum(s.total_seconds for s in today_subs if s.total_seconds > 0)

    # Positive output per project, aggregated once instead of rescanning
    # every submission for every project.
    today_secs_by_project = {}
    for s in today_subs:
        if s.total_seconds > 0:
            today_secs_by_project[s.project_id] = (
                today_secs_by_project.get(s.project_id, 0) + s.total_seconds
            )

    # "Active" here means: has at least one submission on record.
    all_active_user_ids = {
        uid for (uid,) in db.query(Submission.user_id).distinct().all()
    }
    missing_ids = all_active_user_ids - today_submitter_ids
    not_submitted = []
    if missing_ids:
        # Single IN query rather than one lookup per user.
        missing_users = (
            db.query(User)
            .filter(User.id.in_(list(missing_ids)))
            .order_by(User.id)
            .all()
        )
        not_submitted = [u.name for u in missing_users if u.is_active]

    # In-progress projects.
    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()

    project_lines = []
    risk_lines = []
    for p in active_projects:
        waste = calc_waste_for_project(p.id, db)
        total_secs = waste.get("total_submitted_seconds", 0)
        # A project without a target must not crash the report.
        target = p.target_total_seconds or 0
        progress = round(total_secs / target * 100, 1) if target > 0 else 0

        proj_today_secs = today_secs_by_project.get(p.id, 0)
        project_lines.append(
            f"- **{p.name}**:进度 {progress}%,今日产出 {_fmt_seconds(proj_today_secs)}"
        )

        # Deadline risk detection.
        if p.estimated_completion_date:
            days_left = (p.estimated_completion_date - today).days
            if days_left < 0:
                risk_lines.append(f"- **{p.name}**:已超期 {-days_left} 天,进度 {progress}%")
            elif days_left <= 7 and progress < 80:
                risk_lines.append(f"- **{p.name}**:距截止仅剩 {days_left} 天,进度仅 {progress}%")

    # Plain-text context handed to the AI. Empty sections get an explicit
    # "无" placeholder so the model does not see a dangling label.
    missing_text = ", ".join(not_submitted) if not_submitted else "无"
    risk_text = "\n".join(risk_lines) if risk_lines else "无"
    data_context = (
        f"日期:{today}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"今日提交人次:{len(today_submitter_ids)}\n"
        f"今日总产出:{_fmt_seconds(today_total_secs)}\n"
        f"今日未提交人员:{missing_text}\n"
        "各项目情况:\n" + "\n".join(project_lines) + "\n"
        "风险项目:\n" + risk_text
    )

    # AI-generated summary (may be empty/falsy, in which case it is omitted).
    ai_summary = generate_report_summary(data_context, "daily")

    # Assemble the markdown body.
    title = f"AirLabs 日报 — {today}"
    lines = [
        "**【今日概览】**",
        f"- 进行中项目:{len(active_projects)}",
        f"- 今日提交:{len(today_submitter_ids)} 人次,总产出 {_fmt_seconds(today_total_secs)}",
    ]
    if not_submitted:
        lines.append(f"- 今日未提交:{', '.join(not_submitted)}")
    lines.append("")
    lines.append("**【各项目进展】**")
    lines.extend(project_lines if project_lines else ["- 暂无进行中项目"])
    if risk_lines:
        lines.append("")
        lines.append("**【风险提醒】**")
        lines.extend(risk_lines)
    if ai_summary:
        lines.append("")
        lines.append("**【AI 点评】**")
        lines.append(ai_summary)

    content = "\n".join(lines)
    return {"title": title, "content": content, "data": {"date": str(today)}}
# ──────────────────────────── 周报 ────────────────────────────
def generate_weekly_report(db: Session) -> dict:
    """Build the weekly report covering this Monday through today.

    Collects the week's submissions, per-project progress and weekly output,
    a simplified labour cost (one daily cost per distinct user/day with a
    submission), AI tool spend, a waste-rate ranking and the top producer,
    then asks the AI service for a summary and assembles the markdown body.

    Args:
        db: SQLAlchemy session used for all queries.

    Returns:
        ``{"title": str, "content": str,
           "data": {"week_start": str, "week_end": str}}``.
    """
    today = date.today()
    monday = today - timedelta(days=today.weekday())

    # This week's submissions.
    week_subs = db.query(Submission).filter(
        Submission.submit_date >= monday,
        Submission.submit_date <= today,
    ).all()
    week_submitter_ids = {s.user_id for s in week_subs}
    week_total_secs = sum(s.total_seconds for s in week_subs if s.total_seconds > 0)
    # Elapsed days this week, capped at five.
    working_days = min((today - monday).days + 1, 5)
    avg_daily = round(
        week_total_secs / max(1, len(week_submitter_ids)) / max(1, working_days), 1
    )

    # Positive output aggregated once per project and per user.
    secs_by_project = {}
    user_week_secs = {}
    for s in week_subs:
        if s.total_seconds > 0:
            secs_by_project[s.project_id] = (
                secs_by_project.get(s.project_id, 0) + s.total_seconds
            )
            user_week_secs[s.user_id] = (
                user_week_secs.get(s.user_id, 0) + s.total_seconds
            )

    # In-progress projects.
    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()

    # Waste is computed once per project and reused for both the progress
    # lines and the waste ranking.
    project_lines = []
    waste_ranking = []
    for p in active_projects:
        waste = calc_waste_for_project(p.id, db)
        total_secs = waste.get("total_submitted_seconds", 0)
        # A project without a target must not crash the report.
        target = p.target_total_seconds or 0
        progress = round(total_secs / target * 100, 1) if target > 0 else 0
        proj_week_secs = secs_by_project.get(p.id, 0)
        project_lines.append(
            f"- **{p.name}**:当前进度 {progress}%,本周产出 {_fmt_seconds(proj_week_secs)}"
        )
        if waste.get("total_waste_seconds", 0) > 0:
            waste_ranking.append({
                "name": p.name,
                "rate": waste.get("waste_rate", 0),
            })
    waste_ranking.sort(key=lambda x: x["rate"], reverse=True)

    # All submitting users fetched in one query (instead of one per row).
    users_by_id = {}
    if week_submitter_ids:
        users_by_id = {
            u.id: u
            for u in db.query(User)
            .filter(User.id.in_(list(week_submitter_ids)))
            .all()
        }

    # Weekly labour cost (simplified): each distinct (user, day) with a
    # submission counts as one full daily cost for that user.
    week_labor = 0.0
    processed = set()
    for s in week_subs:
        key = (s.user_id, s.submit_date)
        if key in processed:
            continue
        processed.add(key)
        user = users_by_id.get(s.user_id)
        if user:
            week_labor += user.daily_cost

    week_ai_cost = db.query(sa_func.sum(AIToolCost.amount)).filter(
        AIToolCost.record_date >= monday,
        AIToolCost.record_date <= today,
    ).scalar() or 0

    # Top producer: the user with the highest total output this week.
    top_producer = None
    if user_week_secs:
        top_uid = max(user_week_secs, key=user_week_secs.get)
        top_user = users_by_id.get(top_uid)
        if top_user:
            top_daily = round(user_week_secs[top_uid] / max(1, working_days), 1)
            top_producer = f"{top_user.name}(日均 {_fmt_seconds(top_daily)})"

    # AI context. The waste section is built separately: a trailing
    # conditional expression would otherwise bind to the whole concatenation
    # and replace the entire context when there is no waste data.
    if waste_ranking:
        waste_section = "损耗排行:\n" + "\n".join(
            f"- {w['name']}:{w['rate']}%" for w in waste_ranking[:5]
        )
    else:
        waste_section = "损耗排行:无"
    data_context = (
        f"周期:{monday} ~ {today}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"本周总产出:{_fmt_seconds(week_total_secs)}\n"
        f"人均日产出:{_fmt_seconds(avg_daily)}\n"
        f"效率最高:{top_producer or '暂无'}\n"
        f"本周人力成本:{_fmt_money(week_labor)}\n"
        f"本周AI工具成本:{_fmt_money(week_ai_cost)}\n"
        "各项目:\n" + "\n".join(project_lines) + "\n" + waste_section
    )

    ai_summary = generate_report_summary(data_context, "weekly")

    # Assemble the markdown body.
    title = f"AirLabs 周报 — 第{today.isocalendar()[1]}周({monday} ~ {today})"
    lines = [
        "**【项目进展】**",
    ]
    lines.extend(project_lines if project_lines else ["- 暂无进行中项目"])
    lines.append("")
    lines.append("**【团队产出】**")
    lines.append(f"- 本周总产出:{_fmt_seconds(week_total_secs)}")
    lines.append(f"- 人均日产出:{_fmt_seconds(avg_daily)}")
    if top_producer:
        lines.append(f"- 效率最高:{top_producer}")
    lines.append("")
    lines.append("**【成本概览】**")
    lines.append(f"- 本周人力成本:{_fmt_money(week_labor)}")
    lines.append(f"- 本周 AI 工具支出:{_fmt_money(week_ai_cost)}")
    if waste_ranking:
        lines.append("")
        lines.append("**【损耗排行】**")
        for w in waste_ranking[:5]:
            lines.append(f"- {w['name']}:损耗率 {w['rate']}%")
    if ai_summary:
        lines.append("")
        lines.append("**【AI 分析与建议】**")
        lines.append(ai_summary)

    content = "\n".join(lines)
    return {
        "title": title,
        "content": content,
        "data": {"week_start": str(monday), "week_end": str(today)},
    }
# ──────────────────────────── 月报 ────────────────────────────
def generate_monthly_report(db: Session) -> dict:
    """Build the monthly report for the previous calendar month.

    Intended to be invoked on the 1st of each month (per the original
    docstring); it always reports on the month before ``date.today()``.

    Covers: total output and submitter count for the month, cost breakdown
    (labour / AI tools / outsourcing / overhead) for in-progress projects and
    projects completed in that month, profit and loss for completed projects
    that have a contract amount, aggregate waste rate, output per person, and
    an AI-generated summary.

    Args:
        db: SQLAlchemy session used for all queries.

    Returns:
        ``{"title": str, "content": str,
           "data": {"month": str, "start": str, "end": str}}``.
    """
    today = date.today()

    # Previous month's date range.
    first_of_this_month = today.replace(day=1)
    last_of_prev_month = first_of_this_month - timedelta(days=1)
    first_of_prev_month = last_of_prev_month.replace(day=1)
    # Year and month need explicit units; bare concatenation is ambiguous.
    month_label = f"{last_of_prev_month.year}年{last_of_prev_month.month}月"

    # Previous month's submissions.
    month_subs = db.query(Submission).filter(
        Submission.submit_date >= first_of_prev_month,
        Submission.submit_date <= last_of_prev_month,
    ).all()
    month_total_secs = sum(s.total_seconds for s in month_subs if s.total_seconds > 0)
    month_submitters = {s.user_id for s in month_subs}

    # Projects that are in progress or completed.
    all_projects = db.query(Project).filter(
        Project.status.in_([ProjectStatus.IN_PROGRESS, ProjectStatus.COMPLETED])
    ).all()

    # Projects completed within the previous month.
    completed_this_month = [
        p for p in all_projects
        if p.status == ProjectStatus.COMPLETED
        and p.actual_completion_date
        and first_of_prev_month <= p.actual_completion_date <= last_of_prev_month
    ]
    active_projects = [p for p in all_projects if p.status == ProjectStatus.IN_PROGRESS]
    report_projects = active_projects + completed_this_month

    # Per-project cost breakdown.
    project_cost_lines = []
    total_all_cost = 0.0
    for p in report_projects:
        labor = calc_labor_cost_for_project(p.id, db)
        ai_tool = calc_ai_tool_cost_for_project(p.id, db)
        outsource = calc_outsource_cost_for_project(p.id, db)
        overhead = calc_overhead_cost_for_project(p.id, db)
        total = labor + ai_tool + outsource + overhead
        total_all_cost += total
        project_cost_lines.append(
            f"- **{p.name}**:人力 {_fmt_money(labor)} / AI工具 {_fmt_money(ai_tool)} / "
            f"外包 {_fmt_money(outsource)} / 固定 {_fmt_money(overhead)} → 总计 {_fmt_money(total)}"
        )

    # Profit and loss: only completed projects with a contract amount.
    profit_lines = []
    total_profit = 0.0
    total_contract = 0.0
    for p in completed_this_month:
        settlement = calc_project_settlement(p.id, db)
        if not settlement.get("contract_amount"):
            continue
        pl = settlement.get("profit_loss", 0)
        total_profit += pl
        total_contract += settlement["contract_amount"]
        # Negative numbers already carry their own minus sign.
        sign = "+" if pl >= 0 else ""
        profit_lines.append(
            f"- **{p.name}**:回款 {_fmt_money(settlement['contract_amount'])},"
            f"成本 {_fmt_money(settlement['total_cost'])},利润 {sign}{_fmt_money(pl)}"
        )

    # Aggregate waste: total wasted seconds over total target seconds.
    total_waste_secs = 0.0
    total_target_secs = 0.0
    for p in report_projects:
        w = calc_waste_for_project(p.id, db)
        total_waste_secs += w.get("total_waste_seconds", 0)
        total_target_secs += p.target_total_seconds or 0
    waste_rate = (
        round(total_waste_secs / total_target_secs * 100, 1)
        if total_target_secs > 0 else 0
    )

    # Output per person over the whole month.
    avg_per_person = round(month_total_secs / max(1, len(month_submitters)), 1)

    # AI context.
    profit_text = "\n".join(profit_lines) if profit_lines else "本月无结算项目"
    data_context = (
        f"月份:{month_label}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"本月完成项目:{len(completed_this_month)}\n"
        f"月度总产出:{_fmt_seconds(month_total_secs)}\n"
        f"月度总成本:{_fmt_money(total_all_cost)}\n"
        f"总损耗率:{waste_rate}%\n"
        f"参与人数:{len(month_submitters)}\n"
        f"人均产出:{_fmt_seconds(avg_per_person)}\n"
        "各项目成本:\n" + "\n".join(project_cost_lines) + "\n"
        "盈亏:\n" + profit_text
    )

    ai_summary = generate_report_summary(data_context, "monthly")

    # Assemble the markdown body.
    title = f"AirLabs 月报 — {month_label}"
    lines = [
        "**【月度总览】**",
        f"- 进行中项目:{len(active_projects)}",
        f"- 本月完成项目:{len(completed_this_month)}",
        f"- 月度总产出:{_fmt_seconds(month_total_secs)}",
        f"- 月度总成本:{_fmt_money(total_all_cost)}",
    ]
    if project_cost_lines:
        lines.append("")
        lines.append("**【各项目成本明细】**")
        lines.extend(project_cost_lines)
    if profit_lines:
        lines.append("")
        lines.append("**【盈亏概览】**")
        lines.extend(profit_lines)
        if total_contract > 0:
            profit_rate = round(total_profit / total_contract * 100, 1)
            lines.append(f"- 总利润率:{profit_rate}%")
    lines.append("")
    lines.append("**【月度损耗】**")
    lines.append(f"- 总损耗:{_fmt_seconds(total_waste_secs)}(损耗率 {waste_rate}%)")
    lines.append("")
    lines.append("**【人均产出】**")
    lines.append(f"- 参与人数:{len(month_submitters)}")
    lines.append(f"- 月度人均产出:{_fmt_seconds(avg_per_person)}")
    if ai_summary:
        lines.append("")
        lines.append("**【AI 深度分析】**")
        lines.append(ai_summary)

    content = "\n".join(lines)
    return {
        "title": title,
        "content": content,
        "data": {
            "month": month_label,
            "start": str(first_of_prev_month),
            "end": str(last_of_prev_month),
        },
    }
# ──────────────────────────── 风险预警 ────────────────────────────
# Severity ranking used to make sure a risk level is only ever raised.
_RISK_RANK = {"low": 0, "medium": 1, "high": 2}


def _raise_risk(current: str, candidate: str) -> str:
    """Return the more severe of two risk levels ("low" < "medium" < "high")."""
    return candidate if _RISK_RANK[candidate] > _RISK_RANK[current] else current


def analyze_project_risks(db: Session) -> list:
    """Analyse every in-progress project and return the ones with risks.

    Purely rule-based; no AI call is made. Rules applied per project:
      1. Deadline: overdue, or little time left with low progress.
      2. Timeline: actual progress well below the progress expected from
         elapsed time (only when a creation date is available and the
         deadline is still in the future).
      3. Waste rate above 50% / 80%.
      4. No submissions in the last 7 days although the project has output.

    Args:
        db: SQLAlchemy session used for all queries.

    Returns:
        A list of dicts with keys ``project_id``, ``project_name``,
        ``risk_level`` ("high"/"medium"/"low"), ``progress`` and
        ``risk_factors`` (list of human-readable strings), sorted with the
        highest risk level first. Projects without any risk factor are
        omitted.
    """
    today = date.today()
    # Loop-invariant cutoff for the "no recent submissions" rule.
    week_ago = today - timedelta(days=7)

    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()

    risks = []
    for p in active_projects:
        waste = calc_waste_for_project(p.id, db)
        total_secs = waste.get("total_submitted_seconds", 0)
        # A project without a target must not crash the analysis.
        target = p.target_total_seconds or 0
        progress = round(total_secs / target * 100, 1) if target > 0 else 0

        risk_factors = []
        risk_level = "low"

        if p.estimated_completion_date:
            days_left = (p.estimated_completion_date - today).days

            # 1. Deadline check.
            if days_left < 0:
                risk_factors.append(f"已超期 {-days_left} 天")
                risk_level = _raise_risk(risk_level, "high")
            elif days_left <= 7 and progress < 80:
                risk_factors.append(f"距截止仅剩 {days_left} 天,进度仅 {progress}%")
                risk_level = _raise_risk(risk_level, "high")
            elif days_left <= 14 and progress < 60:
                risk_factors.append(f"距截止 {days_left} 天,进度 {progress}% 偏低")
                risk_level = _raise_risk(risk_level, "medium")

            # Progress lagging behind the timeline (deadline still ahead).
            created_at = getattr(p, "created_at", None)
            if days_left > 0 and created_at:
                # created_at may be a datetime or already a date.
                created = created_at.date() if hasattr(created_at, "date") else created_at
                total_days = (p.estimated_completion_date - created).days
                elapsed_days = (today - created).days
                if total_days > 0:
                    expected_progress = round(elapsed_days / total_days * 100, 1)
                    if progress < expected_progress * 0.7:
                        risk_factors.append(
                            f"预期进度 {expected_progress}%,实际 {progress}%,严重落后"
                        )
                        risk_level = _raise_risk(risk_level, "high")
                    elif progress < expected_progress * 0.85:
                        risk_factors.append(
                            f"预期进度 {expected_progress}%,实际 {progress}%"
                        )
                        risk_level = _raise_risk(risk_level, "medium")

        # 2. Waste-rate check.
        waste_rate = waste.get("waste_rate", 0)
        if waste_rate > 80:
            risk_factors.append(f"损耗率 {waste_rate}%,严重偏高")
            risk_level = _raise_risk(risk_level, "high")
        elif waste_rate > 50:
            risk_factors.append(f"损耗率 {waste_rate}%,偏高")
            risk_level = _raise_risk(risk_level, "medium")

        # 3. No submissions in the last 7 days on a project that has output.
        recent_subs = db.query(Submission).filter(
            Submission.project_id == p.id,
            Submission.submit_date >= week_ago,
        ).count()
        if recent_subs == 0 and total_secs > 0:
            risk_factors.append("近 7 天无提交,产出停滞")
            risk_level = _raise_risk(risk_level, "medium")

        if risk_factors:
            risks.append({
                "project_id": p.id,
                "project_name": p.name,
                "risk_level": risk_level,
                "progress": progress,
                "risk_factors": risk_factors,
            })

    # Highest risk first; the sort is stable, so ties keep query order.
    level_order = {"high": 0, "medium": 1, "low": 2}
    risks.sort(key=lambda x: level_order.get(x["risk_level"], 99))
    return risks