import ipaddress
import uuid

from django.contrib.auth.models import AbstractUser
from django.db import models


class Team(models.Model):
    """Team model - the core unit of quota management."""

    name = models.CharField(max_length=100, unique=True, verbose_name='团队名称')
    # Quota pool and consumption are both tracked in seconds.
    total_seconds_pool = models.BigIntegerField(default=0, verbose_name='总额度池(秒)')
    total_seconds_used = models.FloatField(default=0, verbose_name='已消耗总秒数')
    monthly_seconds_limit = models.IntegerField(default=6000, verbose_name='每月消费上限(秒)')
    daily_member_limit_default = models.IntegerField(default=600, verbose_name='新成员默认每日限额(秒)')
    is_active = models.BooleanField(default=True, verbose_name='启用状态')
    # Comma-separated list of expected login cities (free-form text).
    expected_regions = models.CharField(
        max_length=500, blank=True, default='', verbose_name='预期登录城市(逗号分隔)',
    )
    # Source of the disable action; empty by default.
    # NOTE(review): the set of allowed values is not defined in this file.
    disabled_by = models.CharField(max_length=10, blank=True, default='', verbose_name='禁用来源')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')

    class Meta:
        verbose_name = '团队'
        verbose_name_plural = '团队'

    def __str__(self):
        return self.name

    @property
    def remaining_seconds(self):
        """Return the pool size minus the seconds already used.

        The result is not clamped, so it is negative when usage exceeds
        the pool, and may be fractional because usage is a float field.
        """
        return self.total_seconds_pool - self.total_seconds_used


class User(AbstractUser):
    """Extended user model — Phase 5: team-based quota."""

    email = models.EmailField(unique=True, verbose_name='邮箱')
    # Deleting a team keeps its users and clears their team reference.
    team = models.ForeignKey(
        Team,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='members',
        verbose_name='所属团队',
    )
    is_team_admin = models.BooleanField(default=False, verbose_name='团队管理员')
    daily_seconds_limit = models.IntegerField(default=600, verbose_name='每日秒数上限')
    monthly_seconds_limit = models.IntegerField(default=6000, verbose_name='每月秒数上限')
    must_change_password = models.BooleanField(default=True, verbose_name='必须修改密码')
    # Source of the disable action; empty by default.
    disabled_by = models.CharField(max_length=10, blank=True, default='', verbose_name='禁用来源')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')

    class Meta:
        verbose_name = '用户'
        verbose_name_plural = '用户'

    def __str__(self):
        return self.username

    @property
    def role(self):
        """Return the role name derived from staff/team flags.

        Returns:
            'super_admin' for a staff user without a team,
            'team_admin' for a team administrator that belongs to a team,
            'member' in every other case.
        """
        if self.is_staff and self.team is None:
            return 'super_admin'
        if self.is_team_admin and self.team is not None:
            return 'team_admin'
        return 'member'


class AdminAuditLog(models.Model):
    """Audit log of administrator operations."""

    ACTION_CHOICES = [
        ('team_create', '创建团队'),
        ('team_update', '更新团队'),
        ('team_topup', '团队充值'),
        ('team_set_pool', '设置团队额度池'),
        ('team_create_admin', '创建团队管理员'),
        ('user_create', '创建用户'),
        ('user_quota_update', '更新用户额度'),
        ('user_status_toggle', '切换用户状态'),
        ('settings_update', '更新系统设置'),
        ('member_create', '创建团队成员'),
        ('member_quota_update', '更新成员额度'),
        ('member_status_toggle', '切换成员状态'),
        ('user_password_reset', '重置用户密码'),
    ]

    # The log row survives deletion of the operator; operator_name keeps
    # the username as plain text for that case.
    operator = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='audit_logs',
        verbose_name='操作人',
    )
    operator_name = models.CharField(max_length=150, verbose_name='操作人用户名')
    action = models.CharField(max_length=30, choices=ACTION_CHOICES, verbose_name='操作类型')
    target_type = models.CharField(max_length=20, verbose_name='目标类型')
    target_id = models.IntegerField(null=True, blank=True, verbose_name='目标ID')
    target_name = models.CharField(max_length=200, blank=True, default='', verbose_name='目标名称')
    # JSON snapshots of the target before and after the change.
    before = models.JSONField(null=True, blank=True, verbose_name='变更前')
    after = models.JSONField(null=True, blank=True, verbose_name='变更后')
    ip_address = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP地址')
    created_at = models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='操作时间')

    class Meta:
        verbose_name = '审计日志'
        verbose_name_plural = '审计日志'
        ordering = ['-created_at']

    def __str__(self):
        return f'{self.operator_name} - {self.get_action_display()} - {self.target_name}'


class ActiveSession(models.Model):
    """Active session - used to limit concurrently logged-in devices."""

    DEVICE_TYPE_CHOICES = [
        ('desktop', '桌面端'),
        ('mobile', '移动端'),
        ('unknown', '未知'),
    ]

    user = models.ForeignKey(
        User, on_delete=models.CASCADE, related_name='active_sessions', verbose_name='用户',
    )
    session_id = models.UUIDField(
        default=uuid.uuid4, unique=True, db_index=True, verbose_name='会话ID',
    )
    device_type = models.CharField(
        max_length=10, choices=DEVICE_TYPE_CHOICES, default='unknown', verbose_name='设备类型',
    )
    user_agent = models.TextField(blank=True, default='', verbose_name='User-Agent')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    class Meta:
        verbose_name = '活跃会话'
        verbose_name_plural = '活跃会话'
        # Oldest session first.
        ordering = ['created_at']

    def __str__(self):
        return f'{self.user.username} - {self.device_type} - {self.session_id}'


class LoginRecord(models.Model):
    """Login record with IP geolocation, consumed by anomaly detection."""

    user = models.ForeignKey(
        User, on_delete=models.CASCADE, related_name='login_records', verbose_name='用户',
    )
    team = models.ForeignKey(
        Team,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='login_records',
        verbose_name='所属团队',
    )
    ip_address = models.GenericIPAddressField(null=True, blank=True, verbose_name='IP地址')
    user_agent = models.TextField(blank=True, default='', verbose_name='User-Agent')
    geo_country = models.CharField(max_length=50, blank=True, default='', verbose_name='国家')
    geo_province = models.CharField(max_length=50, blank=True, default='', verbose_name='省份')
    geo_city = models.CharField(max_length=50, blank=True, default='', verbose_name='城市')
    # Where the geolocation data came from.
    # NOTE(review): allowed values are not defined in this file.
    geo_source = models.CharField(max_length=10, blank=True, default='', verbose_name='归属地来源')
    created_at = models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='登录时间')

    class Meta:
        verbose_name = '登录记录'
        verbose_name_plural = '登录记录'
        ordering = ['-created_at']

    def __str__(self):
        return f'{self.user.username} - {self.ip_address} - {self.geo_city} - {self.created_at}'


class TeamAnomalyConfig(models.Model):
    """Per-team anomaly-detection thresholds.

    Every field is nullable; per the original author's note, fields left
    unset fall back to the global defaults (that fallback is implemented
    outside this file).
    """

    # NOTE(review): R1..R5 presumably correspond, in order, to the five
    # entries of LoginAnomaly.RULE_CHOICES - confirm against the detector.
    team = models.OneToOneField(
        Team, on_delete=models.CASCADE, related_name='anomaly_config', verbose_name='团队',
    )
    r1_enabled = models.BooleanField(null=True, blank=True, verbose_name='R1 开关')
    r2_enabled = models.BooleanField(null=True, blank=True, verbose_name='R2 开关')
    r2_window_seconds = models.IntegerField(null=True, blank=True, verbose_name='R2 时间窗口(秒)')
    r3_enabled = models.BooleanField(null=True, blank=True, verbose_name='R3 开关')
    r3_window_seconds = models.IntegerField(null=True, blank=True, verbose_name='R3 时间窗口(秒)')
    r3_max_count = models.IntegerField(null=True, blank=True, verbose_name='R3 最大登录次数')
    r4_enabled = models.BooleanField(null=True, blank=True, verbose_name='R4 开关')
    r4_window_seconds = models.IntegerField(null=True, blank=True, verbose_name='R4 时间窗口(秒)')
    r4_city_count = models.IntegerField(null=True, blank=True, verbose_name='R4 预期外城市数阈值')
    r5_enabled = models.BooleanField(null=True, blank=True, verbose_name='R5 开关')
    r5_days = models.IntegerField(null=True, blank=True, verbose_name='R5 统计天数')
    r5_country_count = models.IntegerField(null=True, blank=True, verbose_name='R5 海外国家数阈值')

    class Meta:
        verbose_name = '团队异常检测配置'
        verbose_name_plural = '团队异常检测配置'

    def __str__(self):
        return f'{self.team.name} 异常检测配置'


class LoginAnomaly(models.Model):
    """A detected login anomaly."""

    LEVEL_CHOICES = [
        ('warning', '警告'),
        ('critical', '严重'),
    ]
    RULE_CHOICES = [
        ('region_mismatch', '登录地区不对'),
        ('impossible_travel', '不可能的旅行'),
        ('login_frequency', '登录太频繁'),
        ('multi_city', '团队遍地开花'),
        ('overseas_ip_diversity', '海外IP太杂'),
    ]

    team = models.ForeignKey(
        Team, on_delete=models.CASCADE, related_name='login_anomalies', verbose_name='团队',
    )
    user = models.ForeignKey(
        User, on_delete=models.CASCADE, related_name='login_anomalies', verbose_name='用户',
    )
    # The login that triggered this anomaly.
    login_record = models.ForeignKey(
        LoginRecord, on_delete=models.CASCADE, related_name='anomalies', verbose_name='触发登录记录',
    )
    level = models.CharField(max_length=10, choices=LEVEL_CHOICES, verbose_name='严重程度')
    rule = models.CharField(max_length=30, choices=RULE_CHOICES, verbose_name='触发规则')
    detail = models.JSONField(default=dict, verbose_name='详情')
    alerted = models.BooleanField(default=False, verbose_name='已发告警')
    auto_disabled = models.BooleanField(default=False, verbose_name='已自动封禁')
    # What was disabled as a result; empty by default.
    # NOTE(review): allowed values are not defined in this file.
    disabled_target = models.CharField(
        max_length=10, blank=True, default='', verbose_name='封禁对象',
    )
    created_at = models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='创建时间')

    class Meta:
        verbose_name = '登录异常'
        verbose_name_plural = '登录异常'
        ordering = ['-created_at']

    def __str__(self):
        return f'{self.team.name} - {self.get_rule_display()} - {self.get_level_display()}'


def get_client_ip(request):
    """Extract the client IP address from a request.

    The first entry of the ``X-Forwarded-For`` header is used when it is a
    syntactically valid IPv4/IPv6 address; otherwise ``REMOTE_ADDR`` is
    returned.

    Args:
        request: An object exposing a ``META`` mapping of request headers.

    Returns:
        The client IP as a string, or whatever ``META.get('REMOTE_ADDR')``
        yields (possibly ``None``) when no usable forwarded address exists.
    """
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR', '')
    candidate = forwarded.split(',')[0].strip()
    if candidate:
        # The header is client-controlled and the result is stored in
        # GenericIPAddressField columns through objects.create(), which does
        # not run field validators - so reject anything that is not an IP
        # (e.g. "unknown" or "1.2.3.4:5678") instead of persisting it.
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            candidate = ''
    # SECURITY NOTE(review): X-Forwarded-For is only trustworthy when set by
    # a trusted reverse proxy; confirm the deployment strips client values.
    return candidate or request.META.get('REMOTE_ADDR')


def parse_device_type(user_agent):
    """Classify a User-Agent string as 'mobile', 'desktop' or 'unknown'.

    Args:
        user_agent: The raw User-Agent header value; may be ``None`` or empty.

    Returns:
        'mobile' when the lower-cased string contains a mobile keyword
        (tablets such as iPad count as mobile), 'desktop' for any other
        non-empty string, and 'unknown' for an empty/missing value.
    """
    ua_lower = (user_agent or '').lower()
    mobile_keywords = ['iphone', 'ipad', 'android', 'mobile', 'ipod', 'windows phone']
    if any(kw in ua_lower for kw in mobile_keywords):
        return 'mobile'
    if ua_lower:
        return 'desktop'
    return 'unknown'


def log_admin_action(request, action, target_type, target_id=None, target_name='',
                     before=None, after=None):
    """Record an administrator operation in the audit log.

    Args:
        request: The current request; ``request.user`` is stored as the
            operator and the client IP is derived from ``request.META``.
        action: Action code; the model declares ``ACTION_CHOICES`` for it.
        target_type: Kind of object that was acted upon.
        target_id: Optional integer id of the target.
        target_name: Optional display name of the target.
        before: Optional JSON-serializable snapshot before the change.
        after: Optional JSON-serializable snapshot after the change.
    """
    ip = get_client_ip(request)
    AdminAuditLog.objects.create(
        operator=request.user,
        # Stored separately so the name survives deletion of the user row.
        operator_name=request.user.username,
        action=action,
        target_type=target_type,
        target_id=target_id,
        target_name=target_name,
        before=before,
        after=after,
        ip_address=ip,
    )