"""定时消费监控任务 -- 多项目聚合 + 额度划拨制 + 阶梯式告警""" import logging from decimal import Decimal from django.utils import timezone logger = logging.getLogger(__name__) _scheduler_started = False def check_spending(): """定时检查所有子账号消费:遍历开启监测的项目,聚合消费,触发阶梯告警""" from apps.monitor.models import VolcAccount, IAMUser, IAMUserProject, GlobalConfig, AlertRecord, SpendingRecord from utils.crypto import decrypt from utils.billing_service import BillingService from utils.iam_service import IAMService from utils.feishu import send_feishu_alert config = GlobalConfig.get_solo() webhook = config.feishu_webhook_url bill_period = timezone.now().strftime("%Y-%m") for volc_account in VolcAccount.objects.filter(is_active=True): ak = decrypt(volc_account.access_key_enc) sk = decrypt(volc_account.secret_key_enc) if not ak or not sk: logger.warning(f"主账号 {volc_account.name} 密钥为空,跳过") continue billing = BillingService(ak, sk) iam_svc = IAMService(ak, sk) users = IAMUser.objects.filter( volc_account=volc_account, monitor_enabled=True, ).exclude(status=IAMUser.Status.DISABLED) for user in users: try: # --- 遍历所有开启监测的项目,分别查询消费并累加 --- enabled_projects = IAMUserProject.objects.filter( iam_user=user, monitor_enabled=True ) if not enabled_projects.exists(): logger.info(f"用户 {user.username} 无开启监测的项目,跳过") continue total_spending = Decimal('0') for project in enabled_projects: try: proj_spending = billing.get_spending_by_project( bill_period, project.project_name ) except Exception as e: logger.error(f"查询项目 {project.project_name} 消费失败: {e}") proj_spending = project.current_spending # 保留上次值 # 更新项目级消费 project.current_spending = proj_spending project.save(update_fields=['current_spending']) # 记录项目级月度快照 SpendingRecord.objects.update_or_create( iam_user=user, project_name=project.project_name, bill_period=bill_period, defaults={'amount': proj_spending}, ) total_spending += proj_spending # 更新子账号总消费 # 累计消费 = 所有月份的所有开启监测项目的消费之和 all_enabled_names = list(enabled_projects.values_list('project_name', flat=True)) from django.db.models import Sum cumulative = SpendingRecord.objects.filter( iam_user=user, project_name__in=all_enabled_names, ).aggregate(total=Sum('amount'))['total'] or Decimal('0') user.consumed_total = cumulative user.spending_updated_at = timezone.now() quota = user.allocated_quota if not quota or quota <= 0: user.save(update_fields=['consumed_total', 'spending_updated_at']) continue usage_percent = float(cumulative) / float(quota) * 100 triggered = user.triggered_alerts or [] # --- 阶梯式告警 --- for step in user.get_alert_thresholds(): if usage_percent >= step and step not in triggered: triggered.append(step) threshold_amount = Decimal(str(quota)) * step / 100 # 构建项目明细 detail_lines = "\n".join( f" {p.project_name}: ¥{p.current_spending}" for p in enabled_projects ) AlertRecord.objects.create( iam_user=user, alert_type=AlertRecord.AlertType.WARNING, title=f"{user.username} 消费达到额度 {step}%", content=( f"累计消费 ¥{cumulative:.2f}," f"已划拨额度 ¥{quota:.2f} 的 {step}%\n" f"剩余额度: ¥{user.remaining_quota:.2f}\n" f"项目明细:\n{detail_lines}" ), spending_amount=cumulative, threshold_amount=threshold_amount, notified=True, ) send_feishu_alert( webhook, f"⚠️ {user.username} 消费达到额度 {step}%", f"**用户**: {user.username}\n" f"**累计消费**: ¥{cumulative:.2f}\n" f"**已划拨额度**: ¥{quota:.2f}\n" f"**剩余额度**: ¥{user.remaining_quota:.2f}\n" f"**使用率**: {usage_percent:.1f}%\n" f"**监测项目数**: {enabled_projects.count()}\n" f"**项目明细**:\n{detail_lines}", template="orange" if step < 90 else "red", ) # --- 额度用尽,自动停用 --- if (usage_percent >= 100 and user.auto_disable_enabled and 100 not in triggered): triggered.append(100) try: iam_svc.disable_user(user.username) user.status = IAMUser.Status.DISABLED except Exception as e: logger.error(f"停用用户 {user.username} 失败: {e}") detail_lines = "\n".join( f" {p.project_name}: ¥{p.current_spending}" for p in enabled_projects ) AlertRecord.objects.create( iam_user=user, alert_type=AlertRecord.AlertType.DISABLE, title=f"{user.username} 额度用尽,已自动停用", content=( f"累计消费 ¥{cumulative:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n" f"项目明细:\n{detail_lines}\n" f"如需继续使用,请划拨新额度后恢复账号。" ), spending_amount=cumulative, threshold_amount=quota, notified=True, ) send_feishu_alert( webhook, f"🚨 {user.username} 额度用尽,已自动停用", f"**用户**: {user.username}\n" f"**累计消费**: ¥{cumulative:.2f}\n" f"**已划拨额度**: ¥{quota:.2f}\n" f"**项目明细**:\n{detail_lines}\n" f"额度已用尽,账号已自动停用。\n" f"请在 AirGate 划拨新额度后恢复。", template="red", ) user.triggered_alerts = triggered user.save(update_fields=[ 'consumed_total', 'spending_updated_at', 'triggered_alerts', 'status', ]) except Exception as e: logger.error(f"检查用户 {user.username} 消费失败: {e}") def start_scheduler(): """启动定时任务""" global _scheduler_started if _scheduler_started: return _scheduler_started = True try: from apscheduler.schedulers.background import BackgroundScheduler from apps.monitor.models import GlobalConfig scheduler = BackgroundScheduler() config = GlobalConfig.get_solo() interval = config.monitor_interval_seconds or 3600 scheduler.add_job(check_spending, 'interval', seconds=interval, id='check_spending', replace_existing=True) scheduler.start() logger.info(f"消费监控定时任务已启动,间隔 {interval} 秒") except Exception as e: logger.error(f"启动定时任务失败: {e}")