airlabs-manage/backend/services/report_service.py
seaislee1209 bc9fa5a798
All checks were successful
Build and Deploy Backend / build-and-deploy (push) Successful in 4m11s
Build and Deploy Web / build-and-deploy (push) Successful in 2m52s
fix: 报告进度只算中期产出 + 移除无效日报接收人
- 日报/周报进度从 total_submitted_seconds 改为只算 PRODUCTION 阶段
  (修复魔法少女 398%→118% 等虚高问题)
- 移除日报接收人 17762840667(飞书发送失败)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 21:51:36 +08:00

555 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""报告生成服务 —— 汇总数据库数据 + 调用 AI 生成摘要"""
import logging
from datetime import date, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func
from models import (
User, Project, Submission, AIToolCost, Role,
ProjectStatus, WorkType, PhaseGroup,
)
from calculations import (
calc_waste_for_project,
calc_labor_cost_for_project,
calc_ai_tool_cost_for_project,
calc_outsource_cost_for_project,
calc_overhead_cost_for_project,
calc_team_efficiency,
calc_project_settlement,
)
from services.ai_service import generate_report_summary
logger = logging.getLogger(__name__)
def _fmt_seconds(secs: float) -> str:
"""秒数格式化为 Xh Xm 或 Xm Xs"""
if secs <= 0:
return "0s"
h = int(secs) // 3600
m = (int(secs) % 3600) // 60
s = int(secs) % 60
if h > 0:
return f"{h}h {m}m" if m > 0 else f"{h}h"
if m > 0:
return f"{m}m {s}s" if s > 0 else f"{m}m"
return f"{s}s"
def _fmt_money(amount: float) -> str:
"""金额格式化"""
if amount >= 10000:
return f"¥{amount/10000:.1f}"
return f"¥{amount:,.0f}"
def _is_weekend(d: date) -> bool:
"""判断是否为周末(周六=5, 周日=6"""
return d.weekday() >= 5
# ──────────────────────────── 日报 ────────────────────────────
def generate_daily_report(db: Session) -> dict:
    """Build the daily report as structured data for the Feishu card.

    Gathers today's submissions, per-project production-phase progress and
    the rule-based risk list, then asks the AI service for a text summary.

    Args:
        db: SQLAlchemy session used for every query.

    Returns:
        A dict with ``title``, ``report_type`` (``"daily"``) and ``card_data``.
        ``card_data`` holds the weekend flag, weekday label, active project
        count, submitter count, formatted total output, names of users who
        have not submitted, per-project progress, risks and the AI summary.
    """
    today = date.today()
    is_weekend = _is_weekend(today)
    weekday_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
    weekday_label = weekday_names[today.weekday()]

    # Today's submissions.
    today_subs = db.query(Submission).filter(
        Submission.submit_date == today
    ).all()
    today_submitter_ids = {s.user_id for s in today_subs}

    # Output counts only animation seconds logged in the production phase.
    today_total_secs = sum(
        s.total_seconds for s in today_subs
        if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION
    )

    # Users who have not submitted: workdays only, exempt roles excluded.
    not_submitted = []
    if not is_weekend:
        exempt_role_ids = {
            r.id for r in db.query(Role).filter(Role.exempt_submission == 1).all()
        }
        # Every user id that appears on any submission is expected to submit.
        known_submitter_ids = {
            uid for (uid,) in db.query(Submission.user_id).distinct().all()
        }
        missing_ids = [
            uid for uid in known_submitter_ids - today_submitter_ids
            if uid is not None
        ]
        if missing_ids:
            # One IN query instead of one lookup per user; ordered by id so
            # the resulting name list is deterministic.
            missing_users = (
                db.query(User)
                .filter(User.id.in_(missing_ids))
                .order_by(User.id)
                .all()
            )
            not_submitted = [
                u.name for u in missing_users
                if u.is_active and u.role_id not in exempt_role_ids
            ]

    # Projects in progress.
    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()
    projects_data = []
    for p in active_projects:
        # Progress uses production-phase output only.
        prod_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
            Submission.project_id == p.id,
            Submission.total_seconds > 0,
            Submission.project_phase == PhaseGroup.PRODUCTION,
        ).scalar() or 0
        # A project without a target must not crash the report.
        target = p.target_total_seconds or 0
        progress = round(prod_secs / target * 100, 1) if target > 0 else 0
        proj_today_secs = sum(
            s.total_seconds for s in today_subs
            if s.project_id == p.id and s.total_seconds > 0
            and s.project_phase == PhaseGroup.PRODUCTION
        )
        projects_data.append({
            "name": p.name,
            "progress": progress,
            "today_output": _fmt_seconds(proj_today_secs),
        })

    # Risk detection via the rule engine.
    full_risks = analyze_project_risks(db)
    risks_data = [
        {
            "name": r["project_name"],
            "level": r["risk_level"],
            # Separate the factors so they do not run together.
            "detail": ";".join(r["risk_factors"]),
        }
        for r in full_risks
    ]

    # Plain-text context handed to the AI summariser.
    project_lines = [
        f"- {p['name']}:进度 {p['progress']}%,今日产出 {p['today_output']}"
        for p in projects_data
    ]
    risk_lines = [f"- {r['name']}:{r['detail']}" for r in risks_data]
    data_context = (
        f"日期:{today}{weekday_label}{',周末' if is_weekend else ''}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"今日提交人次:{len(today_submitter_ids)}\n"
        f"今日总产出:{_fmt_seconds(today_total_secs)}\n"
    )
    if not is_weekend:
        data_context += (
            f"今日未提交人员:{', '.join(not_submitted) if not_submitted else '无'}\n"
        )
    else:
        data_context += "今天是周末,不统计未提交\n"
    data_context += (
        "各项目情况:\n" + "\n".join(project_lines) + "\n"
        "风险项目:\n" + ("\n".join(risk_lines) if risk_lines else "无")
    )

    ai_summary = generate_report_summary(data_context, "daily")
    title = f"AirLabs 日报 — {today}{weekday_label}"
    return {
        "title": title,
        "report_type": "daily",
        "card_data": {
            "is_weekend": is_weekend,
            "weekday_label": weekday_label,
            "active_project_count": len(active_projects),
            "submitter_count": len(today_submitter_ids),
            "total_output": _fmt_seconds(today_total_secs),
            "not_submitted": not_submitted,
            "projects": projects_data,
            "risks": risks_data,
            "ai_summary": ai_summary,
        },
    }
# ──────────────────────────── 周报 ────────────────────────────
def generate_weekly_report(db: Session) -> dict:
    """Build the weekly report covering this week's Monday through today.

    Aggregates production-phase output, per-project progress, labour and
    AI-tool cost, the waste ranking and the best pass-rate contributor, then
    asks the AI service for a text summary.

    Args:
        db: SQLAlchemy session used for every query.

    Returns:
        A dict with ``title``, ``report_type`` (``"weekly"``) and ``card_data``.
        ``card_data`` holds the week range and ISO week number, formatted
        output figures, the top producer label (or ``None``), per-project
        progress, formatted costs, the top five waste entries and the AI
        summary.
    """
    from collections import defaultdict

    today = date.today()
    monday = today - timedelta(days=today.weekday())
    week_subs = db.query(Submission).filter(
        Submission.submit_date >= monday,
        Submission.submit_date <= today,
    ).all()
    week_submitter_ids = {s.user_id for s in week_subs}

    # Output counts only animation seconds logged in the production phase.
    week_total_secs = sum(
        s.total_seconds for s in week_subs
        if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION
    )
    # Days elapsed this week, capped at five.
    working_days = min((today - monday).days + 1, 5)
    avg_daily = round(
        week_total_secs / max(1, len(week_submitter_ids)) / max(1, working_days),
        1,
    )

    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()
    projects_data = []
    for p in active_projects:
        # Progress uses production-phase output only.
        prod_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
            Submission.project_id == p.id,
            Submission.total_seconds > 0,
            Submission.project_phase == PhaseGroup.PRODUCTION,
        ).scalar() or 0
        # A project without a target must not crash the report.
        target = p.target_total_seconds or 0
        progress = round(prod_secs / target * 100, 1) if target > 0 else 0
        proj_week_secs = sum(
            s.total_seconds for s in week_subs
            if s.project_id == p.id and s.total_seconds > 0
            and s.project_phase == PhaseGroup.PRODUCTION
        )
        projects_data.append({
            "name": p.name,
            "progress": progress,
            "week_output": _fmt_seconds(proj_week_secs),
        })

    # Load every submitter once instead of querying per (user, day) pair.
    submitter_ids = [uid for uid in week_submitter_ids if uid is not None]
    users_by_id = {}
    if submitter_ids:
        users_by_id = {
            u.id: u
            for u in db.query(User).filter(User.id.in_(submitter_ids)).all()
        }

    # Labour cost: one daily_cost per distinct (user, date) with a submission.
    week_labor = 0.0
    charged = set()
    for s in week_subs:
        key = (s.user_id, s.submit_date)
        if key in charged:
            continue
        charged.add(key)
        user = users_by_id.get(s.user_id)
        if user:
            # Treat a missing daily cost as zero rather than failing.
            week_labor += user.daily_cost or 0

    week_ai_cost = db.query(sa_func.sum(AIToolCost.amount)).filter(
        AIToolCost.record_date >= monday,
        AIToolCost.record_date <= today,
    ).scalar() or 0

    # Waste ranking, highest rate first.
    waste_ranking = []
    for p in active_projects:
        w = calc_waste_for_project(p.id, db)
        if w.get("total_waste_seconds", 0) > 0:
            waste_ranking.append({"name": p.name, "rate": w.get("waste_rate", 0)})
    waste_ranking.sort(key=lambda x: x["rate"], reverse=True)

    # Efficiency star: pass rate = (production - revision) / production,
    # summed across projects for production-phase submissions.
    user_prod_secs = defaultdict(float)  # production seconds per user
    user_rev_secs = defaultdict(float)   # revision seconds per user
    for s in week_subs:
        if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION:
            if s.work_type == WorkType.PRODUCTION:
                user_prod_secs[s.user_id] += s.total_seconds
            elif s.work_type == WorkType.REVISION:
                user_rev_secs[s.user_id] += s.total_seconds

    top_producer = None
    best_rate = -1
    best_uid = None
    for uid, prod in user_prod_secs.items():
        if prod < 60:  # at least one minute of production output to qualify
            continue
        rate = (prod - user_rev_secs.get(uid, 0)) / prod
        if rate > best_rate:
            best_rate = rate
            best_uid = uid
    if best_uid is not None and best_rate >= 0:
        top_user = users_by_id.get(best_uid)
        if top_user:
            top_producer = f"{top_user.name}(通过率 {round(best_rate * 100, 1)}%)"

    # Plain-text context handed to the AI summariser.
    project_lines = [
        f"- {p['name']}:当前进度 {p['progress']}%,本周产出 {p['week_output']}"
        for p in projects_data
    ]
    # A separator keeps a name ending in a digit from fusing with the rate.
    waste_lines = [f"- {w['name']}:{w['rate']}%" for w in waste_ranking[:5]]

    # Risk detection via the rule engine.
    full_risks = analyze_project_risks(db)
    risk_lines = [
        f"- {r['project_name']}:{';'.join(r['risk_factors'])}"
        for r in full_risks
    ]

    data_context = (
        f"周期:{monday} ~ {today}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"本周总产出:{_fmt_seconds(week_total_secs)}\n"
        f"人均日产出:{_fmt_seconds(avg_daily)}\n"
        f"效率最高:{top_producer or '暂无'}\n"
        f"本周人力成本:{_fmt_money(week_labor)}\n"
        f"本周AI工具成本:{_fmt_money(week_ai_cost)}\n"
        "各项目:\n" + "\n".join(project_lines) + "\n"
        "损耗排行:\n" + ("\n".join(waste_lines) if waste_lines else "无") + "\n"
        "风险项目:\n" + ("\n".join(risk_lines) if risk_lines else "无")
    )

    ai_summary = generate_report_summary(data_context, "weekly")
    week_num = today.isocalendar()[1]
    title = f"AirLabs 周报 — 第{week_num}周({monday} ~ {today})"
    return {
        "title": title,
        "report_type": "weekly",
        "card_data": {
            "week_range": f"{monday} ~ {today}",
            "week_num": week_num,
            "total_output": _fmt_seconds(week_total_secs),
            "avg_daily_output": _fmt_seconds(avg_daily),
            "top_producer": top_producer,
            "projects": projects_data,
            "labor_cost": _fmt_money(week_labor),
            "ai_tool_cost": _fmt_money(week_ai_cost),
            "waste_ranking": waste_ranking[:5],
            "ai_summary": ai_summary,
        },
    }
# ──────────────────────────── 月报 ────────────────────────────
def generate_monthly_report(db: Session) -> dict:
    """Build the monthly report for the previous calendar month.

    Aggregates production-phase output, per-project cost breakdowns,
    profit and loss for projects completed in that month, overall waste and
    the rule-based risk list, then asks the AI service for a text summary.

    Args:
        db: SQLAlchemy session used for every query.

    Returns:
        A dict with ``title``, ``report_type`` (``"monthly"``) and
        ``card_data``. ``card_data`` holds the month label, project counts,
        formatted output and cost totals, per-project costs, profit items,
        the profit rate (``None`` when there is no contract total), waste
        figures, participant count, per-person output and the AI summary.
    """
    today = date.today()
    first_of_this_month = today.replace(day=1)
    last_of_prev_month = first_of_this_month - timedelta(days=1)
    first_of_prev_month = last_of_prev_month.replace(day=1)
    # Year and month carry explicit units; bare concatenation gave "20262".
    month_label = f"{last_of_prev_month.year}年{last_of_prev_month.month}月"

    month_subs = db.query(Submission).filter(
        Submission.submit_date >= first_of_prev_month,
        Submission.submit_date <= last_of_prev_month,
    ).all()
    # Output counts only animation seconds logged in the production phase.
    month_total_secs = sum(
        s.total_seconds for s in month_subs
        if s.total_seconds > 0 and s.project_phase == PhaseGroup.PRODUCTION
    )
    month_submitters = {s.user_id for s in month_subs}

    all_projects = db.query(Project).filter(
        Project.status.in_([ProjectStatus.IN_PROGRESS, ProjectStatus.COMPLETED])
    ).all()
    completed_this_month = [
        p for p in all_projects
        if p.status == ProjectStatus.COMPLETED
        and p.actual_completion_date
        and first_of_prev_month <= p.actual_completion_date <= last_of_prev_month
    ]
    active_projects = [
        p for p in all_projects if p.status == ProjectStatus.IN_PROGRESS
    ]
    report_projects = active_projects + completed_this_month

    # Cost breakdown per project.
    project_costs = []
    total_all_cost = 0.0
    for p in report_projects:
        labor = calc_labor_cost_for_project(p.id, db)
        ai_tool = calc_ai_tool_cost_for_project(p.id, db)
        outsource = calc_outsource_cost_for_project(p.id, db)
        overhead = calc_overhead_cost_for_project(p.id, db)
        total = labor + ai_tool + outsource + overhead
        total_all_cost += total
        project_costs.append({
            "name": p.name,
            "labor": _fmt_money(labor),
            "ai_tool": _fmt_money(ai_tool),
            "outsource": _fmt_money(outsource),
            "overhead": _fmt_money(overhead),
            "total": _fmt_money(total),
        })

    # Profit and loss, only for projects that have a contract amount.
    profit_items = []
    total_profit = 0.0
    total_contract = 0.0
    for p in completed_this_month:
        settlement = calc_project_settlement(p.id, db)
        contract_amount = settlement.get("contract_amount")
        if not contract_amount:
            continue
        pl = settlement.get("profit_loss", 0)
        total_profit += pl
        total_contract += contract_amount
        profit_items.append({
            "name": p.name,
            "contract": _fmt_money(contract_amount),
            "cost": _fmt_money(settlement.get("total_cost", 0)),
            "profit": _fmt_money(pl),
            "is_positive": pl >= 0,
        })
    profit_rate = (
        round(total_profit / total_contract * 100, 1) if total_contract > 0 else None
    )

    # Waste summary across the reported projects.
    total_waste_secs = 0.0
    total_target_secs = 0.0
    for p in report_projects:
        w = calc_waste_for_project(p.id, db)
        total_waste_secs += w.get("total_waste_seconds", 0)
        total_target_secs += p.target_total_seconds or 0
    waste_rate = (
        round(total_waste_secs / total_target_secs * 100, 1)
        if total_target_secs > 0 else 0
    )
    avg_per_person = round(month_total_secs / max(1, len(month_submitters)), 1)

    # Plain-text context handed to the AI summariser.
    project_cost_lines = [
        f"- {pc['name']}:人力 {pc['labor']} / AI工具 {pc['ai_tool']} / "
        f"外包 {pc['outsource']} / 固定 {pc['overhead']} → 总计 {pc['total']}"
        for pc in project_costs
    ]
    profit_lines = [
        f"- {pi['name']}:回款 {pi['contract']},成本 {pi['cost']},利润 {'+'if pi['is_positive'] else ''}{pi['profit']}"
        for pi in profit_items
    ]

    # Risk detection via the rule engine; separators keep name and factors apart.
    monthly_risks = analyze_project_risks(db)
    monthly_risk_lines = [
        f"- {r['project_name']}:{';'.join(r['risk_factors'])}"
        for r in monthly_risks
    ]

    data_context = (
        f"月份:{month_label}\n"
        f"进行中项目:{len(active_projects)}\n"
        f"本月完成项目:{len(completed_this_month)}\n"
        f"月度总产出:{_fmt_seconds(month_total_secs)}\n"
        f"月度总成本:{_fmt_money(total_all_cost)}\n"
        f"总损耗率:{waste_rate}%\n"
        f"参与人数:{len(month_submitters)}\n"
        f"人均产出:{_fmt_seconds(avg_per_person)}\n"
        "各项目成本:\n" + "\n".join(project_cost_lines) + "\n"
        "盈亏:\n" + ("\n".join(profit_lines) if profit_lines else "本月无结算项目") + "\n"
        "风险项目:\n" + ("\n".join(monthly_risk_lines) if monthly_risk_lines else "无")
    )

    ai_summary = generate_report_summary(data_context, "monthly")
    title = f"AirLabs 月报 — {month_label}"
    return {
        "title": title,
        "report_type": "monthly",
        "card_data": {
            "month_label": month_label,
            "active_count": len(active_projects),
            "completed_count": len(completed_this_month),
            "total_output": _fmt_seconds(month_total_secs),
            "total_cost": _fmt_money(total_all_cost),
            "project_costs": project_costs,
            "profit_items": profit_items,
            "profit_rate": profit_rate,
            "waste_total": _fmt_seconds(total_waste_secs),
            "waste_rate": waste_rate,
            "participant_count": len(month_submitters),
            "avg_per_person": _fmt_seconds(avg_per_person),
            "ai_summary": ai_summary,
        },
    }
# ──────────────────────────── 风险预警 ────────────────────────────
def analyze_project_risks(db: Session) -> list:
    """Evaluate every in-progress project against fixed risk rules (no AI).

    Rules applied per project:
      1. Deadline: overdue, or close to the deadline with low progress.
      2. Schedule: actual progress well below the progress expected from
         the time elapsed between creation and the estimated completion.
      3. Waste rate above 50% (medium) or 80% (high).
      4. No submissions in the last 7 days although output exists.

    Progress counts production-phase seconds only.

    Args:
        db: SQLAlchemy session used for every query.

    Returns:
        A list of dicts, one per project with at least one risk factor, with
        keys ``project_id``, ``project_name``, ``risk_level``
        (``"high"``/``"medium"``/``"low"``), ``progress`` and
        ``risk_factors``. High-risk projects come first.
    """
    today = date.today()
    week_ago = today - timedelta(days=7)  # same for every project
    active_projects = db.query(Project).filter(
        Project.status == ProjectStatus.IN_PROGRESS
    ).all()

    risks = []
    for p in active_projects:
        waste = calc_waste_for_project(p.id, db)

        # Progress uses production-phase output only; later phases are excluded.
        production_secs = db.query(sa_func.sum(Submission.total_seconds)).filter(
            Submission.project_id == p.id,
            Submission.total_seconds > 0,
            Submission.project_phase == PhaseGroup.PRODUCTION,
        ).scalar() or 0
        # A project without a target must not crash the whole analysis.
        target = p.target_total_seconds or 0
        progress = round(production_secs / target * 100, 1) if target > 0 else 0

        risk_factors = []
        risk_level = "low"

        # 1. Deadline checks.
        days_left = None
        if p.estimated_completion_date:
            days_left = (p.estimated_completion_date - today).days
            if days_left < 0:
                risk_factors.append(f"已超期 {-days_left} 天")
                risk_level = "high"
            elif days_left <= 7 and progress < 80:
                risk_factors.append(f"距截止仅剩 {days_left} 天,进度仅 {progress}%")
                risk_level = "high"
            elif days_left <= 14 and progress < 60:
                risk_factors.append(f"距截止 {days_left} 天,进度 {progress}% 偏低")
                risk_level = "medium"

        # Progress behind the timeline; only while the deadline is in the future.
        created_at = getattr(p, "created_at", None)
        if days_left is not None and days_left > 0 and created_at:
            # created_at may be a datetime or already a date.
            created = created_at.date() if hasattr(created_at, "date") else created_at
            total_days = (p.estimated_completion_date - created).days
            elapsed_days = (today - created).days
            if total_days > 0:
                expected_progress = round(elapsed_days / total_days * 100, 1)
                if progress < expected_progress * 0.7:
                    risk_factors.append(
                        f"预期进度 {expected_progress}%,实际 {progress}%,严重落后"
                    )
                    risk_level = "high"
                elif progress < expected_progress * 0.85:
                    risk_factors.append(
                        f"预期进度 {expected_progress}%,实际 {progress}%"
                    )
                    if risk_level != "high":
                        risk_level = "medium"

        # 2. Waste rate.
        waste_rate = waste.get("waste_rate", 0)
        if waste_rate > 80:
            risk_factors.append(f"损耗率 {waste_rate}%,严重偏高")
            risk_level = "high"
        elif waste_rate > 50:
            risk_factors.append(f"损耗率 {waste_rate}%,偏高")
            if risk_level != "high":
                risk_level = "medium"

        # 3. No submissions in the last 7 days although output exists.
        recent_subs = db.query(Submission).filter(
            Submission.project_id == p.id,
            Submission.submit_date >= week_ago,
        ).count()
        if recent_subs == 0 and production_secs > 0:
            risk_factors.append("近 7 天无提交,产出停滞")
            if risk_level != "high":
                risk_level = "medium"

        if risk_factors:
            risks.append({
                "project_id": p.id,
                "project_name": p.name,
                "risk_level": risk_level,
                "progress": progress,
                "risk_factors": risk_factors,
            })

    # High risk first.
    level_order = {"high": 0, "medium": 1, "low": 2}
    risks.sort(key=lambda x: level_order.get(x["risk_level"], 99))
    return risks