AirGate/backend/utils/scheduler.py
seaislee1209 a455753fdc
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m35s
fix: scheduler now refreshes last 3 months billing
Volcengine bill data has a 1-2 day delay. Previously the scheduler
only refreshed the current month, so when the month rolled over, the
previous month's incomplete snapshot was frozen. Now it refreshes
the current month plus the 2 prior months.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 18:20:40 +08:00

221 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""定时消费监控任务 -- 多项目聚合 + 额度划拨制 + 阶梯式告警"""
import logging
from decimal import Decimal
from django.utils import timezone
# Module-level logger shared by the spending job and the scheduler bootstrap.
logger = logging.getLogger(__name__)
# Set by start_scheduler() so repeated calls in the same process do not start
# a second BackgroundScheduler.
_scheduler_started: bool = False
def _recent_bill_periods(now, months=3):
    """Return the ``months`` most recent bill periods, newest first.

    Periods are ``"YYYY-MM"`` strings counted back from ``now``'s calendar
    month, wrapping across year boundaries: for 2026-01 and ``months=3`` the
    result is ``["2026-01", "2025-12", "2025-11"]``.
    """
    # Count months from year 0 so the year rollover is a plain divmod.
    index = now.year * 12 + (now.month - 1)
    periods = []
    for offset in range(months):
        year, month0 = divmod(index - offset, 12)
        periods.append(f"{year:04d}-{month0 + 1:02d}")
    return periods


def _fetch_spending_by_period(billing, bill_periods):
    """Query per-project spending for every period in ``bill_periods``.

    Returns ``(spending_by_period, failed_periods)``.

    - ``spending_by_period`` maps each period to whatever
      ``billing.get_spending_all_projects(period)`` returned. It is looked up
      by project name, so it is presumably ``{project_name: amount}``.
    - A period whose query raised maps to ``{}`` and is also listed in the
      ``failed_periods`` set. This lets callers tell "no bill lines" apart
      from "query failed".
    """
    spending_by_period = {}
    failed_periods = set()
    for period in bill_periods:
        try:
            spending_by_period[period] = billing.get_spending_all_projects(period)
        except Exception as e:
            logger.exception(f"批量查询 {period} 消费失败: {e}")
            spending_by_period[period] = {}
            failed_periods.add(period)
    return spending_by_period, failed_periods


def _project_detail_lines(projects):
    """Render one ``" name: ¥current_spending"`` line per project for alert bodies."""
    return "\n".join(
        f" {p.project_name}: ¥{p.current_spending}"
        for p in projects
    )


def _refresh_project_snapshots(user, projects, bill_periods, spending_by_period, failed_periods):
    """Update each project's current-month figure and its per-month SpendingRecords.

    ``bill_periods[0]`` is the current month; its value is stored on
    ``project.current_spending`` (shown in the frontend and in alert details).
    For every period, a ``SpendingRecord`` is upserted only when that
    period's data contains the project. Failed or empty periods leave
    existing snapshots untouched.
    """
    from apps.monitor.models import SpendingRecord

    current_period = bill_periods[0]
    current_data = spending_by_period.get(current_period, {})
    current_failed = current_period in failed_periods
    for project in projects:
        name = project.project_name
        if name in current_data:
            project.current_spending = current_data[name]
        elif not current_failed:
            # The query succeeded but this project has no bill lines this month
            # (typical right after the month rolls over). Show 0 instead of
            # leaving last month's figure on screen.
            project.current_spending = Decimal('0')
        # Otherwise the query failed: keep the last known value.
        project.save(update_fields=['current_spending'])

        for period in bill_periods:
            period_data = spending_by_period.get(period, {})
            if name in period_data:
                SpendingRecord.objects.update_or_create(
                    iam_user=user,
                    project_name=name,
                    bill_period=period,
                    defaults={'amount': period_data[name]},
                )


def _check_user(user, *, bill_periods, spending_by_period, failed_periods, iam_svc, webhook):
    """Refresh one IAM user's spending, then fire tiered alerts and auto-disable.

    Steps:
    1. Refresh the user's monitored projects' snapshots.
    2. Recompute ``consumed_total`` as the sum of all stored
       ``SpendingRecord`` amounts for those projects.
    3. Skip alerting when no positive ``allocated_quota`` is set.
    4. Otherwise send a WARNING alert for every threshold in
       ``user.get_alert_thresholds()`` that usage has reached and that is not
       yet in ``triggered_alerts``.
    5. If usage is >= 100% and ``auto_disable_enabled`` is set, disable the
       IAM user via ``iam_svc``.
    """
    from django.db.models import Sum
    from apps.monitor.models import IAMUser, IAMUserProject, AlertRecord, SpendingRecord
    from utils.feishu import send_feishu_alert

    # Materialise once: reused for snapshots, the cumulative filter, alert
    # details and the project count, without extra queries.
    projects = list(IAMUserProject.objects.filter(iam_user=user, monitor_enabled=True))
    if not projects:
        logger.info(f"用户 {user.username} 无开启监测的项目,跳过")
        return

    _refresh_project_snapshots(user, projects, bill_periods, spending_by_period, failed_periods)

    # Cumulative spend = every stored month of every monitored project.
    cumulative = SpendingRecord.objects.filter(
        iam_user=user,
        project_name__in=[p.project_name for p in projects],
    ).aggregate(total=Sum('amount'))['total'] or Decimal('0')
    user.consumed_total = cumulative
    user.spending_updated_at = timezone.now()

    quota = user.allocated_quota
    if not quota or quota <= 0:
        # No quota allocated: persist the spend, nothing to alert against.
        user.save(update_fields=['consumed_total', 'spending_updated_at'])
        return

    usage_percent = float(cumulative) / float(quota) * 100
    triggered = list(user.triggered_alerts or [])
    # Read before the warning loop: a 100% warning threshold also appends 100
    # to ``triggered``, and must not suppress the auto-disable in the same run.
    auto_disable_done = 100 in triggered
    detail_lines = _project_detail_lines(projects)

    # --- Tiered alerts: a step already recorded in triggered_alerts is not re-sent ---
    for step in user.get_alert_thresholds():
        if usage_percent < step or step in triggered:
            continue
        triggered.append(step)
        AlertRecord.objects.create(
            iam_user=user,
            alert_type=AlertRecord.AlertType.WARNING,
            title=f"{user.username} 消费达到额度 {step}%",
            content=(
                f"累计消费 ¥{cumulative:.2f},已达到已划拨额度 ¥{quota:.2f} 的 {step}%\n"
                f"剩余额度: ¥{user.remaining_quota:.2f}\n"
                f"项目明细:\n{detail_lines}"
            ),
            spending_amount=cumulative,
            threshold_amount=Decimal(str(quota)) * step / 100,
            notified=True,
        )
        send_feishu_alert(
            webhook,
            f"⚠️ {user.username} 消费达到额度 {step}%",
            f"**用户**: {user.username}\n"
            f"**累计消费**: ¥{cumulative:.2f}\n"
            f"**已划拨额度**: ¥{quota:.2f}\n"
            f"**剩余额度**: ¥{user.remaining_quota:.2f}\n"
            f"**使用率**: {usage_percent:.1f}%\n"
            f"**监测项目数**: {len(projects)}\n"
            f"**项目明细**:\n{detail_lines}",
            template="orange" if step < 90 else "red",
        )

    # --- Quota exhausted: auto-disable the IAM user ---
    if usage_percent >= 100 and user.auto_disable_enabled and not auto_disable_done:
        try:
            iam_svc.disable_user(user.username)
        except Exception as e:
            # Leave 100 unrecorded so the next run retries, and do not announce
            # a disable that did not happen.
            logger.exception(f"停用用户 {user.username} 失败: {e}")
        else:
            if 100 not in triggered:
                triggered.append(100)
            user.status = IAMUser.Status.DISABLED
            AlertRecord.objects.create(
                iam_user=user,
                alert_type=AlertRecord.AlertType.DISABLE,
                title=f"{user.username} 额度用尽,已自动停用",
                content=(
                    f"累计消费 ¥{cumulative:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n"
                    f"项目明细:\n{detail_lines}\n"
                    f"如需继续使用,请划拨新额度后恢复账号。"
                ),
                spending_amount=cumulative,
                threshold_amount=quota,
                notified=True,
            )
            send_feishu_alert(
                webhook,
                f"🚨 {user.username} 额度用尽,已自动停用",
                f"**用户**: {user.username}\n"
                f"**累计消费**: ¥{cumulative:.2f}\n"
                f"**已划拨额度**: ¥{quota:.2f}\n"
                f"**项目明细**:\n{detail_lines}\n"
                f"额度已用尽,账号已自动停用。\n"
                f"请在 AirGate 划拨新额度后恢复。",
                template="red",
            )

    user.triggered_alerts = triggered
    user.save(update_fields=[
        'consumed_total', 'spending_updated_at',
        'triggered_alerts', 'status',
    ])


def check_spending():
    """Scheduled job: refresh sub-account spending and fire tiered quota alerts.

    For every active ``VolcAccount`` whose decrypted keys are non-empty:
    - Fetch per-project spending for the current month and the two previous
      months. Volcengine bills lag by 1-2 days, so the previous month's
      figures keep changing after the month rolls over.
    - Run the per-user check for each of that account's IAM users that has
      monitoring enabled and is not disabled.

    A failure while checking one user is logged and does not stop the others.
    """
    from apps.monitor.models import VolcAccount, IAMUser, GlobalConfig
    from utils.crypto import decrypt
    from utils.billing_service import BillingService
    from utils.iam_service import IAMService

    webhook = GlobalConfig.get_solo().feishu_webhook_url

    # NOTE(review): timezone.now() is UTC when USE_TZ=True, so the "current
    # month" flips at UTC midnight, not at local (+08:00) midnight. Confirm
    # this matches Volcengine's bill period boundary.
    bill_periods = _recent_bill_periods(timezone.now())
    logger.info(f"刷新账单期: {bill_periods}")

    for volc_account in VolcAccount.objects.filter(is_active=True):
        ak = decrypt(volc_account.access_key_enc)
        sk = decrypt(volc_account.secret_key_enc)
        if not ak or not sk:
            logger.warning(f"主账号 {volc_account.name} 密钥为空,跳过")
            continue
        billing = BillingService(ak, sk)
        iam_svc = IAMService(ak, sk)

        # One billing query per period per account, shared by all its users.
        spending_by_period, failed_periods = _fetch_spending_by_period(billing, bill_periods)

        users = IAMUser.objects.filter(
            volc_account=volc_account,
            monitor_enabled=True,
        ).exclude(status=IAMUser.Status.DISABLED)
        for user in users:
            try:
                _check_user(
                    user,
                    bill_periods=bill_periods,
                    spending_by_period=spending_by_period,
                    failed_periods=failed_periods,
                    iam_svc=iam_svc,
                    webhook=webhook,
                )
            except Exception as e:
                logger.exception(f"检查用户 {user.username} 消费失败: {e}")
def start_scheduler():
    """Start the background job that periodically runs ``check_spending``.

    At most one scheduler is started per process: once a start has
    succeeded, later calls return immediately. The interval comes from
    ``GlobalConfig.monitor_interval_seconds``, falling back to 3600 seconds
    when it is unset or 0. It is read once here, so a config change takes
    effect only after a restart.

    Startup errors are logged (with traceback), not raised. The started flag
    is set only after ``scheduler.start()`` succeeds, so a failed attempt
    (e.g. the database is not ready yet) can be retried by calling this again.
    """
    global _scheduler_started
    if _scheduler_started:
        return
    try:
        from apscheduler.schedulers.background import BackgroundScheduler
        from apps.monitor.models import GlobalConfig

        config = GlobalConfig.get_solo()
        interval = config.monitor_interval_seconds or 3600
        scheduler = BackgroundScheduler()
        scheduler.add_job(check_spending, 'interval', seconds=interval,
                          id='check_spending', replace_existing=True)
        scheduler.start()
    except Exception as e:
        logger.exception(f"启动定时任务失败: {e}")
        return
    _scheduler_started = True
    logger.info(f"消费监控定时任务已启动,间隔 {interval}")