新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:
- counters.py (P2-02):Redis 三类计数器
- affinity💿{device_id}:{rule_key} 冷却
- affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
- affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
- 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
- cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
- event_id 60s 去重防客户端重复上报
- levels.py (P2-03):等级映射
- map_value_to_level / update_device_level / progress_to_next_level
- update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])
- ws.py (P2-05):WebSocket 推送 helper
- 3 类事件 affinity_update / level_up / level_down
- asgiref.async_to_sync 包装 channel_layer.group_send
- 推送故障 fire-and-forget 仅日志记录,不阻塞主流程
- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
- grant_levels(user_device, from_level, to_level) 逐级独立事务
- UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
- _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现
- services.py (P2-01):AffinityService 主入口
- apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
- 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
- admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
- 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)
- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查
Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。
设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
132 lines
5.4 KiB
Python
132 lines
5.4 KiB
Python
"""P2-04 跨级奖励发放
|
||
|
||
A3 决策(方案 B):升级时**逐级独立事务**发放奖励,失败的项收集起来供调用方
|
||
后续重试/补偿,**已成功的级别不会因后续级别失败而回滚**(这是与方案 A 整体事务
|
||
的最大区别)。
|
||
|
||
发放幂等通过 UserLevelRewardGrant(device, level) 唯一约束保证:
|
||
- 重复跨过同一等级不会重发(设计文档决策 11)
|
||
- 衰减回升后再升过同等级也不重发(决策 11)
|
||
|
||
外部副作用(虚拟货币 +、道具发放)由 _dispatch_reward_to_external_systems 占位 hook
|
||
实现 — P2 不接外部系统,先记 reward_snapshot 落库;后续接订阅/卡片/道具 app 时
|
||
在该 hook 中调用。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from django.db import IntegrityError, transaction
|
||
|
||
from userapp.models import AffinityLevel, AffinitySetting, UserLevelRewardGrant
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class RewardGrantResult:
|
||
"""单次跨级奖励发放的汇总结果"""
|
||
granted: List[Dict[str, Any]] = field(default_factory=list) # 本次新发放的奖励快照列表
|
||
skipped_duplicate: List[int] = field(default_factory=list) # 已发过被跳过的等级号
|
||
failed: List[Dict[str, Any]] = field(default_factory=list) # 发放失败的等级(含错误信息)
|
||
|
||
|
||
def _build_reward_snapshot(level_obj: AffinityLevel) -> Dict[str, Any]:
|
||
"""把 AffinityLevel 当前奖励配置打成快照(防 admin 后续修改影响审计)"""
|
||
return {
|
||
'level': level_obj.level,
|
||
'name': level_obj.name,
|
||
'reward_type': level_obj.reward_type,
|
||
'reward_currency': level_obj.reward_currency,
|
||
'reward_items': list(level_obj.reward_items or []),
|
||
'unlock_content': level_obj.unlock_content,
|
||
}
|
||
|
||
|
||
def _dispatch_reward_to_external_systems(user_device, snapshot: Dict[str, Any]) -> None:
|
||
"""实际把奖励发放到外部系统的 hook(虚拟货币、道具、解锁标记等)。
|
||
|
||
P2 阶段不接外部系统 — 仅日志记录。
|
||
P3/P4 阶段接订阅 / 卡片 / 道具 app 时在此实现具体派发逻辑。
|
||
"""
|
||
if snapshot['reward_currency'] > 0:
|
||
logger.info(
|
||
'[affinity.rewards] [STUB] 应给设备 %s 发放虚拟货币 %d',
|
||
user_device.id, snapshot['reward_currency'],
|
||
)
|
||
if snapshot['reward_items']:
|
||
logger.info(
|
||
'[affinity.rewards] [STUB] 应给设备 %s 发放道具 %s',
|
||
user_device.id, snapshot['reward_items'],
|
||
)
|
||
|
||
|
||
def grant_levels(user_device, from_level: int, to_level: int) -> RewardGrantResult:
|
||
"""逐级独立事务发放 [from_level+1, to_level] 范围内的所有等级奖励。
|
||
|
||
设计文档决策 3:升级时逐级发放经过的每一级;降级回升后不再补发(永久幂等)。
|
||
|
||
参数:
|
||
user_device: UserDevice 实例
|
||
from_level: 升级前等级
|
||
to_level: 升级后等级(需 > from_level 才会发奖励)
|
||
返回:
|
||
RewardGrantResult,调用方可基于此推送 WS、记日志、入重试队列等
|
||
"""
|
||
result = RewardGrantResult()
|
||
if to_level <= from_level:
|
||
return result
|
||
|
||
# AffinitySetting.enable_rewards=False 则只记录跨级日志,不发奖励
|
||
if not AffinitySetting.get_solo().enable_rewards:
|
||
logger.info('[affinity.rewards] enable_rewards=False,跨级 %s→%s 不发奖励',
|
||
from_level, to_level)
|
||
return result
|
||
|
||
# 取所有需要发放的等级(按 level 升序)
|
||
levels_to_grant = list(
|
||
AffinityLevel.objects
|
||
.filter(level__gt=from_level, level__lte=to_level,
|
||
is_enabled=True, is_deleted=False)
|
||
.order_by('level')
|
||
)
|
||
|
||
for level_obj in levels_to_grant:
|
||
try:
|
||
with transaction.atomic():
|
||
snapshot = _build_reward_snapshot(level_obj)
|
||
# UniqueConstraint(device, level) 保证同一设备同一等级最多一行
|
||
_, created = UserLevelRewardGrant.objects.get_or_create(
|
||
device=user_device,
|
||
level=level_obj.level,
|
||
defaults={
|
||
'device_snapshot_id': user_device.id,
|
||
'reward_snapshot': snapshot,
|
||
},
|
||
)
|
||
if not created:
|
||
# 决策 11:曾经达到过就标记,不再重发
|
||
result.skipped_duplicate.append(level_obj.level)
|
||
continue
|
||
|
||
# 派发到外部系统(P2 是 stub)— 若 hook 抛异常会触发本 level 事务回滚
|
||
_dispatch_reward_to_external_systems(user_device, snapshot)
|
||
result.granted.append(snapshot)
|
||
|
||
except IntegrityError as exc:
|
||
# 并发场景下 UniqueConstraint 命中 — 等价于 already granted
|
||
result.skipped_duplicate.append(level_obj.level)
|
||
logger.info('[affinity.rewards] Lv%s 已被并发线程发放:%s', level_obj.level, exc)
|
||
except Exception as exc:
|
||
logger.exception('[affinity.rewards] Lv%s 发放失败:%s', level_obj.level, exc)
|
||
result.failed.append({
|
||
'level': level_obj.level,
|
||
'error': str(exc),
|
||
'snapshot': _build_reward_snapshot(level_obj),
|
||
})
|
||
|
||
return result
|