pmc f26e78c545 feat(affinity-P2): service 层落地 — 唯一写入入口 + Redis 计数器 + 等级映射 + 跨级奖励 + WS 推送 (P2-01~P2-05)
新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:

- counters.py (P2-02):Redis 三类计数器
  - affinity💿{device_id}:{rule_key} 冷却
  - affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
  - affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
  - 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
  - cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
  - event_id 60s 去重防客户端重复上报

- levels.py (P2-03):等级映射
  - map_value_to_level / update_device_level / progress_to_next_level
  - update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])

- ws.py (P2-05):WebSocket 推送 helper
  - 3 类事件 affinity_update / level_up / level_down
  - asgiref.async_to_sync 包装 channel_layer.group_send
  - 推送故障 fire-and-forget 仅日志记录,不阻塞主流程

- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
  - grant_levels(user_device, from_level, to_level) 逐级独立事务
  - UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
  - _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现

- services.py (P2-01):AffinityService 主入口
  - apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
  - 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
  - admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
  - 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)

- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查

Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。

设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 09:35:53 +08:00

148 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""P2-02 Redis 计数器工具
封装三类计数器与冷却判断,统一用 django-redis (`django.core.cache`) 接入。
Key 命名约定(与设计文档 §4.3 触发流程一致):
affinity:cd:{device_id}:{rule_key} — 冷却(值任意,存在即冷却中)
affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} — 单规则单设备日累计(绝对值)
affinity:daily:{device_id}:_global:{YYYYMMDD} — 全局正向日累计(仅正向规则)
自然日基准:`AffinitySetting.timezone`(默认 Asia/Shanghai全用户统一。
TTL 策略:日计数器 TTL=48h远超 1 自然日边界,老 key 自然过期);冷却 TTL=规则 cooldown_seconds。
"""
from __future__ import annotations
from datetime import datetime
from typing import Optional
from django.core.cache import cache
from django.utils import timezone
try:
from zoneinfo import ZoneInfo # Python 3.9+
except ImportError: # pragma: no cover
from backports.zoneinfo import ZoneInfo # type: ignore
# 日计数器 TTL — 48 小时足以覆盖最长时区偏移和单次"跨天延迟"场景
DAILY_COUNTER_TTL = 48 * 3600
# ---------- 时区与日期 ----------
def get_setting_timezone() -> str:
"""读取 AffinitySetting.timezone容错表不存在或单例缺失时回退 Asia/Shanghai"""
try:
from userapp.models import AffinitySetting
return AffinitySetting.get_solo().timezone or 'Asia/Shanghai'
except Exception: # AppConfig 启动期 / 测试中表未建好等场景
return 'Asia/Shanghai'
def local_today_str(tz_name: Optional[str] = None) -> str:
"""返回基于 AffinitySetting.timezone 的"今日"日期字符串 YYYYMMDD"""
tz_name = tz_name or get_setting_timezone()
now_utc = timezone.now()
local_now = now_utc.astimezone(ZoneInfo(tz_name))
return local_now.strftime('%Y%m%d')
def local_today_date(tz_name: Optional[str] = None):
"""返回 AffinitySetting.timezone 的"今日" date 对象(供 UserAffinityDailyCounter.date 用)"""
tz_name = tz_name or get_setting_timezone()
now_utc = timezone.now()
return now_utc.astimezone(ZoneInfo(tz_name)).date()
# ---------- 冷却 ----------
def cooldown_key(device_id: int, rule_key: str) -> str:
return f'affinity:cd:{device_id}:{rule_key}'
def is_in_cooldown(device_id: int, rule_key: str) -> bool:
"""返回 True 表示尚未过冷却,本次触发应拒绝"""
return cache.get(cooldown_key(device_id, rule_key)) is not None
def set_cooldown(device_id: int, rule_key: str, seconds: int) -> None:
"""设置冷却。seconds<=0 时不做任何事(视为无冷却规则)"""
if seconds <= 0:
return
cache.set(cooldown_key(device_id, rule_key), 1, timeout=seconds)
# ---------- 单规则日计数器 ----------
def rule_daily_key(device_id: int, rule_key: str, date_str: Optional[str] = None) -> str:
return f'affinity:daily:{device_id}:{rule_key}:{date_str or local_today_str()}'
def get_rule_daily(device_id: int, rule_key: str) -> int:
"""读取本规则本设备今日累计(绝对值累加,不区分正负)"""
val = cache.get(rule_daily_key(device_id, rule_key))
return int(val) if val is not None else 0
def incr_rule_daily(device_id: int, rule_key: str, delta_abs: int) -> int:
"""原子累加本规则日计数器返回累加后的值。delta_abs 必须 > 0。
首次写入时通过 cache.add 设置 TTL之后用 cache.incr 原子累加TTL 不变。
"""
if delta_abs <= 0:
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
key = rule_daily_key(device_id, rule_key)
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
return delta_abs
try:
return cache.incr(key, delta_abs)
except ValueError:
# cache.add 写入后 race-condition 失败极端情况兜底
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
return delta_abs
# ---------- 全局日计数器(仅正向汇总)----------
def global_daily_key(device_id: int, date_str: Optional[str] = None) -> str:
return f'affinity:daily:{device_id}:_global:{date_str or local_today_str()}'
def get_global_daily(device_id: int) -> int:
"""读取本设备今日全局正向好感度累计(跨规则汇总)"""
val = cache.get(global_daily_key(device_id))
return int(val) if val is not None else 0
def incr_global_daily(device_id: int, delta_abs: int) -> int:
"""原子累加全局日计数器(仅在正向变化时调用)。返回累加后的值。"""
if delta_abs <= 0:
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
key = global_daily_key(device_id)
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
return delta_abs
try:
return cache.incr(key, delta_abs)
except ValueError:
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
return delta_abs
# ---------- 事件去重event_id 60s 缓存)----------
def event_seen_key(event_id: str) -> str:
return f'affinity:event:{event_id}'
def event_already_processed(event_id: str) -> bool:
"""若 event_id 已在 60s 内处理过,返回 True"""
if not event_id:
return False
return cache.get(event_seen_key(event_id)) is not None
def mark_event_processed(event_id: str, ttl_seconds: int = 60) -> None:
if not event_id:
return
cache.set(event_seen_key(event_id), 1, timeout=ttl_seconds)