airlabs-manage/backend/services/scheduler_service.py
seaislee1209 90707005ed
All checks were successful
Build and Deploy Backend / build-and-deploy (push) Successful in 1m27s
Build and Deploy Web / build-and-deploy (push) Successful in 54s
feat: V2功能增强 — 里程碑系统+圆环进度图+损耗修复+AI服务+报告系统
- 项目详情页三阶段里程碑管理(前期/制作/后期)
- 制作卡片改用180px ECharts圆环进度图+右侧数据列表
- 修复损耗率双重计算bug(测试秒数不再重复计入超产)
- 新增飞书推送服务、豆包AI风险分析、APScheduler定时报告
- 项目列表页增强(筛选/排序/批量操作/废弃功能)
- 成员详情页产出时间轴+效率对比
- 成本页固定开支管理

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 18:36:44 +08:00

79 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""APScheduler 定时任务 —— 自动生成报告并推送飞书"""
import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from database import SessionLocal
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
async def _run_report_job(report_type: str):
"""通用报告任务执行器"""
from services.report_service import (
generate_daily_report, generate_weekly_report, generate_monthly_report,
)
from services.feishu_service import feishu
logger.info(f"[定时任务] 开始生成{report_type}...")
db = SessionLocal()
try:
if report_type == "日报":
result = generate_daily_report(db)
elif report_type == "周报":
result = generate_weekly_report(db)
elif report_type == "月报":
result = generate_monthly_report(db)
else:
logger.error(f"未知报告类型: {report_type}")
return
logger.info(f"[定时任务] {report_type}生成完成,开始推送飞书...")
push_result = await feishu.send_report_to_all(result["title"], result["content"])
logger.info(f"[定时任务] {report_type}推送完成: {push_result}")
except Exception as e:
logger.error(f"[定时任务] {report_type}生成/推送失败: {e}", exc_info=True)
finally:
db.close()
async def daily_report_job():
await _run_report_job("日报")
async def weekly_report_job():
await _run_report_job("周报")
async def monthly_report_job():
await _run_report_job("月报")
def setup_scheduler():
"""配置并启动定时任务"""
# 日报:每天 20:00
scheduler.add_job(
daily_report_job, "cron",
hour=20, minute=0,
id="daily_report", replace_existing=True,
)
# 周报:每周五 20:00
scheduler.add_job(
weekly_report_job, "cron",
day_of_week="fri", hour=20, minute=0,
id="weekly_report", replace_existing=True,
)
# 月报每月1日 10:00
scheduler.add_job(
monthly_report_job, "cron",
day=1, hour=10, minute=0,
id="monthly_report", replace_existing=True,
)
scheduler.start()
logger.info(
"[定时任务] 已启动 — 日报:每天20:00 | 周报:周五20:00 | 月报:每月1日10:00"
)