lty/qy_lty/userapp/affinity/services.py
pmc f26e78c545 feat(affinity-P2): service 层落地 — 唯一写入入口 + Redis 计数器 + 等级映射 + 跨级奖励 + WS 推送 (P2-01~P2-05)
新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:

- counters.py (P2-02):Redis 三类计数器
  - affinity:cd:{device_id}:{rule_key} 冷却
  - affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
  - affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
  - 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
  - cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
  - event_id 60s 去重防客户端重复上报

- levels.py (P2-03):等级映射
  - map_value_to_level / update_device_level / progress_to_next_level
  - update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])

- ws.py (P2-05):WebSocket 推送 helper
  - 3 类事件 affinity_update / level_up / level_down
  - asgiref.async_to_sync 包装 channel_layer.group_send
  - 推送故障 fire-and-forget 仅日志记录,不阻塞主流程

- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
  - grant_levels(user_device, from_level, to_level) 逐级独立事务
  - UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
  - _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现

- services.py (P2-01):AffinityService 主入口
  - apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
  - 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
  - admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
  - 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)

- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查

Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。

设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 09:35:53 +08:00

429 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""P2-01 好感度系统服务层 — 唯一写入入口
所有好感度变化(设备事件、手机事件、衰减任务、管理员调整)**必须**经由
`AffinityService.apply()` 处理,确保单一冷却 / 日上限 / 钳位 / 日志 / 等级
更新 / 奖励派发 / WS 推送的语义闭环。
设计文档§4.3 触发计算流程
"""
from __future__ import annotations
import logging
import random
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
from django.db import transaction
from django.db.models import F
from device_interaction.models import UserDevice
from userapp.models import (
AffinityLog,
AffinityRule,
AffinitySetting,
UserAffinityDailyCounter,
)
from . import counters as redis_counters
from . import levels as level_utils
from . import rewards as reward_utils
from . import ws
logger = logging.getLogger(__name__)
# ---------- 返回类型 ----------
class ApplyOutcome:
    """String constants used as the value of ``ApplyResult.outcome``."""

    # The change was written successfully.
    APPLIED = 'applied'

    # The rule does not exist, is disabled, or is soft-deleted.
    NOOP_NO_RULE = 'noop_no_rule'

    # The rule is still cooling down for this device.
    NOOP_COOLDOWN = 'noop_cooldown'

    # This rule has already reached its cap for today.
    NOOP_RULE_DAILY_CAP = 'noop_rule_daily_cap'

    # The global cap for today has already been reached.
    NOOP_GLOBAL_DAILY_CAP = 'noop_global_daily_cap'

    # The same event_id was already processed within the last 60 seconds.
    NOOP_EVENT_DUP = 'noop_event_duplicate'

    # The value is already at a boundary (at 0 and decreasing, or at the
    # maximum and increasing), so nothing changes.
    NOOP_VALUE_BOUNDARY = 'noop_value_boundary'

    # An exception occurred.
    ERROR = 'error'
@dataclass
class ApplyResult:
    """Result record describing what one affinity change attempt did.

    Only ``outcome`` is required; every other field has a neutral default so
    that no-op and error results can be built with just the fields they know.
    """

    # One of the ``ApplyOutcome`` string constants.
    outcome: str
    # Change that was actually written (0 when nothing was written).
    change_value: int = 0
    # Stored value before / after the attempt.
    before_value: int = 0
    after_value: int = 0
    # Key of the rule the attempt was made under.
    rule_key: str = ''
    # Level before / after the attempt.
    old_level: int = 0
    new_level: int = 0
    # Primary key of the written log row, if any.
    log_id: Optional[int] = None
    # Per-instance lists (default_factory avoids sharing one list object).
    rewards_granted: list = field(default_factory=list)
    rewards_failed: list = field(default_factory=list)
    # Error text when the attempt failed.
    error: Optional[str] = None

    @property
    def is_applied(self) -> bool:
        """Return True when ``outcome`` equals ``ApplyOutcome.APPLIED``."""
        applied_marker = ApplyOutcome.APPLIED
        return self.outcome == applied_marker
# ---------- 主入口 ----------
class AffinityService:
    """Service layer of the affinity system.

    ``apply`` is the rule-driven write path and ``admin_adjust`` is the
    administrator write path. Both lock the ``UserDevice`` row, clamp the new
    value into ``[0, max_affinity]``, write an ``AffinityLog`` row, refresh the
    cached level, and then run the shared post-commit side effects (reward
    grants and WebSocket pushes).
    """

    @classmethod
    def apply(
        cls,
        *,
        user_id: int,
        device_id: int,
        rule_key: str,
        source: str,
        event_id: str = '',
        metadata: Optional[Dict[str, Any]] = None,
        operator_admin_id: Optional[int] = None,
        reason: str = '',
    ) -> ApplyResult:
        """Apply one rule-driven affinity change.

        Args:
            user_id: ParadiseUser.id (used for the AffinityLog.user relation
                and WS group routing).
            device_id: UserDevice.id — the binding id, not Device.id.
            rule_key: AffinityRule.rule_key, e.g. 'chat' / 'gift' / 'decay' /
                'admin'.
            source: One of AffinityLog.SOURCE_CHOICES.
            event_id: Client event UUID, used for idempotent de-duplication
                within 60 seconds. Empty string disables de-duplication.
            metadata: Extra context stored as JSON on the log row.
            operator_admin_id: Filled in for administrator adjustments.
            reason: Filled in for administrator adjustments.

        Returns:
            An ``ApplyResult``; callers branch on its ``outcome``. Failures
            inside the database transaction are logged and reported as
            ``ApplyOutcome.ERROR`` rather than raised.
        """
        # Function-scope import (kept local as in the original code), hoisted
        # out of the transaction body so it does not run while the row lock is
        # held.
        from django.utils import timezone as djtz

        # 0) Idempotency: drop an event_id that was already processed.
        if event_id and redis_counters.event_already_processed(event_id):
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_EVENT_DUP,
                rule_key=rule_key,
            )

        # 1) Load the rule (must be enabled and not soft-deleted).
        rule = (
            AffinityRule.objects
            .filter(rule_key=rule_key, is_enabled=True, is_deleted=False)
            .first()
        )
        if rule is None:
            return ApplyResult(outcome=ApplyOutcome.NOOP_NO_RULE, rule_key=rule_key)

        # 2) Cooldown check.
        if redis_counters.is_in_cooldown(device_id, rule_key):
            return ApplyResult(outcome=ApplyOutcome.NOOP_COOLDOWN, rule_key=rule_key)

        # 3) Load the device binding; it must be active and owned by user_id.
        try:
            user_device = UserDevice.active.get(id=device_id, user_id=user_id)
        except UserDevice.DoesNotExist:
            logger.warning(
                '[affinity.service] device %s 不属于 user %s 或已解绑,跳过',
                device_id, user_id,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key=rule_key,
                error='UserDevice not found or unbound',
            )

        # 4) Draw this change uniformly from the closed range [min, max].
        #    (Per the original note, a P1 CHECK constraint guarantees
        #    min <= max.)
        raw_change = random.randint(rule.min_change, rule.max_change)
        # Defensive clamp to +/- single_cap (a P1 CHECK constraint is said to
        # guarantee single_cap > 0).
        if raw_change > rule.single_cap:
            raw_change = rule.single_cap
        elif raw_change < -rule.single_cap:
            raw_change = -rule.single_cap
        if raw_change == 0:
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                rule_key=rule_key,
                change_value=0,
            )

        # 5) Per-rule daily cap, accumulated by absolute value.
        abs_change = abs(raw_change)
        rule_today = redis_counters.get_rule_daily(device_id, rule_key)
        if rule_today + abs_change > rule.daily_cap:
            # Partial pass-through: when some room is left, clamp to it.
            remain = rule.daily_cap - rule_today
            if remain <= 0:
                return ApplyResult(
                    outcome=ApplyOutcome.NOOP_RULE_DAILY_CAP,
                    rule_key=rule_key,
                )
            # Keep the sign of the original change.
            raw_change = remain if raw_change > 0 else -remain
            abs_change = remain

        # 6) Global daily cap: only positive changes count (decay does not
        #    consume the quota).
        setting = AffinitySetting.get_solo()
        if raw_change > 0:
            global_today = redis_counters.get_global_daily(device_id)
            if global_today + abs_change > setting.global_daily_cap:
                remain = setting.global_daily_cap - global_today
                if remain <= 0:
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_GLOBAL_DAILY_CAP,
                        rule_key=rule_key,
                    )
                raw_change = remain
                abs_change = remain

        # 7) Atomically update UserDevice.favorability with clamping.
        try:
            with transaction.atomic():
                # Lock the UserDevice row to avoid concurrent write races.
                ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
                before_value = ud_locked.favorability
                # Clamp into [0, max_affinity].
                after_value = max(0, min(setting.max_affinity, before_value + raw_change))
                actual_change = after_value - before_value
                if actual_change == 0:
                    # Already at a boundary: this attempt changes nothing.
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                        rule_key=rule_key,
                        before_value=before_value,
                        after_value=after_value,
                    )

                ud_locked.favorability = after_value
                # Refresh last_active_at (decay decisions depend on it).
                # NOTE(review): this also runs when the call itself is a decay
                # event — confirm that resetting last_active_at there is
                # intended.
                ud_locked.last_active_at = djtz.now()
                ud_locked.save(update_fields=['favorability', 'last_active_at'])

                # 8) Write the AffinityLog row (cross-app FK plus a redundant
                #    copy of rule_key).
                log = AffinityLog.objects.create(
                    user_id=user_id,
                    device=ud_locked,
                    rule=rule,
                    rule_key=rule_key,
                    change_value=actual_change,
                    before_value=before_value,
                    after_value=after_value,
                    source=source,
                    event_id=event_id or None,  # WR-004: empty string -> NULL
                    operator_admin_id=operator_admin_id,
                    reason=reason,
                    metadata=metadata or {},
                )

                # 9) Update the database fallback counter (Redis is the hot
                #    path, the DB row is the audit fallback).
                # NOTE(review): this accumulates the signed change while the
                # Redis rule counter accumulates abs(change) — confirm the
                # asymmetry is intended.
                today_date = redis_counters.local_today_date(setting.timezone)
                counter, _ = UserAffinityDailyCounter.objects.select_for_update().get_or_create(
                    device=ud_locked, rule=rule, date=today_date,
                    defaults={'accumulated_change': 0, 'trigger_count': 0},
                )
                counter.accumulated_change = F('accumulated_change') + actual_change
                counter.trigger_count = F('trigger_count') + 1
                counter.save(update_fields=['accumulated_change', 'trigger_count'])

                # 10) Refresh the cached level inside the transaction so no
                #     half-updated state is ever committed.
                old_level, new_level, _matched = level_utils.update_device_level(
                    ud_locked, save=True,
                )
        except Exception as exc:
            logger.exception(
                '[affinity.service] apply 失败 user=%s device=%s rule=%s: %s',
                user_id, device_id, rule_key, exc,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key=rule_key,
                error=str(exc),
            )

        # The transaction has committed; everything below is a non-atomic
        # side effect (Redis counters / reward grants / pushes).

        # 11) Redis bookkeeping: cooldown and quotas are only consumed once
        #     the data has really been written. The change is already durable
        #     here, so a cache failure is logged instead of being raised to a
        #     caller that might retry and apply the change twice.
        try:
            redis_counters.set_cooldown(device_id, rule_key, rule.cooldown_seconds)
            redis_counters.incr_rule_daily(device_id, rule_key, abs(actual_change))
            if actual_change > 0:
                redis_counters.incr_global_daily(device_id, abs(actual_change))
            if event_id:
                redis_counters.mark_event_processed(event_id)
        except Exception:
            logger.exception(
                '[affinity.service] post-commit Redis bookkeeping failed '
                'user=%s device=%s rule=%s',
                user_id, device_id, rule_key,
            )

        # 12) + 13) Level-up rewards and WS pushes.
        grant_result = cls._grant_and_notify(
            setting=setting,
            user_device=ud_locked,
            user_id=user_id,
            device_id=device_id,
            rule_key=rule_key,
            source=source,
            change=actual_change,
            before=before_value,
            after=after_value,
            old_level=old_level,
            new_level=new_level,
        )

        return ApplyResult(
            outcome=ApplyOutcome.APPLIED,
            change_value=actual_change,
            before_value=before_value,
            after_value=after_value,
            rule_key=rule_key,
            old_level=old_level,
            new_level=new_level,
            log_id=log.id,
            rewards_granted=grant_result.granted,
            rewards_failed=grant_result.failed,
        )

    # ---------- Administrator entry point (bypasses rules, takes a raw delta) ----------
    @classmethod
    def admin_adjust(
        cls,
        *,
        user_id: int,
        device_id: int,
        delta: int,
        operator_admin_id: int,
        reason: str = '',
        batch: bool = False,
    ) -> ApplyResult:
        """Apply a manual administrator adjustment.

        - Does not look up an AffinityRule (rule=NULL, rule_key='admin').
        - Does not check cooldown or daily caps.
        - Still clamps into [0, max_affinity] (decision 6: administrators
          cannot exceed the maximum).
        - ``source`` is 'admin_adjust_batch' when ``batch`` is true, otherwise
          'admin_adjust_single'.
        - Still writes an AffinityLog row, refreshes the cached level, grants
          rewards and pushes WS events.

        Returns:
            An ``ApplyResult``. Failures inside the database transaction are
            logged and reported as ``ApplyOutcome.ERROR`` rather than raised.
        """
        # Function-scope import (kept local as in the original code), hoisted
        # out of the transaction body.
        from django.utils import timezone as djtz

        source = 'admin_adjust_batch' if batch else 'admin_adjust_single'

        try:
            user_device = UserDevice.active.get(id=device_id, user_id=user_id)
        except UserDevice.DoesNotExist:
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key='admin',
                error='UserDevice not found or unbound',
            )

        if delta == 0:
            return ApplyResult(
                outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                rule_key='admin',
            )

        setting = AffinitySetting.get_solo()
        try:
            with transaction.atomic():
                # Lock the row so concurrent writers serialize on it.
                ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
                before_value = ud_locked.favorability
                after_value = max(0, min(setting.max_affinity, before_value + delta))
                actual_change = after_value - before_value
                if actual_change == 0:
                    # Already at the boundary in the requested direction.
                    return ApplyResult(
                        outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
                        rule_key='admin',
                        before_value=before_value,
                        after_value=after_value,
                    )

                ud_locked.favorability = after_value
                ud_locked.last_active_at = djtz.now()
                ud_locked.save(update_fields=['favorability', 'last_active_at'])

                log = AffinityLog.objects.create(
                    user_id=user_id,
                    device=ud_locked,
                    rule=None,
                    rule_key='admin',
                    change_value=actual_change,
                    before_value=before_value,
                    after_value=after_value,
                    source=source,
                    operator_admin_id=operator_admin_id,
                    reason=reason,
                    # Keep the requested delta: clamping may have reduced it.
                    metadata={'requested_delta': delta},
                )
                old_level, new_level, _matched = level_utils.update_device_level(
                    ud_locked, save=True,
                )
        except Exception as exc:
            logger.exception(
                '[affinity.service.admin_adjust] 失败 user=%s device=%s delta=%s',
                user_id, device_id, delta,
            )
            return ApplyResult(
                outcome=ApplyOutcome.ERROR,
                rule_key='admin',
                error=str(exc),
            )

        # Post-commit side effects.
        grant_result = cls._grant_and_notify(
            setting=setting,
            user_device=ud_locked,
            user_id=user_id,
            device_id=device_id,
            rule_key='admin',
            source=source,
            change=actual_change,
            before=before_value,
            after=after_value,
            old_level=old_level,
            new_level=new_level,
        )

        return ApplyResult(
            outcome=ApplyOutcome.APPLIED,
            change_value=actual_change,
            before_value=before_value,
            after_value=after_value,
            rule_key='admin',
            old_level=old_level,
            new_level=new_level,
            log_id=log.id,
            rewards_granted=grant_result.granted,
            rewards_failed=grant_result.failed,
        )

    # ---------- Shared post-commit side effects ----------
    @staticmethod
    def _grant_and_notify(
        *,
        setting,
        user_device,
        user_id: int,
        device_id: int,
        rule_key: str,
        source: str,
        change: int,
        before: int,
        after: int,
        old_level: int,
        new_level: int,
    ):
        """Grant level-up rewards, then push WS events for a committed change.

        Returns the reward grant result: an empty ``RewardGrantResult`` when
        the level did not rise, otherwise whatever ``grant_levels`` returns.
        """
        # Cross-level rewards (plan A3-B: each level is granted separately).
        grant_result = reward_utils.RewardGrantResult()
        if new_level > old_level:
            grant_result = reward_utils.grant_levels(user_device, old_level, new_level)

        # NOTE(review): the level-up / level-down pushes are assumed to be
        # gated by enable_notify together with the affinity_update push —
        # confirm against the intended behavior.
        if setting.enable_notify:
            ws.push_affinity_update(
                user_id=user_id,
                device_id=device_id,
                change=change,
                before=before,
                after=after,
                rule_key=rule_key,
                source=source,
            )
            if new_level > old_level:
                ws.push_level_up(
                    user_id=user_id,
                    device_id=device_id,
                    old_level=old_level,
                    new_level=new_level,
                    rewards=grant_result.granted,
                )
            elif new_level < old_level:
                ws.push_level_down(
                    user_id=user_id,
                    device_id=device_id,
                    old_level=old_level,
                    new_level=new_level,
                )
        return grant_result