pmc f26e78c545 feat(affinity-P2): service 层落地 — 唯一写入入口 + Redis 计数器 + 等级映射 + 跨级奖励 + WS 推送 (P2-01~P2-05)
新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:

- counters.py (P2-02):Redis 三类计数器
  - affinity💿{device_id}:{rule_key} 冷却
  - affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
  - affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
  - 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
  - cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
  - event_id 60s 去重防客户端重复上报

- levels.py (P2-03):等级映射
  - map_value_to_level / update_device_level / progress_to_next_level
  - update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])

- ws.py (P2-05):WebSocket 推送 helper
  - 3 类事件 affinity_update / level_up / level_down
  - asgiref.async_to_sync 包装 channel_layer.group_send
  - 推送故障 fire-and-forget 仅日志记录,不阻塞主流程

- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
  - grant_levels(user_device, from_level, to_level) 逐级独立事务
  - UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
  - _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现

- services.py (P2-01):AffinityService 主入口
  - apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
  - 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
  - admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
  - 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)

- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查

Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。

设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 09:35:53 +08:00

132 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""P2-04 跨级奖励发放
A3 决策(方案 B升级时**逐级独立事务**发放奖励,失败的项收集起来供调用方
后续重试/补偿,**已成功的级别不会因后续级别失败而回滚**(这是与方案 A 整体事务
的最大区别)。
发放幂等通过 UserLevelRewardGrant(device, level) 唯一约束保证:
- 重复跨过同一等级不会重发(设计文档决策 11
- 衰减回升后再升过同等级也不重发(决策 11
外部副作用(虚拟货币 +、道具发放)由 _dispatch_reward_to_external_systems 占位 hook
实现 — P2 不接外部系统,先记 reward_snapshot 落库;后续接订阅/卡片/道具 app 时
在该 hook 中调用。
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from django.db import IntegrityError, transaction
from userapp.models import AffinityLevel, AffinitySetting, UserLevelRewardGrant
logger = logging.getLogger(__name__)
@dataclass
class RewardGrantResult:
"""单次跨级奖励发放的汇总结果"""
granted: List[Dict[str, Any]] = field(default_factory=list) # 本次新发放的奖励快照列表
skipped_duplicate: List[int] = field(default_factory=list) # 已发过被跳过的等级号
failed: List[Dict[str, Any]] = field(default_factory=list) # 发放失败的等级(含错误信息)
def _build_reward_snapshot(level_obj: AffinityLevel) -> Dict[str, Any]:
"""把 AffinityLevel 当前奖励配置打成快照(防 admin 后续修改影响审计)"""
return {
'level': level_obj.level,
'name': level_obj.name,
'reward_type': level_obj.reward_type,
'reward_currency': level_obj.reward_currency,
'reward_items': list(level_obj.reward_items or []),
'unlock_content': level_obj.unlock_content,
}
def _dispatch_reward_to_external_systems(user_device, snapshot: Dict[str, Any]) -> None:
"""实际把奖励发放到外部系统的 hook虚拟货币、道具、解锁标记等
P2 阶段不接外部系统 — 仅日志记录。
P3/P4 阶段接订阅 / 卡片 / 道具 app 时在此实现具体派发逻辑。
"""
if snapshot['reward_currency'] > 0:
logger.info(
'[affinity.rewards] [STUB] 应给设备 %s 发放虚拟货币 %d',
user_device.id, snapshot['reward_currency'],
)
if snapshot['reward_items']:
logger.info(
'[affinity.rewards] [STUB] 应给设备 %s 发放道具 %s',
user_device.id, snapshot['reward_items'],
)
def grant_levels(user_device, from_level: int, to_level: int) -> RewardGrantResult:
"""逐级独立事务发放 [from_level+1, to_level] 范围内的所有等级奖励。
设计文档决策 3升级时逐级发放经过的每一级降级回升后不再补发永久幂等
参数:
user_device: UserDevice 实例
from_level: 升级前等级
to_level: 升级后等级(需 > from_level 才会发奖励)
返回:
RewardGrantResult调用方可基于此推送 WS、记日志、入重试队列等
"""
result = RewardGrantResult()
if to_level <= from_level:
return result
# AffinitySetting.enable_rewards=False 则只记录跨级日志,不发奖励
if not AffinitySetting.get_solo().enable_rewards:
logger.info('[affinity.rewards] enable_rewards=False跨级 %s%s 不发奖励',
from_level, to_level)
return result
# 取所有需要发放的等级(按 level 升序)
levels_to_grant = list(
AffinityLevel.objects
.filter(level__gt=from_level, level__lte=to_level,
is_enabled=True, is_deleted=False)
.order_by('level')
)
for level_obj in levels_to_grant:
try:
with transaction.atomic():
snapshot = _build_reward_snapshot(level_obj)
# UniqueConstraint(device, level) 保证同一设备同一等级最多一行
_, created = UserLevelRewardGrant.objects.get_or_create(
device=user_device,
level=level_obj.level,
defaults={
'device_snapshot_id': user_device.id,
'reward_snapshot': snapshot,
},
)
if not created:
# 决策 11曾经达到过就标记不再重发
result.skipped_duplicate.append(level_obj.level)
continue
# 派发到外部系统P2 是 stub— 若 hook 抛异常会触发本 level 事务回滚
_dispatch_reward_to_external_systems(user_device, snapshot)
result.granted.append(snapshot)
except IntegrityError as exc:
# 并发场景下 UniqueConstraint 命中 — 等价于 already granted
result.skipped_duplicate.append(level_obj.level)
logger.info('[affinity.rewards] Lv%s 已被并发线程发放:%s', level_obj.level, exc)
except Exception as exc:
logger.exception('[affinity.rewards] Lv%s 发放失败:%s', level_obj.level, exc)
result.failed.append({
'level': level_obj.level,
'error': str(exc),
'snapshot': _build_reward_snapshot(level_obj),
})
return result