新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:
- counters.py (P2-02):Redis 三类计数器
- affinity:cd:{device_id}:{rule_key} 冷却
- affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
- affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
- 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
- cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
- event_id 60s 去重防客户端重复上报
- levels.py (P2-03):等级映射
- map_value_to_level / update_device_level / progress_to_next_level
- update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])
- ws.py (P2-05):WebSocket 推送 helper
- 3 类事件 affinity_update / level_up / level_down
- asgiref.async_to_sync 包装 channel_layer.group_send
- 推送故障 fire-and-forget 仅日志记录,不阻塞主流程
- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
- grant_levels(user_device, from_level, to_level) 逐级独立事务
- UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
- _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现
- services.py (P2-01):AffinityService 主入口
- apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
- 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
- admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
- 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)
- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查
Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。
设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
429 lines
16 KiB
Python
429 lines
16 KiB
Python
"""P2-01 好感度系统服务层 — 唯一写入入口

所有好感度变化(设备事件、手机事件、衰减任务、管理员调整)**必须**经由
`AffinityService.apply()` 处理,确保单一冷却 / 日上限 / 钳位 / 日志 / 等级
更新 / 奖励派发 / WS 推送的语义闭环。

设计文档:§4.3 触发计算流程
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import random
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Dict, Optional, Tuple
|
||
|
||
from django.db import transaction
|
||
from django.db.models import F
|
||
|
||
from device_interaction.models import UserDevice
|
||
from userapp.models import (
|
||
AffinityLog,
|
||
AffinityRule,
|
||
AffinitySetting,
|
||
UserAffinityDailyCounter,
|
||
)
|
||
|
||
from . import counters as redis_counters
|
||
from . import levels as level_utils
|
||
from . import rewards as reward_utils
|
||
from . import ws
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------- 返回类型 ----------
|
||
|
||
class ApplyOutcome:
    """String constants used as the value of ``ApplyResult.outcome``."""

    # The change was written successfully.
    APPLIED = 'applied'

    # The rule does not exist, is disabled, or is soft-deleted.
    NOOP_NO_RULE = 'noop_no_rule'

    # The (device, rule) pair is still cooling down.
    NOOP_COOLDOWN = 'noop_cooldown'

    # This rule has already reached its daily cap for today.
    NOOP_RULE_DAILY_CAP = 'noop_rule_daily_cap'

    # The global daily cap has already been reached for today.
    NOOP_GLOBAL_DAILY_CAP = 'noop_global_daily_cap'

    # The same event_id was already processed (60s de-duplication window).
    NOOP_EVENT_DUP = 'noop_event_duplicate'

    # The value already sits on a boundary (at 0 and decreasing, or at the
    # maximum and increasing), so nothing would change.
    NOOP_VALUE_BOUNDARY = 'noop_value_boundary'

    # An exception occurred.
    ERROR = 'error'
|
||
|
||
|
||
@dataclass
class ApplyResult:
    """Result record returned by the ``AffinityService`` entry points.

    Only ``outcome`` is mandatory. Every other field has a neutral default,
    so no-op and error results can be built from just the outcome (plus
    whichever fields are known at that point).
    """

    # One of the ApplyOutcome string constants.
    outcome: str
    # Change actually written (after all clamping); 0 when nothing was written.
    change_value: int = 0
    # Favorability before / after the write.
    before_value: int = 0
    after_value: int = 0
    # Rule key the call was made with ('admin' for admin adjustments).
    rule_key: str = ''
    # Affinity level before / after the write.
    old_level: int = 0
    new_level: int = 0
    # Primary key of the AffinityLog row created for an applied change.
    log_id: Optional[int] = None
    # Granted / failed reward lists taken from the reward grant result.
    rewards_granted: list = field(default_factory=list)
    rewards_failed: list = field(default_factory=list)
    # Error text, filled in for ERROR outcomes.
    error: Optional[str] = None

    @property
    def is_applied(self) -> bool:
        """Return True when ``outcome`` equals ``ApplyOutcome.APPLIED``."""
        applied_marker = ApplyOutcome.APPLIED
        return self.outcome == applied_marker
|
||
|
||
|
||
# ---------- 主入口 ----------
|
||
|
||
class AffinityService:
    """Service layer of the affinity system.

    ``apply`` handles rule-driven changes and ``admin_adjust`` handles manual
    administrator adjustments. Both write the new value, the AffinityLog row
    and the cached level inside one database transaction, and then run the
    non-atomic side effects (Redis counters, rewards, WebSocket pushes).

    Once the transaction has committed, the side effects are executed
    best-effort: a failure is logged and the call still returns an
    ``APPLIED`` result, because the favorability change is already durable.
    """

    # ---------- private helpers ----------

    @staticmethod
    def _best_effort(label, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``; log and return None if it raises.

        Used only for post-commit side effects, where raising would hide an
        already committed change from the caller.
        """
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception(
                '[affinity.service] post-commit step %s failed', label,
            )
            return None

    @classmethod
    def _finish_applied(
        cls,
        *,
        setting,
        user_device,
        user_id: int,
        device_id: int,
        rule_key: str,
        source: str,
        actual_change: int,
        before_value: int,
        after_value: int,
        old_level: int,
        new_level: int,
        log_id: Optional[int],
    ) -> ApplyResult:
        """Grant level rewards, push WS events and build the APPLIED result.

        Shared by ``apply`` and ``admin_adjust``; must be called only after
        the write transaction has committed.
        """
        # Rewards are granted only when the level went up.
        grant_result = reward_utils.RewardGrantResult()
        if new_level > old_level:
            granted = cls._best_effort(
                'grant_levels',
                reward_utils.grant_levels, user_device, old_level, new_level,
            )
            if granted is not None:
                grant_result = granted

        # WebSocket notifications, gated by the global setting.
        if setting.enable_notify:
            cls._best_effort(
                'ws.push_affinity_update',
                ws.push_affinity_update,
                user_id=user_id,
                device_id=device_id,
                change=actual_change,
                before=before_value,
                after=after_value,
                rule_key=rule_key,
                source=source,
            )
            if new_level > old_level:
                cls._best_effort(
                    'ws.push_level_up',
                    ws.push_level_up,
                    user_id=user_id,
                    device_id=device_id,
                    old_level=old_level,
                    new_level=new_level,
                    rewards=grant_result.granted,
                )
            elif new_level < old_level:
                cls._best_effort(
                    'ws.push_level_down',
                    ws.push_level_down,
                    user_id=user_id,
                    device_id=device_id,
                    old_level=old_level,
                    new_level=new_level,
                )

        return ApplyResult(
            outcome=ApplyOutcome.APPLIED,
            change_value=actual_change,
            before_value=before_value,
            after_value=after_value,
            rule_key=rule_key,
            old_level=old_level,
            new_level=new_level,
            log_id=log_id,
            rewards_granted=grant_result.granted,
            rewards_failed=grant_result.failed,
        )

    # ---------- main entry point ----------

    @classmethod
    def apply(
        cls,
        *,
        user_id: int,
        device_id: int,
        rule_key: str,
        source: str,
        event_id: str = '',
        metadata: Optional[Dict[str, Any]] = None,
        operator_admin_id: Optional[int] = None,
        reason: str = '',
    ) -> ApplyResult:
        """Apply one rule-driven affinity change.

        Args:
            user_id: ParadiseUser.id (used for the AffinityLog user relation
                and for WS group routing).
            device_id: UserDevice.id (the binding id, not Device.id).
            rule_key: AffinityRule.rule_key, e.g. 'chat' / 'gift' / 'decay' /
                'admin'.
            source: one of AffinityLog.SOURCE_CHOICES.
            event_id: client event UUID used for idempotent de-duplication
                (60s window); an empty string disables the check.
            metadata: extra context stored as JSON on the log row.
            operator_admin_id: operator id, filled in for admin adjustments.
            reason: free-text reason, filled in for admin adjustments.

        Returns:
            An ApplyResult; callers decide what to do next from ``outcome``.
            Failures while looking up the device or inside the write
            transaction are reported as ``ApplyOutcome.ERROR``. Failures of
            post-commit side effects are logged and do not change the
            ``APPLIED`` outcome.
        """
        # Function-scope import, as in the original code; hoisted out of the
        # transaction body so the critical section contains only DB work.
        from django.utils import timezone as djtz

        # 0) Idempotency guard on event_id.
        if event_id and redis_counters.event_already_processed(event_id):
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_EVENT_DUP,
                rule_key=rule_key,
            )

        # 1) Load the rule (must be enabled and not soft-deleted).
        rule = (
            AffinityRule.objects
            .filter(rule_key=rule_key, is_enabled=True, is_deleted=False)
            .first()
        )
        if rule is None:
            return ApplyResult(outcome=ApplyOutcome.NOOP_NO_RULE, rule_key=rule_key)

        # 2) Cooldown check.
        if redis_counters.is_in_cooldown(device_id, rule_key):
            return ApplyResult(outcome=ApplyOutcome.NOOP_COOLDOWN, rule_key=rule_key)

        # 3) Load the device binding (must be active and owned by the user).
        try:
            user_device = UserDevice.active.get(id=device_id, user_id=user_id)
        except UserDevice.DoesNotExist:
            logger.warning(
                '[affinity.service] device %s 不属于 user %s 或已解绑,跳过',
                device_id, user_id,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key=rule_key,
                error='UserDevice not found or unbound',
            )

        # 4) Draw the change from the closed interval [min_change, max_change]
        #    (the original comment states a P1 CHECK guarantees min <= max).
        raw_change = random.randint(rule.min_change, rule.max_change)
        # Defensive clamp to +/- single_cap (original comment: a P1 CHECK
        # already guarantees single_cap > 0).
        if raw_change > rule.single_cap:
            raw_change = rule.single_cap
        elif raw_change < -rule.single_cap:
            raw_change = -rule.single_cap

        if raw_change == 0:
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                rule_key=rule_key,
                change_value=0,
            )

        # 5) Per-rule daily cap, accumulated by absolute value.
        # NOTE(review): this check and the one in step 6 read Redis before the
        # row lock is taken, so concurrent calls for the same device could
        # both pass - confirm whether that is acceptable.
        abs_change = abs(raw_change)
        rule_today = redis_counters.get_rule_daily(device_id, rule_key)
        if rule_today + abs_change > rule.daily_cap:
            # Partial pass: when some quota is left, clamp to what remains.
            remain = rule.daily_cap - rule_today
            if remain <= 0:
                return ApplyResult(
                    outcome=ApplyOutcome.NOOP_RULE_DAILY_CAP,
                    rule_key=rule_key,
                )
            # Keep the sign of the original change.
            raw_change = remain if raw_change > 0 else -remain
            abs_change = remain

        # 6) Global daily cap; only positive changes count against it.
        setting = AffinitySetting.get_solo()
        if raw_change > 0:
            global_today = redis_counters.get_global_daily(device_id)
            if global_today + abs_change > setting.global_daily_cap:
                remain = setting.global_daily_cap - global_today
                if remain <= 0:
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_GLOBAL_DAILY_CAP,
                        rule_key=rule_key,
                    )
                raw_change = remain
                abs_change = remain

        # 7) Atomic update of UserDevice.favorability, with clamping.
        try:
            with transaction.atomic():
                # Lock the UserDevice row to serialise concurrent writers.
                ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
                before_value = ud_locked.favorability
                # Clamp the new value to [0, max_affinity].
                after_value = max(0, min(setting.max_affinity, before_value + raw_change))
                actual_change = after_value - before_value

                if actual_change == 0:
                    # Already on a boundary: nothing changes, nothing is logged.
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                        rule_key=rule_key,
                        before_value=before_value,
                        after_value=after_value,
                    )

                ud_locked.favorability = after_value
                # Refresh last_active_at (the original comment says the decay
                # decision depends on it).
                # NOTE(review): this also runs when the change comes from a
                # decay rule - confirm that is intended.
                ud_locked.last_active_at = djtz.now()
                ud_locked.save(update_fields=['favorability', 'last_active_at'])

                # 8) Write the AffinityLog row (keeps a redundant rule_key
                #    next to the rule foreign key).
                log = AffinityLog.objects.create(
                    user_id=user_id,
                    device=ud_locked,
                    rule=rule,
                    rule_key=rule_key,
                    change_value=actual_change,
                    before_value=before_value,
                    after_value=after_value,
                    source=source,
                    event_id=event_id or None,  # WR-004: empty -> NULL
                    operator_admin_id=operator_admin_id,
                    reason=reason,
                    metadata=metadata or {},
                )

                # 9) Update the database fallback counter (Redis is the hot
                #    path; the DB row is the audit fallback).
                # NOTE(review): this accumulates the signed change while the
                # Redis counters use absolute values - confirm intended.
                today_date = redis_counters.local_today_date(setting.timezone)
                counter, _ = UserAffinityDailyCounter.objects.select_for_update().get_or_create(
                    device=ud_locked, rule=rule, date=today_date,
                    defaults={'accumulated_change': 0, 'trigger_count': 0},
                )
                counter.accumulated_change = F('accumulated_change') + actual_change
                counter.trigger_count = F('trigger_count') + 1
                counter.save(update_fields=['accumulated_change', 'trigger_count'])

                # 10) Update the cached level inside the same transaction so
                #     no half-updated state is ever committed.
                old_level, new_level, _matched = level_utils.update_device_level(
                    ud_locked, save=True,
                )

        except Exception as exc:
            logger.exception(
                '[affinity.service] apply 失败 user=%s device=%s rule=%s: %s',
                user_id, device_id, rule_key, exc,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key=rule_key,
                error=str(exc),
            )

        # The transaction has committed. Everything below is a non-atomic side
        # effect and is run best-effort so a failure cannot mask the committed
        # change or skip the remaining steps.

        # 11) Redis counters: cooldown and daily quotas are consumed only
        #     after the data has really been written.
        cls._best_effort(
            'set_cooldown',
            redis_counters.set_cooldown, device_id, rule_key, rule.cooldown_seconds,
        )
        cls._best_effort(
            'incr_rule_daily',
            redis_counters.incr_rule_daily, device_id, rule_key, abs(actual_change),
        )
        if actual_change > 0:
            cls._best_effort(
                'incr_global_daily',
                redis_counters.incr_global_daily, device_id, abs(actual_change),
            )
        if event_id:
            cls._best_effort(
                'mark_event_processed',
                redis_counters.mark_event_processed, event_id,
            )

        # 12) Level rewards and 13) WS pushes, then build the result.
        return cls._finish_applied(
            setting=setting,
            user_device=ud_locked,
            user_id=user_id,
            device_id=device_id,
            rule_key=rule_key,
            source=source,
            actual_change=actual_change,
            before_value=before_value,
            after_value=after_value,
            old_level=old_level,
            new_level=new_level,
            log_id=log.id,
        )

    # ---------- admin-only entry point (bypasses rules, takes a raw delta) ----------

    @classmethod
    def admin_adjust(
        cls,
        *,
        user_id: int,
        device_id: int,
        delta: int,
        operator_admin_id: int,
        reason: str = '',
        batch: bool = False,
    ) -> ApplyResult:
        """Manually adjust a device's affinity on behalf of an administrator.

        - No AffinityRule lookup (the log row has rule=NULL, rule_key='admin').
        - No cooldown or daily-cap checks.
        - The value is still clamped to [0, max_affinity].
        - ``source`` is 'admin_adjust_batch' when ``batch`` is true, otherwise
          'admin_adjust_single'.
        - Still writes AffinityLog, updates the cached level, grants rewards
          and pushes WS events.

        Args:
            user_id: owner of the device binding.
            device_id: UserDevice.id of the binding to adjust.
            delta: requested signed change; 0 is reported as a no-op.
            operator_admin_id: id of the acting administrator, stored on the
                log row.
            reason: free-text reason stored on the log row.
            batch: selects the batch variant of the ``source`` value.

        Returns:
            An ApplyResult with rule_key='admin'. Failures of post-commit side
            effects are logged and do not change the ``APPLIED`` outcome.
        """
        # Function-scope import, as in the original code; hoisted out of the
        # transaction body.
        from django.utils import timezone as djtz

        source = 'admin_adjust_batch' if batch else 'admin_adjust_single'

        # The binding is validated first, so an unknown or unbound device is
        # reported as ERROR even when delta is 0.
        try:
            user_device = UserDevice.active.get(id=device_id, user_id=user_id)
        except UserDevice.DoesNotExist:
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key='admin',
                error='UserDevice not found or unbound',
            )

        if delta == 0:
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                rule_key='admin',
            )

        setting = AffinitySetting.get_solo()

        try:
            with transaction.atomic():
                # Lock the row, then clamp the new value to [0, max_affinity].
                ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
                before_value = ud_locked.favorability
                after_value = max(0, min(setting.max_affinity, before_value + delta))
                actual_change = after_value - before_value

                if actual_change == 0:
                    # Already on a boundary: nothing changes, nothing is logged.
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                        rule_key='admin',
                        before_value=before_value,
                        after_value=after_value,
                    )

                ud_locked.favorability = after_value
                ud_locked.last_active_at = djtz.now()
                ud_locked.save(update_fields=['favorability', 'last_active_at'])

                # The requested delta is kept in metadata because clamping may
                # make the recorded change_value smaller.
                log = AffinityLog.objects.create(
                    user_id=user_id,
                    device=ud_locked,
                    rule=None,
                    rule_key='admin',
                    change_value=actual_change,
                    before_value=before_value,
                    after_value=after_value,
                    source=source,
                    operator_admin_id=operator_admin_id,
                    reason=reason,
                    metadata={'requested_delta': delta},
                )

                old_level, new_level, _matched = level_utils.update_device_level(
                    ud_locked, save=True,
                )

        except Exception as exc:
            logger.exception(
                '[affinity.service.admin_adjust] 失败 user=%s device=%s delta=%s',
                user_id, device_id, delta,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key='admin',
                error=str(exc),
            )

        # Post-commit side effects: rewards and WS pushes, best-effort.
        return cls._finish_applied(
            setting=setting,
            user_device=ud_locked,
            user_id=user_id,
            device_id=device_id,
            rule_key='admin',
            source=source,
            actual_change=actual_change,
            before_value=before_value,
            after_value=after_value,
            old_level=old_level,
            new_level=new_level,
            log_id=log.id,
        )
|