AirGate/backend/utils/scheduler.py
seaislee1209 610058ae5f feat: switch billing to ListSplitBillDetail for accurate project spending
- BillingService now uses ListSplitBillDetail (split bill) instead of
  ListBillDetail (bill detail) - the latter shows Project='-' for
  Seedance pay-as-you-go products
- Added get_spending_all_projects() for batch query (avoids N+1 API calls)
- Scheduler optimized: single API call fetches all project spending
- Verified: amounts match Volcengine console split bill page exactly
- Updated research report with billing API findings

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 20:42:08 +08:00

209 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""定时消费监控任务 -- 多项目聚合 + 额度划拨制 + 阶梯式告警"""
import logging
from decimal import Decimal
from django.utils import timezone
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Guard flag for start_scheduler(): that function returns early when this is
# already True and sets it to True otherwise, so repeated calls in the same
# process do not start a second scheduler.
_scheduler_started = False
def _format_project_details(projects):
    """Return one ``name: ¥amount`` line per project, joined with newlines."""
    return "\n".join(
        f" {p.project_name}: ¥{p.current_spending}"
        for p in projects
    )


def check_spending():
    """Refresh spending for every monitored sub-account and raise tiered alerts.

    For each active ``VolcAccount`` the function:

    1. Decrypts the access/secret key pair (accounts with an empty key are
       skipped) and fetches the spending of all projects for the current
       ``YYYY-MM`` bill period in a single batch call.
    2. For every monitored, non-disabled ``IAMUser`` of that account, stores
       the fresh per-project figure on ``IAMUserProject.current_spending`` and
       in the per-period ``SpendingRecord`` snapshot.
    3. Recomputes the user's cumulative consumption as the sum of all
       ``SpendingRecord`` rows (every period) of the currently monitored
       projects and saves it on the user.
    4. If the user has a positive ``allocated_quota``, creates an
       ``AlertRecord`` and sends a Feishu message for each alert threshold
       that has been reached and not yet triggered.
    5. If usage is at or above 100% and auto-disable is enabled, disables the
       user through IAM and records/sends a DISABLE alert.

    Takes no arguments and returns ``None``. Failures while processing one
    user are logged and do not stop the remaining users; a failed batch
    billing query is logged and leaves stored spending figures untouched.
    """
    # Imports are deferred to call time, as in the rest of this module.
    # NOTE(review): presumably so the module can be imported before Django
    # apps are loaded -- confirm.
    from django.db.models import Sum

    from apps.monitor.models import (
        VolcAccount, IAMUser, IAMUserProject, GlobalConfig, AlertRecord, SpendingRecord,
    )
    from utils.crypto import decrypt
    from utils.billing_service import BillingService
    from utils.iam_service import IAMService
    from utils.feishu import send_feishu_alert

    config = GlobalConfig.get_solo()
    webhook = config.feishu_webhook_url
    bill_period = timezone.now().strftime("%Y-%m")

    for volc_account in VolcAccount.objects.filter(is_active=True):
        ak = decrypt(volc_account.access_key_enc)
        sk = decrypt(volc_account.secret_key_enc)
        if not ak or not sk:
            logger.warning(f"主账号 {volc_account.name} 密钥为空,跳过")
            continue

        billing = BillingService(ak, sk)
        iam_svc = IAMService(ak, sk)

        # Query the spending of all projects at once (avoids N+1 API calls).
        try:
            all_project_spending = billing.get_spending_all_projects(bill_period)
        except Exception as e:
            logger.error(f"批量查询消费失败: {e}")
            # Empty mapping: no project receives a fresh figure this round.
            all_project_spending = {}

        users = IAMUser.objects.filter(
            volc_account=volc_account,
            monitor_enabled=True,
        ).exclude(status=IAMUser.Status.DISABLED)

        for user in users:
            try:
                # Load the monitored projects once; the same instances are
                # reused for the update, the name list and the alert details.
                projects = list(
                    IAMUserProject.objects.filter(iam_user=user, monitor_enabled=True)
                )
                if not projects:
                    logger.info(f"用户 {user.username} 无开启监测的项目,跳过")
                    continue

                for project in projects:
                    if project.project_name not in all_project_spending:
                        # No fresh figure (query failed or the project is not
                        # in the result). Keep the stored values: copying the
                        # previous current_spending into this period's
                        # snapshot would count last month's amount a second
                        # time right after a month rollover.
                        continue
                    proj_spending = all_project_spending[project.project_name]

                    # Project-level spending.
                    project.current_spending = proj_spending
                    project.save(update_fields=['current_spending'])

                    # Project-level snapshot for the current bill period.
                    SpendingRecord.objects.update_or_create(
                        iam_user=user,
                        project_name=project.project_name,
                        bill_period=bill_period,
                        defaults={'amount': proj_spending},
                    )

                # Cumulative consumption = sum over every period of all
                # currently monitored projects.
                enabled_names = [p.project_name for p in projects]
                cumulative = SpendingRecord.objects.filter(
                    iam_user=user,
                    project_name__in=enabled_names,
                ).aggregate(total=Sum('amount'))['total'] or Decimal('0')

                user.consumed_total = cumulative
                user.spending_updated_at = timezone.now()

                quota = user.allocated_quota
                if not quota or quota <= 0:
                    # Without a positive quota there is nothing to compare
                    # against; only persist the refreshed totals.
                    user.save(update_fields=['consumed_total', 'spending_updated_at'])
                    continue

                usage_percent = float(cumulative) / float(quota) * 100
                triggered = user.triggered_alerts or []

                # Decide on auto-disable BEFORE the threshold loop: if 100 is
                # itself one of the alert thresholds, the loop adds 100 to
                # `triggered`, which would otherwise make the "not yet
                # triggered" check below fail and skip the disable forever.
                should_disable = (
                    usage_percent >= 100
                    and user.auto_disable_enabled
                    and 100 not in triggered
                )

                detail_lines = _format_project_details(projects)

                # --- Tiered alerts ---
                for step in user.get_alert_thresholds():
                    if usage_percent < step or step in triggered:
                        continue
                    triggered.append(step)
                    threshold_amount = Decimal(str(quota)) * step / 100

                    AlertRecord.objects.create(
                        iam_user=user,
                        alert_type=AlertRecord.AlertType.WARNING,
                        title=f"{user.username} 消费达到额度 {step}%",
                        content=(
                            # Separators keep the quota amount and the
                            # percentage from running together
                            # (e.g. "¥100.0080%").
                            f"累计消费 ¥{cumulative:.2f},已达已划拨额度 ¥{quota:.2f} 的 {step}%\n"
                            f"剩余额度: ¥{user.remaining_quota:.2f}\n"
                            f"项目明细:\n{detail_lines}"
                        ),
                        spending_amount=cumulative,
                        threshold_amount=threshold_amount,
                        notified=True,
                    )
                    send_feishu_alert(
                        webhook,
                        f"⚠️ {user.username} 消费达到额度 {step}%",
                        f"**用户**: {user.username}\n"
                        f"**累计消费**: ¥{cumulative:.2f}\n"
                        f"**已划拨额度**: ¥{quota:.2f}\n"
                        f"**剩余额度**: ¥{user.remaining_quota:.2f}\n"
                        f"**使用率**: {usage_percent:.1f}%\n"
                        f"**监测项目数**: {len(projects)}\n"
                        f"**项目明细**:\n{detail_lines}",
                        template="orange" if step < 90 else "red",
                    )

                # --- Quota exhausted: disable the account automatically ---
                if should_disable:
                    if 100 not in triggered:
                        triggered.append(100)
                    try:
                        iam_svc.disable_user(user.username)
                        user.status = IAMUser.Status.DISABLED
                    except Exception as e:
                        # NOTE(review): the alert below is still recorded when
                        # the IAM call fails, and the 100 marker prevents a
                        # retry on the next run -- confirm this is intended.
                        logger.error(f"停用用户 {user.username} 失败: {e}")

                    AlertRecord.objects.create(
                        iam_user=user,
                        alert_type=AlertRecord.AlertType.DISABLE,
                        title=f"{user.username} 额度用尽,已自动停用",
                        content=(
                            f"累计消费 ¥{cumulative:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n"
                            f"项目明细:\n{detail_lines}\n"
                            f"如需继续使用,请划拨新额度后恢复账号。"
                        ),
                        spending_amount=cumulative,
                        threshold_amount=quota,
                        notified=True,
                    )
                    send_feishu_alert(
                        webhook,
                        f"🚨 {user.username} 额度用尽,已自动停用",
                        f"**用户**: {user.username}\n"
                        f"**累计消费**: ¥{cumulative:.2f}\n"
                        f"**已划拨额度**: ¥{quota:.2f}\n"
                        f"**项目明细**:\n{detail_lines}\n"
                        f"额度已用尽,账号已自动停用。\n"
                        f"请在 AirGate 划拨新额度后恢复。",
                        template="red",
                    )

                user.triggered_alerts = triggered
                user.save(update_fields=[
                    'consumed_total', 'spending_updated_at',
                    'triggered_alerts', 'status',
                ])
            except Exception as e:
                # Per-user boundary: log with traceback and move on so one
                # failing user does not block the others.
                logger.exception(f"检查用户 {user.username} 消费失败: {e}")
def start_scheduler():
    """Start the background spending-monitor job, at most once per process.

    Creates a ``BackgroundScheduler``, registers ``check_spending`` as an
    interval job (every ``GlobalConfig.monitor_interval_seconds`` seconds,
    falling back to 3600 when that value is empty or zero) and starts it.

    Repeated calls are no-ops once a scheduler has been started. Any error
    during start-up is logged instead of raised, and the guard flag is
    released so a later call can try again.
    """
    global _scheduler_started
    if _scheduler_started:
        return
    # Claim the flag before doing any work so a repeated call made while
    # start-up is in progress returns early.
    _scheduler_started = True
    try:
        # Deferred imports, resolved only when the scheduler is started.
        from apscheduler.schedulers.background import BackgroundScheduler
        from apps.monitor.models import GlobalConfig

        scheduler = BackgroundScheduler()
        config = GlobalConfig.get_solo()
        interval = config.monitor_interval_seconds or 3600
        scheduler.add_job(
            check_spending,
            'interval',
            seconds=interval,
            id='check_spending',
            replace_existing=True,
        )
        scheduler.start()
    except Exception as e:
        # Nothing is running: release the flag, otherwise one failed attempt
        # would block every later start in this process.
        _scheduler_started = False
        logger.exception(f"启动定时任务失败: {e}")
    else:
        logger.info(f"消费监控定时任务已启动,间隔 {interval}")