feat(affinity-P2): service 层落地 — 唯一写入入口 + Redis 计数器 + 等级映射 + 跨级奖励 + WS 推送 (P2-01~P2-05)

新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:

- counters.py (P2-02):Redis 三类计数器
  - affinity💿{device_id}:{rule_key} 冷却
  - affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
  - affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
  - 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
  - cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
  - event_id 60s 去重防客户端重复上报

- levels.py (P2-03):等级映射
  - map_value_to_level / update_device_level / progress_to_next_level
  - update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])

- ws.py (P2-05):WebSocket 推送 helper
  - 3 类事件 affinity_update / level_up / level_down
  - asgiref.async_to_sync 包装 channel_layer.group_send
  - 推送故障 fire-and-forget 仅日志记录,不阻塞主流程

- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
  - grant_levels(user_device, from_level, to_level) 逐级独立事务
  - UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
  - _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现

- services.py (P2-01):AffinityService 主入口
  - apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
  - 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
  - admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
  - 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)

- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查

Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。

设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
pmc 2026-05-14 09:35:53 +08:00
parent f66e2dfc86
commit f26e78c545
6 changed files with 898 additions and 0 deletions

View File

@ -0,0 +1,147 @@
"""P2-02 Redis 计数器工具
封装三类计数器与冷却判断统一用 django-redis (`django.core.cache`) 接入
Key 命名约定与设计文档 §4.3 触发流程一致
affinity:cd:{device_id}:{rule_key} 冷却值任意存在即冷却中
affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则单设备日累计绝对值
affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日累计仅正向规则
自然日基准`AffinitySetting.timezone`默认 Asia/Shanghai全用户统一
TTL 策略日计数器 TTL=48h远超 1 自然日边界 key 自然过期冷却 TTL=规则 cooldown_seconds
"""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from django.core.cache import cache
from django.utils import timezone
try:
from zoneinfo import ZoneInfo # Python 3.9+
except ImportError: # pragma: no cover
from backports.zoneinfo import ZoneInfo # type: ignore
# 日计数器 TTL — 48 小时足以覆盖最长时区偏移和单次"跨天延迟"场景
DAILY_COUNTER_TTL = 48 * 3600
# ---------- 时区与日期 ----------
def get_setting_timezone() -> str:
"""读取 AffinitySetting.timezone容错表不存在或单例缺失时回退 Asia/Shanghai"""
try:
from userapp.models import AffinitySetting
return AffinitySetting.get_solo().timezone or 'Asia/Shanghai'
except Exception: # AppConfig 启动期 / 测试中表未建好等场景
return 'Asia/Shanghai'
def local_today_str(tz_name: Optional[str] = None) -> str:
"""返回基于 AffinitySetting.timezone 的"今日"日期字符串 YYYYMMDD"""
tz_name = tz_name or get_setting_timezone()
now_utc = timezone.now()
local_now = now_utc.astimezone(ZoneInfo(tz_name))
return local_now.strftime('%Y%m%d')
def local_today_date(tz_name: Optional[str] = None):
"""返回 AffinitySetting.timezone 的"今日" date 对象(供 UserAffinityDailyCounter.date 用)"""
tz_name = tz_name or get_setting_timezone()
now_utc = timezone.now()
return now_utc.astimezone(ZoneInfo(tz_name)).date()
# ---------- 冷却 ----------
def cooldown_key(device_id: int, rule_key: str) -> str:
return f'affinity:cd:{device_id}:{rule_key}'
def is_in_cooldown(device_id: int, rule_key: str) -> bool:
"""返回 True 表示尚未过冷却,本次触发应拒绝"""
return cache.get(cooldown_key(device_id, rule_key)) is not None
def set_cooldown(device_id: int, rule_key: str, seconds: int) -> None:
"""设置冷却。seconds<=0 时不做任何事(视为无冷却规则)"""
if seconds <= 0:
return
cache.set(cooldown_key(device_id, rule_key), 1, timeout=seconds)
# ---------- 单规则日计数器 ----------
def rule_daily_key(device_id: int, rule_key: str, date_str: Optional[str] = None) -> str:
return f'affinity:daily:{device_id}:{rule_key}:{date_str or local_today_str()}'
def get_rule_daily(device_id: int, rule_key: str) -> int:
"""读取本规则本设备今日累计(绝对值累加,不区分正负)"""
val = cache.get(rule_daily_key(device_id, rule_key))
return int(val) if val is not None else 0
def incr_rule_daily(device_id: int, rule_key: str, delta_abs: int) -> int:
"""原子累加本规则日计数器返回累加后的值。delta_abs 必须 > 0。
首次写入时通过 cache.add 设置 TTL之后用 cache.incr 原子累加TTL 不变
"""
if delta_abs <= 0:
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
key = rule_daily_key(device_id, rule_key)
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
return delta_abs
try:
return cache.incr(key, delta_abs)
except ValueError:
# cache.add 写入后 race-condition 失败极端情况兜底
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
return delta_abs
# ---------- 全局日计数器(仅正向汇总)----------
def global_daily_key(device_id: int, date_str: Optional[str] = None) -> str:
return f'affinity:daily:{device_id}:_global:{date_str or local_today_str()}'
def get_global_daily(device_id: int) -> int:
"""读取本设备今日全局正向好感度累计(跨规则汇总)"""
val = cache.get(global_daily_key(device_id))
return int(val) if val is not None else 0
def incr_global_daily(device_id: int, delta_abs: int) -> int:
"""原子累加全局日计数器(仅在正向变化时调用)。返回累加后的值。"""
if delta_abs <= 0:
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
key = global_daily_key(device_id)
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
return delta_abs
try:
return cache.incr(key, delta_abs)
except ValueError:
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
return delta_abs
# ---------- 事件去重event_id 60s 缓存)----------
def event_seen_key(event_id: str) -> str:
return f'affinity:event:{event_id}'
def event_already_processed(event_id: str) -> bool:
"""若 event_id 已在 60s 内处理过,返回 True"""
if not event_id:
return False
return cache.get(event_seen_key(event_id)) is not None
def mark_event_processed(event_id: str, ttl_seconds: int = 60) -> None:
if not event_id:
return
cache.set(event_seen_key(event_id), 1, timeout=ttl_seconds)

View File

@ -0,0 +1,87 @@
"""P2-03 等级映射 + UserDevice.affinity_level 缓存更新
根据好感度数值映射到对应等级AffinityLevel并把结果写回 UserDevice.affinity_level
作为缓存
设计依据好感度系统功能与规则设计.md§6.3 等级变化规则
- 等级由好感度区间自动映射每台设备独立判定
- 跨级判定每次好感度变动后取当前值所属区间与上一次等级比较
"""
from __future__ import annotations
from typing import Optional, Tuple
from userapp.models import AffinityLevel
def map_value_to_level(value: int) -> Optional[AffinityLevel]:
"""根据好感度数值找出所属的 AffinityLevel。
匹配规则min_affinity <= value <= max_affinity is_enabled=True is_deleted=False
返回最高 level 优先避免重叠区间时的歧义 P1 已加 clean 校验拦截重叠
若没有匹配到任何区间则返回 None理论上不应发生因为等级区间应覆盖 [0, max_affinity]
"""
return (
AffinityLevel.objects
.filter(
min_affinity__lte=value,
max_affinity__gte=value,
is_enabled=True,
is_deleted=False,
)
.order_by('-level')
.first()
)
def progress_to_next_level(value: int, current_level: AffinityLevel) -> dict:
"""计算当前值在本等级区间内的进度百分比 + 到下一等级的距离。
返回
{
'percent': 0~100 浮点当前值在本等级区间内的位置百分比,
'next_level': AffinityLevel None,
'points_to_next': int None,
}
"""
span = max(current_level.max_affinity - current_level.min_affinity, 1)
percent = round((value - current_level.min_affinity) / span * 100, 2)
next_level = (
AffinityLevel.objects
.filter(level__gt=current_level.level, is_enabled=True, is_deleted=False)
.order_by('level')
.first()
)
points_to_next = None
if next_level is not None:
points_to_next = max(next_level.min_affinity - value, 0)
return {
'percent': max(0.0, min(100.0, percent)),
'next_level': next_level,
'points_to_next': points_to_next,
}
def update_device_level(user_device, save: bool = True) -> Tuple[int, int, Optional[AffinityLevel]]:
"""根据 user_device.favorability 重新计算并更新 affinity_level 缓存字段。
返回 (old_level, new_level, matched_level_obj)
若没匹配到任何 AffinityLevelnew_level 保持原值matched_level_obj=None
save=False 时不调 .save()由调用方批量保存service
"""
old_level = user_device.affinity_level
matched = map_value_to_level(user_device.favorability)
if matched is None:
return old_level, old_level, None
new_level = matched.level
if new_level != old_level:
user_device.affinity_level = new_level
if save:
user_device.save(update_fields=['affinity_level'])
return old_level, new_level, matched

View File

@ -0,0 +1,16 @@
"""Admin 接口权限:要求已登录 + is_staff"""
from rest_framework.permissions import IsAuthenticated
class IsAdminUserStaff(IsAuthenticated):
"""已登录用户 + is_staff=True。
沿用项目既有 ViewSet get_permissions() `request.user.is_staff` 检查惯例
封装成可复用的 permission
"""
def has_permission(self, request, view):
if not super().has_permission(request, view):
return False
return bool(request.user and request.user.is_authenticated and request.user.is_staff)

View File

@ -0,0 +1,131 @@
"""P2-04 跨级奖励发放
A3 决策方案 B升级时**逐级独立事务**发放奖励失败的项收集起来供调用方
后续重试/补偿**已成功的级别不会因后续级别失败而回滚**这是与方案 A 整体事务
的最大区别
发放幂等通过 UserLevelRewardGrant(device, level) 唯一约束保证
- 重复跨过同一等级不会重发设计文档决策 11
- 衰减回升后再升过同等级也不重发决策 11
外部副作用虚拟货币 +道具发放 _dispatch_reward_to_external_systems 占位 hook
实现 P2 不接外部系统先记 reward_snapshot 落库后续接订阅/卡片/道具 app
在该 hook 中调用
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from django.db import IntegrityError, transaction
from userapp.models import AffinityLevel, AffinitySetting, UserLevelRewardGrant
logger = logging.getLogger(__name__)
@dataclass
class RewardGrantResult:
"""单次跨级奖励发放的汇总结果"""
granted: List[Dict[str, Any]] = field(default_factory=list) # 本次新发放的奖励快照列表
skipped_duplicate: List[int] = field(default_factory=list) # 已发过被跳过的等级号
failed: List[Dict[str, Any]] = field(default_factory=list) # 发放失败的等级(含错误信息)
def _build_reward_snapshot(level_obj: AffinityLevel) -> Dict[str, Any]:
"""把 AffinityLevel 当前奖励配置打成快照(防 admin 后续修改影响审计)"""
return {
'level': level_obj.level,
'name': level_obj.name,
'reward_type': level_obj.reward_type,
'reward_currency': level_obj.reward_currency,
'reward_items': list(level_obj.reward_items or []),
'unlock_content': level_obj.unlock_content,
}
def _dispatch_reward_to_external_systems(user_device, snapshot: Dict[str, Any]) -> None:
"""实际把奖励发放到外部系统的 hook虚拟货币、道具、解锁标记等
P2 阶段不接外部系统 仅日志记录
P3/P4 阶段接订阅 / 卡片 / 道具 app 时在此实现具体派发逻辑
"""
if snapshot['reward_currency'] > 0:
logger.info(
'[affinity.rewards] [STUB] 应给设备 %s 发放虚拟货币 %d',
user_device.id, snapshot['reward_currency'],
)
if snapshot['reward_items']:
logger.info(
'[affinity.rewards] [STUB] 应给设备 %s 发放道具 %s',
user_device.id, snapshot['reward_items'],
)
def grant_levels(user_device, from_level: int, to_level: int) -> RewardGrantResult:
"""逐级独立事务发放 [from_level+1, to_level] 范围内的所有等级奖励。
设计文档决策 3升级时逐级发放经过的每一级降级回升后不再补发永久幂等
参数
user_device: UserDevice 实例
from_level: 升级前等级
to_level: 升级后等级 > from_level 才会发奖励
返回
RewardGrantResult调用方可基于此推送 WS记日志入重试队列等
"""
result = RewardGrantResult()
if to_level <= from_level:
return result
# AffinitySetting.enable_rewards=False 则只记录跨级日志,不发奖励
if not AffinitySetting.get_solo().enable_rewards:
logger.info('[affinity.rewards] enable_rewards=False跨级 %s%s 不发奖励',
from_level, to_level)
return result
# 取所有需要发放的等级(按 level 升序)
levels_to_grant = list(
AffinityLevel.objects
.filter(level__gt=from_level, level__lte=to_level,
is_enabled=True, is_deleted=False)
.order_by('level')
)
for level_obj in levels_to_grant:
try:
with transaction.atomic():
snapshot = _build_reward_snapshot(level_obj)
# UniqueConstraint(device, level) 保证同一设备同一等级最多一行
_, created = UserLevelRewardGrant.objects.get_or_create(
device=user_device,
level=level_obj.level,
defaults={
'device_snapshot_id': user_device.id,
'reward_snapshot': snapshot,
},
)
if not created:
# 决策 11曾经达到过就标记不再重发
result.skipped_duplicate.append(level_obj.level)
continue
# 派发到外部系统P2 是 stub— 若 hook 抛异常会触发本 level 事务回滚
_dispatch_reward_to_external_systems(user_device, snapshot)
result.granted.append(snapshot)
except IntegrityError as exc:
# 并发场景下 UniqueConstraint 命中 — 等价于 already granted
result.skipped_duplicate.append(level_obj.level)
logger.info('[affinity.rewards] Lv%s 已被并发线程发放:%s', level_obj.level, exc)
except Exception as exc:
logger.exception('[affinity.rewards] Lv%s 发放失败:%s', level_obj.level, exc)
result.failed.append({
'level': level_obj.level,
'error': str(exc),
'snapshot': _build_reward_snapshot(level_obj),
})
return result

View File

@ -0,0 +1,428 @@
"""P2-01 好感度系统服务层 — 唯一写入入口
所有好感度变化设备事件手机事件衰减任务管理员调整**必须**经由
`AffinityService.apply()` 处理确保单一冷却 / 日上限 / 钳位 / 日志 / 等级
更新 / 奖励派发 / WS 推送的语义闭环
设计文档§4.3 触发计算流程
"""
from __future__ import annotations
import logging
import random
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
from django.db import transaction
from django.db.models import F
from device_interaction.models import UserDevice
from userapp.models import (
AffinityLog,
AffinityRule,
AffinitySetting,
UserAffinityDailyCounter,
)
from . import counters as redis_counters
from . import levels as level_utils
from . import rewards as reward_utils
from . import ws
logger = logging.getLogger(__name__)
# ---------- 返回类型 ----------
class ApplyOutcome:
"""ApplyResult.outcome 的常量集合"""
APPLIED = 'applied' # 成功写入
NOOP_NO_RULE = 'noop_no_rule' # 规则不存在或已禁用 / 软删
NOOP_COOLDOWN = 'noop_cooldown' # 冷却中
NOOP_RULE_DAILY_CAP = 'noop_rule_daily_cap' # 本规则今日已达上限
NOOP_GLOBAL_DAILY_CAP = 'noop_global_daily_cap' # 全局今日已达上限
NOOP_EVENT_DUP = 'noop_event_duplicate' # 同 event_id 60s 内已处理
NOOP_VALUE_BOUNDARY = 'noop_value_boundary' # 当前已在边界0 还想扣 / 上限还想加)
ERROR = 'error' # 异常
@dataclass
class ApplyResult:
outcome: str
change_value: int = 0
before_value: int = 0
after_value: int = 0
rule_key: str = ''
old_level: int = 0
new_level: int = 0
log_id: Optional[int] = None
rewards_granted: list = field(default_factory=list)
rewards_failed: list = field(default_factory=list)
error: Optional[str] = None
@property
def is_applied(self) -> bool:
return self.outcome == ApplyOutcome.APPLIED
# ---------- 主入口 ----------
class AffinityService:
"""好感度系统服务层"""
@classmethod
def apply(
cls,
*,
user_id: int,
device_id: int,
rule_key: str,
source: str,
event_id: str = '',
metadata: Optional[Dict[str, Any]] = None,
operator_admin_id: Optional[int] = None,
reason: str = '',
) -> ApplyResult:
"""好感度变化的唯一写入入口。
参数
user_id: ParadiseUser.id用于 AffinityLog.user 关联及 WS group 路由
device_id: UserDevice.id注意是绑定 ID不是 Device.id
rule_key: AffinityRule.rule_key 'chat' / 'gift' / 'decay' / 'admin'
source: AffinityLog.SOURCE_CHOICES 之一
event_id: 客户端事件 UUID用于 60s 内幂等去重
metadata: 扩展上下文 JSON
operator_admin_id / reason: 管理员调整时填写
返回 ApplyResult调用方根据 outcome 决定后续动作
"""
# 0) event_id 幂等防重
if event_id and redis_counters.event_already_processed(event_id):
return ApplyResult(
outcome=ApplyOutcome.NOOP_EVENT_DUP,
rule_key=rule_key,
)
# 1) 取规则
rule = (
AffinityRule.objects
.filter(rule_key=rule_key, is_enabled=True, is_deleted=False)
.first()
)
if rule is None:
return ApplyResult(outcome=ApplyOutcome.NOOP_NO_RULE, rule_key=rule_key)
# 2) 冷却检查
if redis_counters.is_in_cooldown(device_id, rule_key):
return ApplyResult(outcome=ApplyOutcome.NOOP_COOLDOWN, rule_key=rule_key)
# 3) 取设备(要求绑定有效)
try:
user_device = UserDevice.active.get(id=device_id, user_id=user_id)
except UserDevice.DoesNotExist:
logger.warning(
'[affinity.service] device %s 不属于 user %s 或已解绑,跳过',
device_id, user_id,
)
return ApplyResult(
outcome=ApplyOutcome.ERROR,
rule_key=rule_key,
error='UserDevice not found or unbound',
)
# 4) 计算本次变化值([min, max] 闭区间随机P1 CHECK 保证 min <= max
raw_change = random.randint(rule.min_change, rule.max_change)
# single_cap 钳位(保护性 — P1 CHECK 已保证 single_cap > 0
if raw_change > rule.single_cap:
raw_change = rule.single_cap
elif raw_change < -rule.single_cap:
raw_change = -rule.single_cap
if raw_change == 0:
return ApplyResult(
outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
rule_key=rule_key,
change_value=0,
)
# 5) 本规则日上限检查(用绝对值累加)
abs_change = abs(raw_change)
rule_today = redis_counters.get_rule_daily(device_id, rule_key)
if rule_today + abs_change > rule.daily_cap:
# 允许部分通过:剩余空间 > 0 时按剩余空间钳位
remain = rule.daily_cap - rule_today
if remain <= 0:
return ApplyResult(
outcome=ApplyOutcome.NOOP_RULE_DAILY_CAP,
rule_key=rule_key,
)
# 按正负方向取剩余空间
raw_change = remain if raw_change > 0 else -remain
abs_change = remain
# 6) 全局日上限检查(仅正向汇总,衰减不占用)
setting = AffinitySetting.get_solo()
if raw_change > 0:
global_today = redis_counters.get_global_daily(device_id)
if global_today + abs_change > setting.global_daily_cap:
remain = setting.global_daily_cap - global_today
if remain <= 0:
return ApplyResult(
outcome=ApplyOutcome.NOOP_GLOBAL_DAILY_CAP,
rule_key=rule_key,
)
raw_change = remain
abs_change = remain
# 7) 原子更新 UserDevice.favorability + 钳位
try:
with transaction.atomic():
# 锁住该 UserDevice 行,避免并发写竞争
ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
before_value = ud_locked.favorability
after_value = before_value + raw_change
# 钳位 [0, max_affinity](管理员调整也走此分支,因此天然遵守 CR-001 决策 6
after_value = max(0, min(setting.max_affinity, after_value))
actual_change = after_value - before_value
if actual_change == 0:
# 边界场景:当前值已经在边界,本次实际无变化
return ApplyResult(
outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
rule_key=rule_key,
before_value=before_value,
after_value=after_value,
)
ud_locked.favorability = after_value
# 同步刷新 last_active_at衰减判断依赖
from django.utils import timezone as djtz
ud_locked.last_active_at = djtz.now()
ud_locked.save(update_fields=['favorability', 'last_active_at'])
# 8) 写 AffinityLog含 cross-app FK 与冗余 rule_key
log = AffinityLog.objects.create(
user_id=user_id,
device=ud_locked,
rule=rule,
rule_key=rule_key,
change_value=actual_change,
before_value=before_value,
after_value=after_value,
source=source,
event_id=event_id or None, # WR-004空 -> NULL
operator_admin_id=operator_admin_id,
reason=reason,
metadata=metadata or {},
)
# 9) 更新数据库兜底计数器Redis 是热路径DB 是审计兜底)
today_date = redis_counters.local_today_date(setting.timezone)
counter, _ = UserAffinityDailyCounter.objects.select_for_update().get_or_create(
device=ud_locked, rule=rule, date=today_date,
defaults={'accumulated_change': 0, 'trigger_count': 0},
)
counter.accumulated_change = F('accumulated_change') + actual_change
counter.trigger_count = F('trigger_count') + 1
counter.save(update_fields=['accumulated_change', 'trigger_count'])
# 10) 更新等级缓存(事务内做,避免半成品状态)
old_level, new_level, _matched = level_utils.update_device_level(
ud_locked, save=True,
)
except Exception as exc:
logger.exception(
'[affinity.service] apply 失败 user=%s device=%s rule=%s: %s',
user_id, device_id, rule_key, exc,
)
return ApplyResult(
outcome=ApplyOutcome.ERROR,
rule_key=rule_key,
error=str(exc),
)
# 事务已提交下面是非原子的副作用Redis 计数 / 推送 / 奖励派发
# 11) Redis 计数器累加(仅在数据真正写入后再扣冷却 / 日上限配额)
redis_counters.set_cooldown(device_id, rule_key, rule.cooldown_seconds)
redis_counters.incr_rule_daily(device_id, rule_key, abs(actual_change))
if actual_change > 0:
redis_counters.incr_global_daily(device_id, abs(actual_change))
if event_id:
redis_counters.mark_event_processed(event_id)
# 12) 跨级奖励发放A3 方案 B — 每级独立事务)
grant_result = reward_utils.RewardGrantResult()
if new_level > old_level:
grant_result = reward_utils.grant_levels(ud_locked, old_level, new_level)
# 13) WS 推送
if setting.enable_notify:
ws.push_affinity_update(
user_id=user_id,
device_id=device_id,
change=actual_change,
before=before_value,
after=after_value,
rule_key=rule_key,
source=source,
)
if new_level > old_level:
ws.push_level_up(
user_id=user_id,
device_id=device_id,
old_level=old_level,
new_level=new_level,
rewards=grant_result.granted,
)
elif new_level < old_level:
ws.push_level_down(
user_id=user_id,
device_id=device_id,
old_level=old_level,
new_level=new_level,
)
return ApplyResult(
outcome=ApplyOutcome.APPLIED,
change_value=actual_change,
before_value=before_value,
after_value=after_value,
rule_key=rule_key,
old_level=old_level,
new_level=new_level,
log_id=log.id,
rewards_granted=grant_result.granted,
rewards_failed=grant_result.failed,
)
# ---------- 管理员调整专用入口(绕过 rule直接给数值----------
@classmethod
def admin_adjust(
cls,
*,
user_id: int,
device_id: int,
delta: int,
operator_admin_id: int,
reason: str = '',
batch: bool = False,
) -> ApplyResult:
"""管理员手动调整。
- 不查 AffinityRulerule=NULLrule_key='admin'
- 不查冷却 / 日上限
- 依然钳位 [0, max_affinity]决策 6管理员不能突破上限
- source=admin_adjust_single admin_adjust_batch
- 仍写 AffinityLog更新等级缓存发奖励 WS
"""
from userapp.models import AffinitySetting
source = 'admin_adjust_batch' if batch else 'admin_adjust_single'
try:
user_device = UserDevice.active.get(id=device_id, user_id=user_id)
except UserDevice.DoesNotExist:
return ApplyResult(
outcome=ApplyOutcome.ERROR,
rule_key='admin',
error='UserDevice not found or unbound',
)
if delta == 0:
return ApplyResult(
outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
rule_key='admin',
)
setting = AffinitySetting.get_solo()
try:
with transaction.atomic():
ud_locked = UserDevice.objects.select_for_update().get(id=user_device.id)
before_value = ud_locked.favorability
after_value = max(0, min(setting.max_affinity, before_value + delta))
actual_change = after_value - before_value
if actual_change == 0:
return ApplyResult(
outcome=ApplyOutcome.NOOP_VALUE_BOUNDARY,
rule_key='admin',
before_value=before_value,
after_value=after_value,
)
ud_locked.favorability = after_value
from django.utils import timezone as djtz
ud_locked.last_active_at = djtz.now()
ud_locked.save(update_fields=['favorability', 'last_active_at'])
log = AffinityLog.objects.create(
user_id=user_id,
device=ud_locked,
rule=None,
rule_key='admin',
change_value=actual_change,
before_value=before_value,
after_value=after_value,
source=source,
operator_admin_id=operator_admin_id,
reason=reason,
metadata={'requested_delta': delta},
)
old_level, new_level, _matched = level_utils.update_device_level(
ud_locked, save=True,
)
except Exception as exc:
logger.exception(
'[affinity.service.admin_adjust] 失败 user=%s device=%s delta=%s',
user_id, device_id, delta,
)
return ApplyResult(
outcome=ApplyOutcome.ERROR,
rule_key='admin',
error=str(exc),
)
# 副作用
grant_result = reward_utils.RewardGrantResult()
if new_level > old_level:
grant_result = reward_utils.grant_levels(ud_locked, old_level, new_level)
if setting.enable_notify:
ws.push_affinity_update(
user_id=user_id,
device_id=device_id,
change=actual_change,
before=before_value,
after=after_value,
rule_key='admin',
source=source,
)
if new_level > old_level:
ws.push_level_up(
user_id=user_id, device_id=device_id,
old_level=old_level, new_level=new_level,
rewards=grant_result.granted,
)
elif new_level < old_level:
ws.push_level_down(
user_id=user_id, device_id=device_id,
old_level=old_level, new_level=new_level,
)
return ApplyResult(
outcome=ApplyOutcome.APPLIED,
change_value=actual_change,
before_value=before_value,
after_value=after_value,
rule_key='admin',
old_level=old_level,
new_level=new_level,
log_id=log.id,
rewards_granted=grant_result.granted,
rewards_failed=grant_result.failed,
)

View File

@ -0,0 +1,89 @@
"""P2-05 WebSocket 推送 helper
把好感度变化事件推到 `device_{user_id}` channel layer group
设备端和手机端的 DeviceConsumer 都加入此分组会同时收到设计文档 §9.3
推送是fire-and-forget语义 channel layer 故障或用户不在线时不应阻塞 service
主流程全部用 try/except 包裹并仅日志记录
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
logger = logging.getLogger(__name__)
def _send(user_id: int, payload: Dict[str, Any]) -> None:
"""内部统一推送入口。channel_layer 不可用 / group_send 异常时静默吞掉但记日志。"""
try:
channel_layer = get_channel_layer()
if channel_layer is None:
logger.warning('[affinity.ws] channel_layer 未配置,跳过推送 payload=%s', payload)
return
async_to_sync(channel_layer.group_send)(f'device_{user_id}', payload)
except Exception as exc: # pragma: no cover — 推送失败不影响业务
logger.warning('[affinity.ws] group_send 失败 user_id=%s err=%s', user_id, exc)
def push_affinity_update(
user_id: int,
device_id: Optional[int],
*,
change: int,
before: int,
after: int,
rule_key: str,
source: str,
) -> None:
"""好感度数值变化事件。所有触发点(设备 / 手机 / 衰减 / 管理员调整)共用此事件。"""
_send(user_id, {
'type': 'affinity.update', # consumer 端 handler 名(消费者用 .replace('.', '_')
'event': 'affinity_update',
'device_id': device_id,
'change': change,
'before': before,
'after': after,
'rule_key': rule_key,
'source': source,
})
def push_level_up(
user_id: int,
device_id: Optional[int],
*,
old_level: int,
new_level: int,
rewards: List[Dict[str, Any]],
) -> None:
"""升级事件。rewards 是本次跨级一次性发放的奖励列表,每级一项。"""
_send(user_id, {
'type': 'affinity.level.up',
'event': 'level_up',
'device_id': device_id,
'old_level': old_level,
'new_level': new_level,
'rewards': rewards,
})
def push_level_down(
user_id: int,
device_id: Optional[int],
*,
old_level: int,
new_level: int,
) -> None:
"""降级事件。衰减导致的等级回退,不追回奖励但取消等级解锁内容。"""
_send(user_id, {
'type': 'affinity.level.down',
'event': 'level_down',
'device_id': device_id,
'old_level': old_level,
'new_level': new_level,
})