From f26e78c545a35d8a95648eb39410c738174b791a Mon Sep 17 00:00:00 2001 From: pmc <740076875@qq.com> Date: Thu, 14 May 2026 09:35:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(affinity-P2):=20service=20=E5=B1=82?= =?UTF-8?q?=E8=90=BD=E5=9C=B0=20=E2=80=94=20=E5=94=AF=E4=B8=80=E5=86=99?= =?UTF-8?q?=E5=85=A5=E5=85=A5=E5=8F=A3=20+=20Redis=20=E8=AE=A1=E6=95=B0?= =?UTF-8?q?=E5=99=A8=20+=20=E7=AD=89=E7=BA=A7=E6=98=A0=E5=B0=84=20+=20?= =?UTF-8?q?=E8=B7=A8=E7=BA=A7=E5=A5=96=E5=8A=B1=20+=20WS=20=E6=8E=A8?= =?UTF-8?q?=E9=80=81=20(P2-01~P2-05)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口: - counters.py (P2-02):Redis 三类计数器 - affinity:cd:{device_id}:{rule_key} 冷却 - affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限 - affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限 - 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算 - cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h - event_id 60s 去重防客户端重复上报 - levels.py (P2-03):等级映射 - map_value_to_level / update_device_level / progress_to_next_level - update_device_level 仅 level 变化时 save(update_fields=['affinity_level']) - ws.py (P2-05):WebSocket 推送 helper - 3 类事件 affinity_update / level_up / level_down - asgiref.async_to_sync 包装 channel_layer.group_send - 推送故障 fire-and-forget 仅日志记录,不阻塞主流程 - rewards.py (P2-04):跨级奖励发放(A3 方案 B) - grant_levels(user_device, from_level, to_level) 逐级独立事务 - UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发) - _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现 - services.py (P2-01):AffinityService 主入口 - apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason) - 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送] - admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS - 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error) - permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查 Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。 AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。 设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。 Co-Authored-By: Claude Opus 4.7 --- qy_lty/userapp/affinity/counters.py | 147 +++++++++ qy_lty/userapp/affinity/levels.py | 87 +++++ qy_lty/userapp/affinity/permissions.py | 16 + qy_lty/userapp/affinity/rewards.py | 131 ++++++++ qy_lty/userapp/affinity/services.py | 428 +++++++++++++++++++++++++ qy_lty/userapp/affinity/ws.py | 89 +++++ 6 files changed, 898 insertions(+) create mode 100644 qy_lty/userapp/affinity/counters.py create mode 100644 qy_lty/userapp/affinity/levels.py create mode 100644 qy_lty/userapp/affinity/permissions.py create mode 100644 qy_lty/userapp/affinity/rewards.py create mode 100644 qy_lty/userapp/affinity/services.py create mode 100644 qy_lty/userapp/affinity/ws.py diff --git a/qy_lty/userapp/affinity/counters.py b/qy_lty/userapp/affinity/counters.py new file mode 100644 index 0000000..e5747f6 --- /dev/null +++ b/qy_lty/userapp/affinity/counters.py @@ -0,0 +1,147 @@ +"""P2-02 Redis 计数器工具 + +封装三类计数器与冷却判断,统一用 django-redis (`django.core.cache`) 接入。 + +Key 命名约定(与设计文档 §4.3 触发流程一致): + affinity:cd:{device_id}:{rule_key} — 冷却(值任意,存在即冷却中) + affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} — 单规则单设备日累计(绝对值) + affinity:daily:{device_id}:_global:{YYYYMMDD} — 全局正向日累计(仅正向规则) + +自然日基准:`AffinitySetting.timezone`(默认 Asia/Shanghai),全用户统一。 +TTL 策略:日计数器 TTL=48h(远超 1 自然日边界,老 key 自然过期);冷却 TTL=规则 cooldown_seconds。 +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Optional + +from django.core.cache import cache +from django.utils import timezone + +try: + from zoneinfo import ZoneInfo # Python 3.9+ +except ImportError: # pragma: no cover + from backports.zoneinfo import ZoneInfo # type: ignore + +# 日计数器 TTL — 48 小时足以覆盖最长时区偏移和单次"跨天延迟"场景 +DAILY_COUNTER_TTL = 48 * 3600 + + +# ---------- 时区与日期 ---------- + +def get_setting_timezone() -> str: + """读取 AffinitySetting.timezone(容错:表不存在或单例缺失时回退 Asia/Shanghai)""" + try: + from userapp.models import AffinitySetting + return AffinitySetting.get_solo().timezone or 'Asia/Shanghai' + except Exception: # AppConfig 启动期 / 测试中表未建好等场景 + return 'Asia/Shanghai' + + +def local_today_str(tz_name: Optional[str] = None) -> str: + """返回基于 AffinitySetting.timezone 的"今日"日期字符串 YYYYMMDD""" + tz_name = tz_name or get_setting_timezone() + now_utc = timezone.now() + local_now = now_utc.astimezone(ZoneInfo(tz_name)) + return local_now.strftime('%Y%m%d') + + +def local_today_date(tz_name: Optional[str] = None): + """返回 AffinitySetting.timezone 的"今日" date 对象(供 UserAffinityDailyCounter.date 用)""" + tz_name = tz_name or get_setting_timezone() + now_utc = timezone.now() + return now_utc.astimezone(ZoneInfo(tz_name)).date() + + +# ---------- 冷却 ---------- + +def cooldown_key(device_id: int, rule_key: str) -> str: + return f'affinity:cd:{device_id}:{rule_key}' + + +def is_in_cooldown(device_id: int, rule_key: str) -> bool: + """返回 True 表示尚未过冷却,本次触发应拒绝""" + return cache.get(cooldown_key(device_id, rule_key)) is not None + + +def set_cooldown(device_id: int, rule_key: str, seconds: int) -> None: + """设置冷却。seconds<=0 时不做任何事(视为无冷却规则)""" + if seconds <= 0: + return + cache.set(cooldown_key(device_id, rule_key), 1, timeout=seconds) + + +# ---------- 单规则日计数器 ---------- + +def rule_daily_key(device_id: int, rule_key: str, date_str: Optional[str] = None) -> str: + return f'affinity:daily:{device_id}:{rule_key}:{date_str or local_today_str()}' + + +def get_rule_daily(device_id: int, rule_key: str) -> int: + """读取本规则本设备今日累计(绝对值累加,不区分正负)""" + val = cache.get(rule_daily_key(device_id, rule_key)) + return int(val) if val is not None else 0 + + +def incr_rule_daily(device_id: int, rule_key: str, delta_abs: int) -> int: + """原子累加本规则日计数器,返回累加后的值。delta_abs 必须 > 0。 + + 首次写入时通过 cache.add 设置 TTL;之后用 cache.incr 原子累加,TTL 不变。 + """ + if delta_abs <= 0: + raise ValueError(f'delta_abs must be > 0, got {delta_abs}') + key = rule_daily_key(device_id, rule_key) + if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL): + return delta_abs + try: + return cache.incr(key, delta_abs) + except ValueError: + # cache.add 写入后 race-condition 失败极端情况兜底 + cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL) + return delta_abs + + +# ---------- 全局日计数器(仅正向汇总)---------- + +def global_daily_key(device_id: int, date_str: Optional[str] = None) -> str: + return f'affinity:daily:{device_id}:_global:{date_str or local_today_str()}' + + +def get_global_daily(device_id: int) -> int: + """读取本设备今日全局正向好感度累计(跨规则汇总)""" + val = cache.get(global_daily_key(device_id)) + return int(val) if val is not None else 0 + + +def incr_global_daily(device_id: int, delta_abs: int) -> int: + """原子累加全局日计数器(仅在正向变化时调用)。返回累加后的值。""" + if delta_abs <= 0: + raise ValueError(f'delta_abs must be > 0, got {delta_abs}') + key = global_daily_key(device_id) + if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL): + return delta_abs + try: + return cache.incr(key, delta_abs) + except ValueError: + cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL) + return delta_abs + + +# ---------- 事件去重(event_id 60s 缓存)---------- + +def event_seen_key(event_id: str) -> str: + return f'affinity:event:{event_id}' + + +def event_already_processed(event_id: str) -> bool: + """若 event_id 已在 60s 内处理过,返回 True""" + if not event_id: + return False + return cache.get(event_seen_key(event_id)) is not None + + +def mark_event_processed(event_id: str, ttl_seconds: int = 60) -> None: + if not event_id: + return + cache.set(event_seen_key(event_id), 1, timeout=ttl_seconds) diff --git a/qy_lty/userapp/affinity/levels.py b/qy_lty/userapp/affinity/levels.py new file mode 100644 index 0000000..b12c76c --- /dev/null +++ b/qy_lty/userapp/affinity/levels.py @@ -0,0 +1,87 @@ +"""P2-03 等级映射 + UserDevice.affinity_level 缓存更新 + +根据好感度数值映射到对应等级(AffinityLevel),并把结果写回 UserDevice.affinity_level +作为缓存。 + +设计依据:「好感度系统功能与规则设计.md」§6.3 等级变化规则 +- 等级由好感度区间自动映射,每台设备独立判定 +- 跨级判定:每次好感度变动后,取当前值所属区间,与上一次等级比较 +""" + +from __future__ import annotations + +from typing import Optional, Tuple + +from userapp.models import AffinityLevel + + +def map_value_to_level(value: int) -> Optional[AffinityLevel]: + """根据好感度数值找出所属的 AffinityLevel。 + + 匹配规则:min_affinity <= value <= max_affinity 且 is_enabled=True 且 is_deleted=False + 返回最高 level 优先(避免重叠区间时的歧义,但 P1 已加 clean 校验拦截重叠)。 + 若没有匹配到任何区间则返回 None(理论上不应发生,因为等级区间应覆盖 [0, max_affinity])。 + """ + return ( + AffinityLevel.objects + .filter( + min_affinity__lte=value, + max_affinity__gte=value, + is_enabled=True, + is_deleted=False, + ) + .order_by('-level') + .first() + ) + + +def progress_to_next_level(value: int, current_level: AffinityLevel) -> dict: + """计算当前值在本等级区间内的进度百分比 + 到下一等级的距离。 + + 返回: + { + 'percent': 0~100 浮点(当前值在本等级区间内的位置百分比), + 'next_level': AffinityLevel 或 None, + 'points_to_next': int 或 None, + } + """ + span = max(current_level.max_affinity - current_level.min_affinity, 1) + percent = round((value - current_level.min_affinity) / span * 100, 2) + + next_level = ( + AffinityLevel.objects + .filter(level__gt=current_level.level, is_enabled=True, is_deleted=False) + .order_by('level') + .first() + ) + points_to_next = None + if next_level is not None: + points_to_next = max(next_level.min_affinity - value, 0) + + return { + 'percent': max(0.0, min(100.0, percent)), + 'next_level': next_level, + 'points_to_next': points_to_next, + } + + +def update_device_level(user_device, save: bool = True) -> Tuple[int, int, Optional[AffinityLevel]]: + """根据 user_device.favorability 重新计算并更新 affinity_level 缓存字段。 + + 返回 (old_level, new_level, matched_level_obj) + 若没匹配到任何 AffinityLevel,new_level 保持原值,matched_level_obj=None。 + + save=False 时不调 .save(),由调用方批量保存(service 层)。 + """ + old_level = user_device.affinity_level + matched = map_value_to_level(user_device.favorability) + if matched is None: + return old_level, old_level, None + + new_level = matched.level + if new_level != old_level: + user_device.affinity_level = new_level + if save: + user_device.save(update_fields=['affinity_level']) + + return old_level, new_level, matched diff --git a/qy_lty/userapp/affinity/permissions.py b/qy_lty/userapp/affinity/permissions.py new file mode 100644 index 0000000..c1c5045 --- /dev/null +++ b/qy_lty/userapp/affinity/permissions.py @@ -0,0 +1,16 @@ +"""Admin 接口权限:要求已登录 + is_staff""" + +from rest_framework.permissions import IsAuthenticated + + +class IsAdminUserStaff(IsAuthenticated): + """已登录用户 + is_staff=True。 + + 沿用项目既有 ViewSet 的 get_permissions() 中 `request.user.is_staff` 检查惯例, + 封装成可复用的 permission 类。 + """ + + def has_permission(self, request, view): + if not super().has_permission(request, view): + return False + return bool(request.user and request.user.is_authenticated and request.user.is_staff) diff --git a/qy_lty/userapp/affinity/rewards.py b/qy_lty/userapp/affinity/rewards.py new file mode 100644 index 0000000..740ec0e --- /dev/null +++ b/qy_lty/userapp/affinity/rewards.py @@ -0,0 +1,131 @@ +"""P2-04 跨级奖励发放 + +A3 决策(方案 B):升级时**逐级独立事务**发放奖励,失败的项收集起来供调用方 +后续重试/补偿,**已成功的级别不会因后续级别失败而回滚**(这是与方案 A 整体事务 +的最大区别)。 + +发放幂等通过 UserLevelRewardGrant(device, level) 唯一约束保证: +- 重复跨过同一等级不会重发(设计文档决策 11) +- 衰减回升后再升过同等级也不重发(决策 11) + +外部副作用(虚拟货币 +、道具发放)由 _dispatch_reward_to_external_systems 占位 hook +实现 — P2 不接外部系统,先记 reward_snapshot 落库;后续接订阅/卡片/道具 app 时 +在该 hook 中调用。 +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +from django.db import IntegrityError, transaction + +from userapp.models import AffinityLevel, AffinitySetting, UserLevelRewardGrant + +logger = logging.getLogger(__name__) + + +@dataclass +class RewardGrantResult: + """单次跨级奖励发放的汇总结果""" + granted: List[Dict[str, Any]] = field(default_factory=list) # 本次新发放的奖励快照列表 + skipped_duplicate: List[int] = field(default_factory=list) # 已发过被跳过的等级号 + failed: List[Dict[str, Any]] = field(default_factory=list) # 发放失败的等级(含错误信息) + + +def _build_reward_snapshot(level_obj: AffinityLevel) -> Dict[str, Any]: + """把 AffinityLevel 当前奖励配置打成快照(防 admin 后续修改影响审计)""" + return { + 'level': level_obj.level, + 'name': level_obj.name, + 'reward_type': level_obj.reward_type, + 'reward_currency': level_obj.reward_currency, + 'reward_items': list(level_obj.reward_items or []), + 'unlock_content': level_obj.unlock_content, + } + + +def _dispatch_reward_to_external_systems(user_device, snapshot: Dict[str, Any]) -> None: + """实际把奖励发放到外部系统的 hook(虚拟货币、道具、解锁标记等)。 + + P2 阶段不接外部系统 — 仅日志记录。 + P3/P4 阶段接订阅 / 卡片 / 道具 app 时在此实现具体派发逻辑。 + """ + if snapshot['reward_currency'] > 0: + logger.info( + '[affinity.rewards] [STUB] 应给设备 %s 发放虚拟货币 %d', + user_device.id, snapshot['reward_currency'], + ) + if snapshot['reward_items']: + logger.info( + '[affinity.rewards] [STUB] 应给设备 %s 发放道具 %s', + user_device.id, snapshot['reward_items'], + ) + + +def grant_levels(user_device, from_level: int, to_level: int) -> RewardGrantResult: + """逐级独立事务发放 [from_level+1, to_level] 范围内的所有等级奖励。 + + 设计文档决策 3:升级时逐级发放经过的每一级;降级回升后不再补发(永久幂等)。 + + 参数: + user_device: UserDevice 实例 + from_level: 升级前等级 + to_level: 升级后等级(需 > from_level 才会发奖励) + 返回: + RewardGrantResult,调用方可基于此推送 WS、记日志、入重试队列等 + """ + result = RewardGrantResult() + if to_level <= from_level: + return result + + # AffinitySetting.enable_rewards=False 则只记录跨级日志,不发奖励 + if not AffinitySetting.get_solo().enable_rewards: + logger.info('[affinity.rewards] enable_rewards=False,跨级 %s→%s 不发奖励', + from_level, to_level) + return result + + # 取所有需要发放的等级(按 level 升序) + levels_to_grant = list( + AffinityLevel.objects + .filter(level__gt=from_level, level__lte=to_level, + is_enabled=True, is_deleted=False) + .order_by('level') + ) + + for level_obj in levels_to_grant: + try: + with transaction.atomic(): + snapshot = _build_reward_snapshot(level_obj) + # UniqueConstraint(device, level) 保证同一设备同一等级最多一行 + _, created = UserLevelRewardGrant.objects.get_or_create( + device=user_device, + level=level_obj.level, + defaults={ + 'device_snapshot_id': user_device.id, + 'reward_snapshot': snapshot, + }, + ) + if not created: + # 决策 11:曾经达到过就标记,不再重发 + result.skipped_duplicate.append(level_obj.level) + continue + + # 派发到外部系统(P2 是 stub)— 若 hook 抛异常会触发本 level 事务回滚 + _dispatch_reward_to_external_systems(user_device, snapshot) + result.granted.append(snapshot) + + except IntegrityError as exc: + # 并发场景下 UniqueConstraint 命中 — 等价于 already granted + result.skipped_duplicate.append(level_obj.level) + logger.info('[affinity.rewards] Lv%s 已被并发线程发放:%s', level_obj.level, exc) + except Exception as exc: + logger.exception('[affinity.rewards] Lv%s 发放失败:%s', level_obj.level, exc) + result.failed.append({ + 'level': level_obj.level, + 'error': str(exc), + 'snapshot': _build_reward_snapshot(level_obj), + }) + + return result diff --git a/qy_lty/userapp/affinity/services.py b/qy_lty/userapp/affinity/services.py new file mode 100644 index 0000000..f62c557 --- /dev/null +++ b/qy_lty/userapp/affinity/services.py @@ -0,0 +1,428 @@ +"""P2-01 好感度系统服务层 — 唯一写入入口 + +所有好感度变化(设备事件、手机事件、衰减任务、管理员调整)**必须**经由 +`AffinityService.apply()` 处理,确保单一冷却 / 日上限 / 钳位 / 日志 / 等级 +更新 / 奖励派发 / WS 推送的语义闭环。 + +设计文档:§4.3 触发计算流程 +""" + +from __future__ import annotations + +import logging +import random +from dataclasses import dataclass, field +from typing import Any, Dict, Optional, Tuple + +from django.db import transaction +from django.db.models import F + +from device_interaction.models import UserDevice +from userapp.models import ( + AffinityLog, + AffinityRule, + AffinitySetting, + UserAffinityDailyCounter, +) + +from . import counters as redis_counters +from . import levels as level_utils +from . import rewards as reward_utils +from . import ws + +logger = logging.getLogger(__name__) + + +# ---------- 返回类型 ---------- + +class ApplyOutcome: + """ApplyResult.outcome 的常量集合""" + APPLIED = 'applied' # 成功写入 + NOOP_NO_RULE = 'noop_no_rule' # 规则不存在或已禁用 / 软删 + NOOP_COOLDOWN = 'noop_cooldown' # 冷却中 + NOOP_RULE_DAILY_CAP = 'noop_rule_daily_cap' # 本规则今日已达上限 + NOOP_GLOBAL_DAILY_CAP = 'noop_global_daily_cap' # 全局今日已达上限 + NOOP_EVENT_DUP = 'noop_event_duplicate' # 同 event_id 60s 内已处理 + NOOP_VALUE_BOUNDARY = 'noop_value_boundary' # 当前已在边界(0 还想扣 / 上限还想加) + ERROR = 'error' # 异常 + + +@dataclass +class ApplyResult: + outcome: str + change_value: int = 0 + before_value: int = 0 + after_value: int = 0 + rule_key: str = '' + old_level: int = 0 + new_level: int = 0 + log_id: Optional[int] = None + rewards_granted: list = field(default_factory=list) + rewards_failed: list = field(default_factory=list) + error: Optional[str] = None + + @property + def is_applied(self) -> bool: + return self.outcome == ApplyOutcome.APPLIED + + +# ---------- 主入口 ---------- + +class AffinityService: + """好感度系统服务层""" + + @classmethod + def apply( + cls, + *, + user_id: int, + device_id: int, + rule_key: str, + source: str, + event_id: str = '', + metadata: Optional[Dict[str, Any]] = None, + operator_admin_id: Optional[int] = None, + reason: str = '', + ) -> ApplyResult: + """好感度变化的唯一写入入口。 + + 参数: + user_id: ParadiseUser.id(用于 AffinityLog.user 关联及 WS group 路由) + device_id: UserDevice.id(注意是绑定 ID,不是 Device.id) + rule_key: AffinityRule.rule_key(如 'chat' / 'gift' / 'decay' / 'admin') + source: AffinityLog.SOURCE_CHOICES 之一 + event_id: 客户端事件 UUID,用于 60s 内幂等去重 + metadata: 扩展上下文 JSON + operator_admin_id / reason: 管理员调整时填写 + + 返回 ApplyResult,调用方根据 outcome 决定后续动作。 + """ + # 0) event_id 幂等防重 + if event_id and redis_counters.event_already_processed(event_id): + return ApplyResult( + outcome=ApplyOutcome.NOOP_EVENT_DUP, + rule_key=rule_key, + ) + + # 1) 取规则 + rule = ( + AffinityRule.objects + .filter(rule_key=rule_key, is_enabled=True, is_deleted=False) + .first() + ) + if rule is None: + return ApplyResult(outcome=ApplyOutcome.NOOP_NO_RULE, rule_key=rule_key) + + # 2) 冷却检查 + if redis_counters.is_in_cooldown(device_id, rule_key): + return ApplyResult(outcome=ApplyOutcome.NOOP_COOLDOWN, rule_key=rule_key) + + # 3) 取设备(要求绑定有效) + try: + user_device = UserDevice.active.get(id=device_id, user_id=user_id) + except UserDevice.DoesNotExist: + logger.warning( + '[affinity.service] device %s 不属于 user %s 或已解绑,跳过', + device_id, user_id, + ) + return ApplyResult( + outcome=ApplyOutcome.ERROR, + rule_key=rule_key, + error='UserDevice not found or unbound', + ) + + # 4) 计算本次变化值([min, max] 闭区间随机;P1 CHECK 保证 min <= max) + raw_change = random.randint(rule.min_change, rule.max_change) + # single_cap 钳位(保护性 — P1 CHECK 已保证 single_cap > 0) + if raw_change > rule.single_cap: + raw_change = rule.single_cap + elif raw_change < -rule.single_cap: + raw_change = -rule.single_cap + + if raw_change == 0: + return ApplyResult( + outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY, + rule_key=rule_key, + change_value=0, + ) + + # 5) 本规则日上限检查(用绝对值累加) + abs_change = abs(raw_change) + rule_today = redis_counters.get_rule_daily(device_id, rule_key) + if rule_today + abs_change > rule.daily_cap: + # 允许部分通过:剩余空间 > 0 时按剩余空间钳位 + remain = rule.daily_cap - rule_today + if remain <= 0: + return ApplyResult( + outcome=ApplyOutcome.NOOP_RULE_DAILY_CAP, + rule_key=rule_key, + ) + # 按正负方向取剩余空间 + raw_change = remain if raw_change > 0 else -remain + abs_change = remain + + # 6) 全局日上限检查(仅正向汇总,衰减不占用) + setting = AffinitySetting.get_solo() + if raw_change > 0: + global_today = redis_counters.get_global_daily(device_id) + if global_today + abs_change > setting.global_daily_cap: + remain = setting.global_daily_cap - global_today + if remain <= 0: + return ApplyResult( + outcome=ApplyOutcome.NOOP_GLOBAL_DAILY_CAP, + rule_key=rule_key, + ) + raw_change = remain + abs_change = remain + + # 7) 原子更新 UserDevice.favorability + 钳位 + try: + with transaction.atomic(): + # 锁住该 UserDevice 行,避免并发写竞争 + ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id) + before_value = ud_locked.favorability + after_value = before_value + raw_change + # 钳位 [0, max_affinity](管理员调整也走此分支,因此天然遵守 CR-001 决策 6) + after_value = max(0, min(setting.max_affinity, after_value)) + actual_change = after_value - before_value + + if actual_change == 0: + # 边界场景:当前值已经在边界,本次实际无变化 + return ApplyResult( + outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY, + rule_key=rule_key, + before_value=before_value, + after_value=after_value, + ) + + ud_locked.favorability = after_value + # 同步刷新 last_active_at(衰减判断依赖) + from django.utils import timezone as djtz + ud_locked.last_active_at = djtz.now() + ud_locked.save(update_fields=['favorability', 'last_active_at']) + + # 8) 写 AffinityLog(含 cross-app FK 与冗余 rule_key) + log = AffinityLog.objects.create( + user_id=user_id, + device=ud_locked, + rule=rule, + rule_key=rule_key, + change_value=actual_change, + before_value=before_value, + after_value=after_value, + source=source, + event_id=event_id or None, # WR-004:空 -> NULL + operator_admin_id=operator_admin_id, + reason=reason, + metadata=metadata or {}, + ) + + # 9) 更新数据库兜底计数器(Redis 是热路径,DB 是审计兜底) + today_date = redis_counters.local_today_date(setting.timezone) + counter, _ = UserAffinityDailyCounter.objects.select_for_update().get_or_create( + device=ud_locked, rule=rule, date=today_date, + defaults={'accumulated_change': 0, 'trigger_count': 0}, + ) + counter.accumulated_change = F('accumulated_change') + actual_change + counter.trigger_count = F('trigger_count') + 1 + counter.save(update_fields=['accumulated_change', 'trigger_count']) + + # 10) 更新等级缓存(事务内做,避免半成品状态) + old_level, new_level, _matched = level_utils.update_device_level( + ud_locked, save=True, + ) + + except Exception as exc: + logger.exception( + '[affinity.service] apply 失败 user=%s device=%s rule=%s: %s', + user_id, device_id, rule_key, exc, + ) + return ApplyResult( + outcome=ApplyOutcome.ERROR, + rule_key=rule_key, + error=str(exc), + ) + + # 事务已提交,下面是非原子的副作用:Redis 计数 / 推送 / 奖励派发 + # 11) Redis 计数器累加(仅在数据真正写入后再扣冷却 / 日上限配额) + redis_counters.set_cooldown(device_id, rule_key, rule.cooldown_seconds) + redis_counters.incr_rule_daily(device_id, rule_key, abs(actual_change)) + if actual_change > 0: + redis_counters.incr_global_daily(device_id, abs(actual_change)) + if event_id: + redis_counters.mark_event_processed(event_id) + + # 12) 跨级奖励发放(A3 方案 B — 每级独立事务) + grant_result = reward_utils.RewardGrantResult() + if new_level > old_level: + grant_result = reward_utils.grant_levels(ud_locked, old_level, new_level) + + # 13) WS 推送 + if setting.enable_notify: + ws.push_affinity_update( + user_id=user_id, + device_id=device_id, + change=actual_change, + before=before_value, + after=after_value, + rule_key=rule_key, + source=source, + ) + if new_level > old_level: + ws.push_level_up( + user_id=user_id, + device_id=device_id, + old_level=old_level, + new_level=new_level, + rewards=grant_result.granted, + ) + elif new_level < old_level: + ws.push_level_down( + user_id=user_id, + device_id=device_id, + old_level=old_level, + new_level=new_level, + ) + + return ApplyResult( + outcome=ApplyOutcome.APPLIED, + change_value=actual_change, + before_value=before_value, + after_value=after_value, + rule_key=rule_key, + old_level=old_level, + new_level=new_level, + log_id=log.id, + rewards_granted=grant_result.granted, + rewards_failed=grant_result.failed, + ) + + # ---------- 管理员调整专用入口(绕过 rule,直接给数值)---------- + + @classmethod + def admin_adjust( + cls, + *, + user_id: int, + device_id: int, + delta: int, + operator_admin_id: int, + reason: str = '', + batch: bool = False, + ) -> ApplyResult: + """管理员手动调整。 + + - 不查 AffinityRule(rule=NULL,rule_key='admin') + - 不查冷却 / 日上限 + - 依然钳位 [0, max_affinity](决策 6:管理员不能突破上限) + - source=admin_adjust_single 或 admin_adjust_batch + - 仍写 AffinityLog、更新等级缓存、发奖励、推 WS + """ + from userapp.models import AffinitySetting + source = 'admin_adjust_batch' if batch else 'admin_adjust_single' + + try: + user_device = UserDevice.active.get(id=device_id, user_id=user_id) + except UserDevice.DoesNotExist: + return ApplyResult( + outcome=ApplyOutcome.ERROR, + rule_key='admin', + error='UserDevice not found or unbound', + ) + + if delta == 0: + return ApplyResult( + outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY, + rule_key='admin', + ) + + setting = AffinitySetting.get_solo() + + try: + with transaction.atomic(): + ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id) + before_value = ud_locked.favorability + after_value = max(0, min(setting.max_affinity, before_value + delta)) + actual_change = after_value - before_value + + if actual_change == 0: + return ApplyResult( + outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY, + rule_key='admin', + before_value=before_value, + after_value=after_value, + ) + + ud_locked.favorability = after_value + from django.utils import timezone as djtz + ud_locked.last_active_at = djtz.now() + ud_locked.save(update_fields=['favorability', 'last_active_at']) + + log = AffinityLog.objects.create( + user_id=user_id, + device=ud_locked, + rule=None, + rule_key='admin', + change_value=actual_change, + before_value=before_value, + after_value=after_value, + source=source, + operator_admin_id=operator_admin_id, + reason=reason, + metadata={'requested_delta': delta}, + ) + + old_level, new_level, _matched = level_utils.update_device_level( + ud_locked, save=True, + ) + + except Exception as exc: + logger.exception( + '[affinity.service.admin_adjust] 失败 user=%s device=%s delta=%s', + user_id, device_id, delta, + ) + return ApplyResult( + outcome=ApplyOutcome.ERROR, + rule_key='admin', + error=str(exc), + ) + + # 副作用 + grant_result = reward_utils.RewardGrantResult() + if new_level > old_level: + grant_result = reward_utils.grant_levels(ud_locked, old_level, new_level) + + if setting.enable_notify: + ws.push_affinity_update( + user_id=user_id, + device_id=device_id, + change=actual_change, + before=before_value, + after=after_value, + rule_key='admin', + source=source, + ) + if new_level > old_level: + ws.push_level_up( + user_id=user_id, device_id=device_id, + old_level=old_level, new_level=new_level, + rewards=grant_result.granted, + ) + elif new_level < old_level: + ws.push_level_down( + user_id=user_id, device_id=device_id, + old_level=old_level, new_level=new_level, + ) + + return ApplyResult( + outcome=ApplyOutcome.APPLIED, + change_value=actual_change, + before_value=before_value, + after_value=after_value, + rule_key='admin', + old_level=old_level, + new_level=new_level, + log_id=log.id, + rewards_granted=grant_result.granted, + rewards_failed=grant_result.failed, + ) diff --git a/qy_lty/userapp/affinity/ws.py b/qy_lty/userapp/affinity/ws.py new file mode 100644 index 0000000..93b8d1f --- /dev/null +++ b/qy_lty/userapp/affinity/ws.py @@ -0,0 +1,89 @@ +"""P2-05 WebSocket 推送 helper + +把好感度变化事件推到 `device_{user_id}` channel layer group, +设备端和手机端的 DeviceConsumer 都加入此分组,会同时收到(设计文档 §9.3)。 + +推送是「fire-and-forget」语义 — channel layer 故障或用户不在线时不应阻塞 service +主流程,全部用 try/except 包裹并仅日志记录。 +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +from asgiref.sync import async_to_sync +from channels.layers import get_channel_layer + +logger = logging.getLogger(__name__) + + +def _send(user_id: int, payload: Dict[str, Any]) -> None: + """内部统一推送入口。channel_layer 不可用 / group_send 异常时静默吞掉但记日志。""" + try: + channel_layer = get_channel_layer() + if channel_layer is None: + logger.warning('[affinity.ws] channel_layer 未配置,跳过推送 payload=%s', payload) + return + async_to_sync(channel_layer.group_send)(f'device_{user_id}', payload) + except Exception as exc: # pragma: no cover — 推送失败不影响业务 + logger.warning('[affinity.ws] group_send 失败 user_id=%s err=%s', user_id, exc) + + +def push_affinity_update( + user_id: int, + device_id: Optional[int], + *, + change: int, + before: int, + after: int, + rule_key: str, + source: str, +) -> None: + """好感度数值变化事件。所有触发点(设备 / 手机 / 衰减 / 管理员调整)共用此事件。""" + _send(user_id, { + 'type': 'affinity.update', # consumer 端 handler 名(消费者用 .replace('.', '_')) + 'event': 'affinity_update', + 'device_id': device_id, + 'change': change, + 'before': before, + 'after': after, + 'rule_key': rule_key, + 'source': source, + }) + + +def push_level_up( + user_id: int, + device_id: Optional[int], + *, + old_level: int, + new_level: int, + rewards: List[Dict[str, Any]], +) -> None: + """升级事件。rewards 是本次跨级一次性发放的奖励列表,每级一项。""" + _send(user_id, { + 'type': 'affinity.level.up', + 'event': 'level_up', + 'device_id': device_id, + 'old_level': old_level, + 'new_level': new_level, + 'rewards': rewards, + }) + + +def push_level_down( + user_id: int, + device_id: Optional[int], + *, + old_level: int, + new_level: int, +) -> None: + """降级事件。衰减导致的等级回退,不追回奖励但取消等级解锁内容。""" + _send(user_id, { + 'type': 'affinity.level.down', + 'event': 'level_down', + 'device_id': device_id, + 'old_level': old_level, + 'new_level': new_level, + })