"""P2-04 跨级奖励发放 A3 决策(方案 B):升级时**逐级独立事务**发放奖励,失败的项收集起来供调用方 后续重试/补偿,**已成功的级别不会因后续级别失败而回滚**(这是与方案 A 整体事务 的最大区别)。 发放幂等通过 UserLevelRewardGrant(device, level) 唯一约束保证: - 重复跨过同一等级不会重发(设计文档决策 11) - 衰减回升后再升过同等级也不重发(决策 11) 外部副作用(虚拟货币 +、道具发放)由 _dispatch_reward_to_external_systems 占位 hook 实现 — P2 不接外部系统,先记 reward_snapshot 落库;后续接订阅/卡片/道具 app 时 在该 hook 中调用。 """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import Any, Dict, List, Optional from django.db import IntegrityError, transaction from userapp.models import AffinityLevel, AffinitySetting, UserLevelRewardGrant logger = logging.getLogger(__name__) @dataclass class RewardGrantResult: """单次跨级奖励发放的汇总结果""" granted: List[Dict[str, Any]] = field(default_factory=list) # 本次新发放的奖励快照列表 skipped_duplicate: List[int] = field(default_factory=list) # 已发过被跳过的等级号 failed: List[Dict[str, Any]] = field(default_factory=list) # 发放失败的等级(含错误信息) def _build_reward_snapshot(level_obj: AffinityLevel) -> Dict[str, Any]: """把 AffinityLevel 当前奖励配置打成快照(防 admin 后续修改影响审计)""" return { 'level': level_obj.level, 'name': level_obj.name, 'reward_type': level_obj.reward_type, 'reward_currency': level_obj.reward_currency, 'reward_items': list(level_obj.reward_items or []), 'unlock_content': level_obj.unlock_content, } def _dispatch_reward_to_external_systems(user_device, snapshot: Dict[str, Any]) -> None: """实际把奖励发放到外部系统的 hook(虚拟货币、道具、解锁标记等)。 P2 阶段不接外部系统 — 仅日志记录。 P3/P4 阶段接订阅 / 卡片 / 道具 app 时在此实现具体派发逻辑。 """ if snapshot['reward_currency'] > 0: logger.info( '[affinity.rewards] [STUB] 应给设备 %s 发放虚拟货币 %d', user_device.id, snapshot['reward_currency'], ) if snapshot['reward_items']: logger.info( '[affinity.rewards] [STUB] 应给设备 %s 发放道具 %s', user_device.id, snapshot['reward_items'], ) def grant_levels(user_device, from_level: int, to_level: int) -> RewardGrantResult: """逐级独立事务发放 [from_level+1, to_level] 范围内的所有等级奖励。 设计文档决策 3:升级时逐级发放经过的每一级;降级回升后不再补发(永久幂等)。 参数: user_device: UserDevice 实例 from_level: 升级前等级 to_level: 升级后等级(需 > from_level 才会发奖励) 返回: RewardGrantResult,调用方可基于此推送 WS、记日志、入重试队列等 """ result = RewardGrantResult() if to_level <= from_level: return result # AffinitySetting.enable_rewards=False 则只记录跨级日志,不发奖励 if not AffinitySetting.get_solo().enable_rewards: logger.info('[affinity.rewards] enable_rewards=False,跨级 %s→%s 不发奖励', from_level, to_level) return result # 取所有需要发放的等级(按 level 升序) levels_to_grant = list( AffinityLevel.objects .filter(level__gt=from_level, level__lte=to_level, is_enabled=True, is_deleted=False) .order_by('level') ) for level_obj in levels_to_grant: try: with transaction.atomic(): snapshot = _build_reward_snapshot(level_obj) # UniqueConstraint(device, level) 保证同一设备同一等级最多一行 _, created = UserLevelRewardGrant.objects.get_or_create( device=user_device, level=level_obj.level, defaults={ 'device_snapshot_id': user_device.id, 'reward_snapshot': snapshot, }, ) if not created: # 决策 11:曾经达到过就标记,不再重发 result.skipped_duplicate.append(level_obj.level) continue # 派发到外部系统(P2 是 stub)— 若 hook 抛异常会触发本 level 事务回滚 _dispatch_reward_to_external_systems(user_device, snapshot) result.granted.append(snapshot) except IntegrityError as exc: # 并发场景下 UniqueConstraint 命中 — 等价于 already granted result.skipped_duplicate.append(level_obj.level) logger.info('[affinity.rewards] Lv%s 已被并发线程发放:%s', level_obj.level, exc) except Exception as exc: logger.exception('[affinity.rewards] Lv%s 发放失败:%s', level_obj.level, exc) result.failed.append({ 'level': level_obj.level, 'error': str(exc), 'snapshot': _build_reward_snapshot(level_obj), }) return result