"""AI 报告路由 —— 手动触发报告生成与飞书推送""" from typing import Optional from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from database import get_db from models import User from auth import require_permission router = APIRouter(prefix="/api/reports", tags=["AI报告"]) async def _push_card(card: dict, test_mobile: Optional[str] = None, receivers: list = None) -> dict: """推送卡片,支持 test_mobile 只推单人,或指定 receivers 列表""" from services.feishu_service import feishu if test_mobile: user_id = await feishu.get_user_id_by_mobile(test_mobile) if not user_id: return {"success": [], "failed": [{"mobile": test_mobile, "reason": "未找到用户"}]} ok = await feishu.send_card(user_id, card) if ok: return {"success": [test_mobile], "failed": []} return {"success": [], "failed": [{"mobile": test_mobile, "reason": "发送失败"}]} return await feishu.send_report_card_to_all(card, receivers=receivers) @router.post("/daily") async def trigger_daily_report( db: Session = Depends(get_db), current_user: User = Depends(require_permission("dashboard:view")), test_mobile: Optional[str] = Query(None, description="测试手机号,只推这一个人"), ): """手动触发日报生成并推送飞书""" from services.report_service import generate_daily_report from services.feishu_service import build_daily_card from config import DAILY_REPORT_RECEIVERS report = generate_daily_report(db) card = build_daily_card(report["title"], report["card_data"]) push_result = await _push_card(card, test_mobile, receivers=DAILY_REPORT_RECEIVERS) return { "message": "日报生成并推送完成", "title": report["title"], "card_data": report["card_data"], "push_result": push_result, } @router.post("/weekly") async def trigger_weekly_report( db: Session = Depends(get_db), current_user: User = Depends(require_permission("dashboard:view")), test_mobile: Optional[str] = Query(None, description="测试手机号,只推这一个人"), ): """手动触发周报生成并推送飞书""" from services.report_service import generate_weekly_report from services.feishu_service import build_weekly_card report = generate_weekly_report(db) card = build_weekly_card(report["title"], report["card_data"]) push_result = await _push_card(card, test_mobile) return { "message": "周报生成并推送完成", "title": report["title"], "card_data": report["card_data"], "push_result": push_result, } @router.post("/monthly") async def trigger_monthly_report( db: Session = Depends(get_db), current_user: User = Depends(require_permission("dashboard:view")), test_mobile: Optional[str] = Query(None, description="测试手机号,只推这一个人"), ): """手动触发月报生成并推送飞书""" from services.report_service import generate_monthly_report from services.feishu_service import build_monthly_card report = generate_monthly_report(db) card = build_monthly_card(report["title"], report["card_data"]) push_result = await _push_card(card, test_mobile) return { "message": "月报生成并推送完成", "title": report["title"], "card_data": report["card_data"], "push_result": push_result, } @router.post("/preview/{report_type}") async def preview_report( report_type: str, db: Session = Depends(get_db), current_user: User = Depends(require_permission("dashboard:view")), ): """预览报告内容(不推送飞书),返回结构化数据 + 卡片 JSON""" from services.report_service import ( generate_daily_report, generate_weekly_report, generate_monthly_report, ) from services.feishu_service import ( build_daily_card, build_weekly_card, build_monthly_card, ) generators = { "daily": (generate_daily_report, build_daily_card), "weekly": (generate_weekly_report, build_weekly_card), "monthly": (generate_monthly_report, build_monthly_card), } entry = generators.get(report_type) if not entry: return {"error": f"不支持的报告类型: {report_type},可选: daily, weekly, monthly"} gen_fn, build_fn = entry report = gen_fn(db) card = build_fn(report["title"], report["card_data"]) return { "title": report["title"], "card_data": report["card_data"], "card_json": card, } @router.get("/risks") def get_project_risks( db: Session = Depends(get_db), current_user: User = Depends(require_permission("dashboard:view")), ): """获取当前所有项目风险预警""" from services.report_service import analyze_project_risks return analyze_project_risks(db)