AirGate/backend/utils/scheduler.py
seaislee1209 1e94241587 feat: multi-project per sub-account support
Data model:
- Add IAMUserProject model (sub-account → N projects, each with monitoring toggle)
- Remove old single project_name from IAMUser model
- Update SpendingRecord with per-project granularity

Backend:
- Project CRUD views: list/add/update-toggle/delete/toggle-all
- Create user view auto-adds first project if specified
- Scheduler aggregates spending across all enabled projects per user
- Per-project spending recorded in SpendingRecord + IAMUserProject.current_spending
- Alert details include per-project spending breakdown

Frontend:
- New "项目管理" dialog: add projects from Volcengine dropdown, toggle monitoring per project, remove projects, batch toggle all
- "项目" column in user table showing monitored/total count (clickable)
- BillingView: expandable rows showing per-project spending breakdown
- Create dialog: optional initial project selection
- Removed old single-project select from config dialog

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 20:37:38 +08:00

206 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Scheduled spending-monitor job: multi-project aggregation, allocated-quota accounting, stepped alerts."""
import logging
from decimal import Decimal
from django.utils import timezone
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Set to True by start_scheduler() so that later calls return without
# starting a second scheduler.
_scheduler_started = False
def _format_project_details(projects):
    """Build the per-project breakdown embedded in alert bodies.

    Args:
        projects: Iterable of objects exposing ``project_name`` and
            ``current_spending``.

    Returns:
        One line per project, joined with newlines (an empty string when
        there are no projects).
    """
    return "\n".join(
        f" {p.project_name}: ¥{p.current_spending}" for p in projects
    )


def _refresh_project_spending(billing, user, projects, bill_period):
    """Fetch each project's spending for ``bill_period`` and persist it.

    For every project whose query succeeds, the amount is stored on
    ``project.current_spending`` and upserted into the ``SpendingRecord`` row
    keyed by (user, project name, bill period).

    Args:
        billing: Object providing ``get_spending_by_project(period, name)``.
        user: The sub-account the projects belong to.
        projects: Already-loaded project rows to refresh.
        bill_period: Billing period string in ``YYYY-MM`` form.
    """
    from apps.monitor.models import SpendingRecord

    for project in projects:
        try:
            amount = billing.get_spending_by_project(
                bill_period, project.project_name
            )
        except Exception as e:
            logger.error(f"查询项目 {project.project_name} 消费失败: {e}")
            # Keep whatever is already stored. Writing the stale
            # ``current_spending`` into *this* period's record (as was done
            # before) copies last month's amount into a new month right after
            # a rollover and inflates the cumulative total.
            continue
        project.current_spending = amount
        project.save(update_fields=['current_spending'])
        SpendingRecord.objects.update_or_create(
            iam_user=user,
            project_name=project.project_name,
            bill_period=bill_period,
            defaults={'amount': amount},
        )


def _send_threshold_alerts(user, projects, cumulative, quota, usage_percent,
                           triggered, webhook):
    """Create and send a warning for every alert step newly reached.

    A step fires when ``usage_percent`` has reached it and it is not yet in
    ``triggered``; fired steps are appended to ``triggered`` in place.

    Args:
        user: Sub-account being checked (provides ``get_alert_thresholds()``).
        projects: Monitored projects, used for the breakdown text and count.
        cumulative: Cumulative spending used for the percentage.
        quota: The user's allocated quota (positive).
        usage_percent: ``cumulative / quota * 100`` as a float.
        triggered: Mutable list of steps that have already fired.
        webhook: Feishu webhook URL passed through to ``send_feishu_alert``.
    """
    from apps.monitor.models import AlertRecord
    from utils.feishu import send_feishu_alert

    detail_lines = _format_project_details(projects)
    for step in user.get_alert_thresholds():
        if not (usage_percent >= step and step not in triggered):
            continue
        triggered.append(step)
        threshold_amount = Decimal(str(quota)) * step / 100
        AlertRecord.objects.create(
            iam_user=user,
            alert_type=AlertRecord.AlertType.WARNING,
            title=f"{user.username} 消费达到额度 {step}%",
            content=(
                # Separators added: the fields were previously concatenated
                # back to back ("¥80.00已划拨额度 ¥100.0080%").
                f"累计消费 ¥{cumulative:.2f},"
                f"已达已划拨额度 ¥{quota:.2f} 的 {step}%\n"
                f"剩余额度: ¥{user.remaining_quota:.2f}\n"
                f"项目明细:\n{detail_lines}"
            ),
            spending_amount=cumulative,
            threshold_amount=threshold_amount,
            notified=True,
        )
        send_feishu_alert(
            webhook,
            f"⚠️ {user.username} 消费达到额度 {step}%",
            f"**用户**: {user.username}\n"
            f"**累计消费**: ¥{cumulative:.2f}\n"
            f"**已划拨额度**: ¥{quota:.2f}\n"
            f"**剩余额度**: ¥{user.remaining_quota:.2f}\n"
            f"**使用率**: {usage_percent:.1f}%\n"
            f"**监测项目数**: {len(projects)}\n"
            f"**项目明细**:\n{detail_lines}",
            template="orange" if step < 90 else "red",
        )


def _auto_disable_user(iam_svc, user, projects, cumulative, quota, triggered,
                       webhook):
    """Disable an over-quota sub-account and record/send the disable alert.

    Marks 100 in ``triggered`` (in place), calls ``iam_svc.disable_user`` and,
    on success, sets ``user.status`` to DISABLED. The caller is responsible
    for saving ``user``.
    """
    from apps.monitor.models import AlertRecord, IAMUser
    from utils.feishu import send_feishu_alert

    if 100 not in triggered:
        triggered.append(100)
    try:
        iam_svc.disable_user(user.username)
        user.status = IAMUser.Status.DISABLED
    except Exception as e:
        # NOTE(review): best-effort, as before -- the alert below is still
        # emitted and 100 stays marked, so a failed disable is not retried on
        # the next run. Confirm whether a retry / distinct alert is wanted.
        logger.error(f"停用用户 {user.username} 失败: {e}")

    detail_lines = _format_project_details(projects)
    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.DISABLE,
        title=f"{user.username} 额度用尽,已自动停用",
        content=(
            f"累计消费 ¥{cumulative:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n"
            f"项目明细:\n{detail_lines}\n"
            f"如需继续使用,请划拨新额度后恢复账号。"
        ),
        spending_amount=cumulative,
        threshold_amount=quota,
        notified=True,
    )
    send_feishu_alert(
        webhook,
        f"🚨 {user.username} 额度用尽,已自动停用",
        f"**用户**: {user.username}\n"
        f"**累计消费**: ¥{cumulative:.2f}\n"
        f"**已划拨额度**: ¥{quota:.2f}\n"
        f"**项目明细**:\n{detail_lines}\n"
        f"额度已用尽,账号已自动停用。\n"
        f"请在 AirGate 划拨新额度后恢复。",
        template="red",
    )


def _check_user_spending(user, billing, iam_svc, bill_period, webhook):
    """Refresh one sub-account's spending, then alert / auto-disable if due.

    Steps: refresh every monitored project, recompute the cumulative total
    from ``SpendingRecord`` (all periods, monitored projects only), store it
    on the user, then evaluate stepped alerts and the auto-disable rule
    against ``user.allocated_quota``. Users without a positive quota only get
    their totals saved.
    """
    from django.db.models import Sum
    from apps.monitor.models import IAMUserProject, SpendingRecord

    # Load the rows once; the same instances are reused for the refresh, the
    # name filter and the alert breakdown.
    projects = list(
        IAMUserProject.objects.filter(iam_user=user, monitor_enabled=True)
    )
    if not projects:
        logger.info(f"用户 {user.username} 无开启监测的项目,跳过")
        return

    _refresh_project_spending(billing, user, projects, bill_period)

    # Cumulative spending = sum over every recorded period of the currently
    # monitored projects.
    cumulative = SpendingRecord.objects.filter(
        iam_user=user,
        project_name__in=[p.project_name for p in projects],
    ).aggregate(total=Sum('amount'))['total'] or Decimal('0')
    user.consumed_total = cumulative
    user.spending_updated_at = timezone.now()

    quota = user.allocated_quota
    if not quota or quota <= 0:
        # No quota allocated: nothing to compare against, just persist totals.
        user.save(update_fields=['consumed_total', 'spending_updated_at'])
        return

    usage_percent = float(cumulative) / float(quota) * 100
    triggered = list(user.triggered_alerts or [])
    # Snapshot the auto-disable marker BEFORE the stepped alerts run: if 100
    # is itself an alert step, the alert loop appends 100 to ``triggered`` and
    # a check made afterwards would always conclude the disable already
    # happened, so the account would never be disabled.
    disable_pending = 100 not in triggered

    _send_threshold_alerts(
        user, projects, cumulative, quota, usage_percent, triggered, webhook
    )

    if usage_percent >= 100 and user.auto_disable_enabled and disable_pending:
        _auto_disable_user(
            iam_svc, user, projects, cumulative, quota, triggered, webhook
        )

    user.triggered_alerts = triggered
    user.save(update_fields=[
        'consumed_total', 'spending_updated_at',
        'triggered_alerts', 'status',
    ])


def check_spending():
    """Check spending for every monitored sub-account of every active account.

    For each active ``VolcAccount`` with usable credentials, walks its
    monitored, non-disabled ``IAMUser`` rows, refreshes per-project spending
    for the current ``YYYY-MM`` billing period, and raises stepped alerts or
    auto-disables the user when the allocated quota is used up. A failure
    while processing one user is logged and does not stop the others.
    """
    # Project imports stay function-local, as in the original module.
    from apps.monitor.models import VolcAccount, IAMUser, GlobalConfig
    from utils.crypto import decrypt
    from utils.billing_service import BillingService
    from utils.iam_service import IAMService

    config = GlobalConfig.get_solo()
    webhook = config.feishu_webhook_url
    bill_period = timezone.now().strftime("%Y-%m")

    for volc_account in VolcAccount.objects.filter(is_active=True):
        ak = decrypt(volc_account.access_key_enc)
        sk = decrypt(volc_account.secret_key_enc)
        if not ak or not sk:
            logger.warning(f"主账号 {volc_account.name} 密钥为空,跳过")
            continue
        billing = BillingService(ak, sk)
        iam_svc = IAMService(ak, sk)
        users = IAMUser.objects.filter(
            volc_account=volc_account,
            monitor_enabled=True,
        ).exclude(status=IAMUser.Status.DISABLED)
        for user in users:
            try:
                _check_user_spending(
                    user, billing, iam_svc, bill_period, webhook
                )
            except Exception as e:
                # Catch-all boundary per user; include the traceback so the
                # failing step can be identified from the log.
                logger.exception(f"检查用户 {user.username} 消费失败: {e}")
def start_scheduler():
    """Start the background job that runs ``check_spending`` on an interval.

    The interval comes from ``GlobalConfig.monitor_interval_seconds`` and
    falls back to 3600 seconds when that value is empty or zero. Calls made
    after a successful start return immediately. Startup errors are logged
    rather than raised, and the started flag is cleared again so a later call
    can retry.
    """
    global _scheduler_started
    if _scheduler_started:
        return
    # Mark as started up front (original ordering); undone below on failure.
    _scheduler_started = True
    try:
        from apscheduler.schedulers.background import BackgroundScheduler
        from apps.monitor.models import GlobalConfig
        scheduler = BackgroundScheduler()
        config = GlobalConfig.get_solo()
        interval = config.monitor_interval_seconds or 3600
        scheduler.add_job(check_spending, 'interval', seconds=interval,
                          id='check_spending', replace_existing=True)
        scheduler.start()
        logger.info(f"消费监控定时任务已启动,间隔 {interval}")
    except Exception as e:
        # Previously the flag stayed True after a failed start, so the
        # scheduler could never be started for the rest of the process.
        _scheduler_started = False
        logger.error(f"启动定时任务失败: {e}")