"""Scheduled spending monitor -- allocated-quota model with stepped alerts."""
import logging
from decimal import Decimal

from django.utils import timezone

logger = logging.getLogger(__name__)

# Set once start_scheduler() has been entered; reset if start-up fails.
_scheduler_started = False


def _warn_on_thresholds(user, total, quota, usage_percent, triggered, webhook):
    """Record and send one warning per newly reached alert threshold.

    Thresholds come from ``user.get_alert_thresholds()`` and are compared
    against ``usage_percent`` as percentages of ``quota``. Every step that
    fires is appended to ``triggered`` (mutated in place) so it is not
    reported again on a later run.
    """
    from apps.monitor.models import AlertRecord
    from utils.feishu import send_feishu_alert

    for step in user.get_alert_thresholds():
        if usage_percent < step or step in triggered:
            continue
        triggered.append(step)
        threshold_amount = Decimal(str(quota)) * step / 100
        AlertRecord.objects.create(
            iam_user=user,
            alert_type=AlertRecord.AlertType.WARNING,
            title=f"{user.username} 消费达到额度 {step}%",
            content=(
                f"累计消费 ¥{total:.2f},"
                f"已划拨额度 ¥{quota:.2f} 的 {step}%\n"
                f"剩余额度: ¥{user.remaining_quota:.2f}"
            ),
            spending_amount=total,
            threshold_amount=threshold_amount,
            notified=True,
        )
        send_feishu_alert(
            webhook,
            f"⚠️ {user.username} 消费达到额度 {step}%",
            f"**用户**: {user.username}\n"
            f"**累计消费**: ¥{total:.2f}\n"
            f"**已划拨额度**: ¥{quota:.2f}\n"
            f"**剩余额度**: ¥{user.remaining_quota:.2f}\n"
            f"**使用率**: {usage_percent:.1f}%",
            template="orange" if step < 90 else "red",
        )


def _auto_disable(user, iam_svc, total, quota, webhook):
    """Try to disable an over-quota user and report what actually happened.

    On success ``user.status`` is set to DISABLED (the caller persists it).
    On failure the status is left untouched and the alert says that the
    automatic disable failed, instead of claiming the account was disabled.
    """
    from apps.monitor.models import AlertRecord, IAMUser
    from utils.feishu import send_feishu_alert

    try:
        iam_svc.disable_user(user.username)
    except Exception as e:
        logger.error(f"停用用户 {user.username} 失败: {e}", exc_info=True)
        disabled = False
    else:
        user.status = IAMUser.Status.DISABLED
        disabled = True

    if disabled:
        title = f"{user.username} 额度用尽,已自动停用"
        content = (
            f"累计消费 ¥{total:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n"
            f"如需继续使用,请划拨新额度后恢复账号。"
        )
        card_tail = (
            f"额度已用尽,账号已自动停用。\n"
            f"请在 AirGate 划拨新额度后恢复。"
        )
    else:
        # The account is still active: say so, and ask for manual action.
        title = f"{user.username} 额度用尽,自动停用失败"
        content = (
            f"累计消费 ¥{total:.2f},已划拨额度 ¥{quota:.2f} 已用尽。\n"
            f"自动停用失败,请手动停用该账号。"
        )
        card_tail = (
            f"额度已用尽,但自动停用失败。\n"
            f"请手动停用该账号。"
        )

    AlertRecord.objects.create(
        iam_user=user,
        alert_type=AlertRecord.AlertType.DISABLE,
        title=title,
        content=content,
        spending_amount=total,
        threshold_amount=quota,
        notified=True,
    )
    send_feishu_alert(
        webhook,
        f"🚨 {title}",
        f"**用户**: {user.username}\n"
        f"**累计消费**: ¥{total:.2f}\n"
        f"**已划拨额度**: ¥{quota:.2f}\n"
        f"{card_tail}",
        template="red",
    )


def _check_user(user, billing, iam_svc, webhook):
    """Refresh one user's spending and fire any alerts that are now due.

    Steps: query the current month's spending for the user's project, upsert
    the monthly SpendingRecord, sum all of the user's records into
    ``consumed_total``, then compare that total against ``allocated_quota``.
    Users without a positive quota only get their totals saved.

    Exceptions propagate to the caller, which logs them per user.
    """
    from django.db.models import Sum
    from apps.monitor.models import SpendingRecord

    # Current month's spending, filtered by project when one is set.
    bill_period = timezone.now().strftime("%Y-%m")
    spending = billing.get_spending_by_project(
        bill_period, user.project_name or None
    )

    # Monthly snapshot: one row per (user, bill_period).
    SpendingRecord.objects.update_or_create(
        iam_user=user,
        bill_period=bill_period,
        defaults={'amount': spending},
    )

    # Cumulative spending = sum over every recorded month.
    total = SpendingRecord.objects.filter(
        iam_user=user
    ).aggregate(total=Sum('amount'))['total'] or Decimal('0')

    user.consumed_total = total
    user.spending_updated_at = timezone.now()

    quota = user.allocated_quota
    if not quota or quota <= 0:
        user.save(update_fields=['consumed_total', 'spending_updated_at'])
        return

    usage_percent = float(total) / float(quota) * 100
    triggered = list(user.triggered_alerts or [])

    # Decide on auto-disable BEFORE the warning loop. The loop records 100 in
    # `triggered` when 100 is one of the configured thresholds; checking
    # afterwards would then skip the disable step for good.
    disable_due = (
        usage_percent >= 100
        and user.auto_disable_enabled
        and 100 not in triggered
    )

    # --- Stepped warnings ---
    _warn_on_thresholds(user, total, quota, usage_percent, triggered, webhook)

    # --- Quota exhausted: auto-disable ---
    if disable_due:
        # NOTE(review): the 100 marker is recorded even when the disable call
        # fails, so a failed disable is not retried automatically; the alert
        # asks for manual action instead. Confirm whether a retry is wanted.
        if 100 not in triggered:
            triggered.append(100)
        _auto_disable(user, iam_svc, total, quota, webhook)

    user.triggered_alerts = triggered
    user.save(update_fields=[
        'consumed_total', 'spending_updated_at',
        'triggered_alerts', 'status',
    ])


def check_spending():
    """Check every monitored sub-account's spending against its quota.

    For each active VolcAccount with usable credentials, every IAMUser that
    has monitoring enabled and is not already DISABLED is processed. A
    failure for one user is logged and does not stop the remaining users.
    """
    # Imported lazily, presumably so this module can be imported before the
    # Django app registry is ready -- TODO confirm.
    from apps.monitor.models import VolcAccount, IAMUser, GlobalConfig
    from utils.crypto import decrypt
    from utils.billing_service import BillingService
    from utils.iam_service import IAMService

    webhook = GlobalConfig.get_solo().feishu_webhook_url

    for volc_account in VolcAccount.objects.filter(is_active=True):
        ak = decrypt(volc_account.access_key_enc)
        sk = decrypt(volc_account.secret_key_enc)
        if not ak or not sk:
            logger.warning(f"主账号 {volc_account.name} 密钥为空,跳过")
            continue

        billing = BillingService(ak, sk)
        iam_svc = IAMService(ak, sk)

        users = IAMUser.objects.filter(
            volc_account=volc_account,
            monitor_enabled=True,
        ).exclude(status=IAMUser.Status.DISABLED)

        for user in users:
            try:
                _check_user(user, billing, iam_svc, webhook)
            except Exception as e:
                # Per-user boundary: keep going, but keep the traceback.
                logger.error(
                    f"检查用户 {user.username} 消费失败: {e}", exc_info=True
                )


def start_scheduler():
    """Start the background job that runs check_spending() periodically.

    The interval is GlobalConfig.monitor_interval_seconds, falling back to
    3600 seconds when that value is falsy. Repeated calls are no-ops once
    start-up has succeeded; if start-up fails the error is logged and the
    guard flag is cleared so a later call can try again.
    """
    global _scheduler_started
    if _scheduler_started:
        return
    _scheduler_started = True

    try:
        from apscheduler.schedulers.background import BackgroundScheduler
        from apps.monitor.models import GlobalConfig

        scheduler = BackgroundScheduler()
        config = GlobalConfig.get_solo()
        interval = config.monitor_interval_seconds or 3600
        scheduler.add_job(
            check_spending, 'interval', seconds=interval,
            id='check_spending', replace_existing=True,
        )
        scheduler.start()
        logger.info(f"消费监控定时任务已启动,间隔 {interval} 秒")
    except Exception as e:
        # Nothing is running, so do not leave the guard set.
        _scheduler_started = False
        logger.error(f"启动定时任务失败: {e}", exc_info=True)