新增 6 个模块,把好感度变化的全部副作用收敛到一个调用入口:
- counters.py (P2-02):Redis 三类计数器
- affinity💿{device_id}:{rule_key} 冷却
- affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} 单规则日上限
- affinity:daily:{device_id}:_global:{YYYYMMDD} 全局正向日上限
- 自然日按 AffinitySetting.timezone (Asia/Shanghai 默认) 通过 zoneinfo 计算
- cache.add + cache.incr 实现 set-if-not-exists + atomic-incr 语义,TTL 48h
- event_id 60s 去重防客户端重复上报
- levels.py (P2-03):等级映射
- map_value_to_level / update_device_level / progress_to_next_level
- update_device_level 仅 level 变化时 save(update_fields=['affinity_level'])
- ws.py (P2-05):WebSocket 推送 helper
- 3 类事件 affinity_update / level_up / level_down
- asgiref.async_to_sync 包装 channel_layer.group_send
- 推送故障 fire-and-forget 仅日志记录,不阻塞主流程
- rewards.py (P2-04):跨级奖励发放(A3 方案 B)
- grant_levels(user_device, from_level, to_level) 逐级独立事务
- UserLevelRewardGrant 唯一约束保证幂等(决策 11:衰减回升不补发)
- _dispatch_reward_to_external_systems 是 STUB,P3/P4 接虚拟货币/道具 app 时实现
- services.py (P2-01):AffinityService 主入口
- apply(user_id, device_id, rule_key, source, event_id, metadata, operator_admin_id, reason)
- 10 步流水线 [event_id 去重 → 取规则 → 冷却 → 取 UserDevice.active → 计算 + single_cap 钳位 → 规则日上限 → 全局日上限 → 原子写库 → Redis 累加 → 奖励 → WS 推送]
- admin_adjust 绕过 rule 与冷却,但走 [0, max_affinity] 钳位 + log + 等级缓存 + 奖励 + WS
- 返回 ApplyResult dataclass 含 ApplyOutcome 枚举(applied / noop_no_rule / noop_cooldown / noop_*_daily_cap / noop_event_duplicate / noop_value_boundary / error)
- permissions.py:IsAdminUserStaff 复用 IsAuthenticated + is_staff 检查
Smoke test 6 项全 PASS:no_rule / chat applied / event_id 去重 / 冷却拦截 / admin_adjust / max_affinity 钳位。
AffinityLog 写库 / UserLevelRewardGrant 幂等 / level 缓存更新 均经事务原子保证。
设计依据:docs/好感度系统功能与规则设计.md §4.3 触发流程 + §6 等级规则 + §9 数据契约。
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
148 lines
5.4 KiB
Python
148 lines
5.4 KiB
Python
"""P2-02 Redis 计数器工具
|
||
|
||
封装三类计数器与冷却判断,统一用 django-redis (`django.core.cache`) 接入。
|
||
|
||
Key 命名约定(与设计文档 §4.3 触发流程一致):
|
||
affinity:cd:{device_id}:{rule_key} — 冷却(值任意,存在即冷却中)
|
||
affinity:daily:{device_id}:{rule_key}:{YYYYMMDD} — 单规则单设备日累计(绝对值)
|
||
affinity:daily:{device_id}:_global:{YYYYMMDD} — 全局正向日累计(仅正向规则)
|
||
|
||
自然日基准:`AffinitySetting.timezone`(默认 Asia/Shanghai),全用户统一。
|
||
TTL 策略:日计数器 TTL=48h(远超 1 自然日边界,老 key 自然过期);冷却 TTL=规则 cooldown_seconds。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
|
||
from django.core.cache import cache
|
||
from django.utils import timezone
|
||
|
||
try:
|
||
from zoneinfo import ZoneInfo # Python 3.9+
|
||
except ImportError: # pragma: no cover
|
||
from backports.zoneinfo import ZoneInfo # type: ignore
|
||
|
||
# 日计数器 TTL — 48 小时足以覆盖最长时区偏移和单次"跨天延迟"场景
|
||
DAILY_COUNTER_TTL = 48 * 3600
|
||
|
||
|
||
# ---------- 时区与日期 ----------
|
||
|
||
def get_setting_timezone() -> str:
|
||
"""读取 AffinitySetting.timezone(容错:表不存在或单例缺失时回退 Asia/Shanghai)"""
|
||
try:
|
||
from userapp.models import AffinitySetting
|
||
return AffinitySetting.get_solo().timezone or 'Asia/Shanghai'
|
||
except Exception: # AppConfig 启动期 / 测试中表未建好等场景
|
||
return 'Asia/Shanghai'
|
||
|
||
|
||
def local_today_str(tz_name: Optional[str] = None) -> str:
|
||
"""返回基于 AffinitySetting.timezone 的"今日"日期字符串 YYYYMMDD"""
|
||
tz_name = tz_name or get_setting_timezone()
|
||
now_utc = timezone.now()
|
||
local_now = now_utc.astimezone(ZoneInfo(tz_name))
|
||
return local_now.strftime('%Y%m%d')
|
||
|
||
|
||
def local_today_date(tz_name: Optional[str] = None):
|
||
"""返回 AffinitySetting.timezone 的"今日" date 对象(供 UserAffinityDailyCounter.date 用)"""
|
||
tz_name = tz_name or get_setting_timezone()
|
||
now_utc = timezone.now()
|
||
return now_utc.astimezone(ZoneInfo(tz_name)).date()
|
||
|
||
|
||
# ---------- 冷却 ----------
|
||
|
||
def cooldown_key(device_id: int, rule_key: str) -> str:
|
||
return f'affinity:cd:{device_id}:{rule_key}'
|
||
|
||
|
||
def is_in_cooldown(device_id: int, rule_key: str) -> bool:
|
||
"""返回 True 表示尚未过冷却,本次触发应拒绝"""
|
||
return cache.get(cooldown_key(device_id, rule_key)) is not None
|
||
|
||
|
||
def set_cooldown(device_id: int, rule_key: str, seconds: int) -> None:
|
||
"""设置冷却。seconds<=0 时不做任何事(视为无冷却规则)"""
|
||
if seconds <= 0:
|
||
return
|
||
cache.set(cooldown_key(device_id, rule_key), 1, timeout=seconds)
|
||
|
||
|
||
# ---------- 单规则日计数器 ----------
|
||
|
||
def rule_daily_key(device_id: int, rule_key: str, date_str: Optional[str] = None) -> str:
|
||
return f'affinity:daily:{device_id}:{rule_key}:{date_str or local_today_str()}'
|
||
|
||
|
||
def get_rule_daily(device_id: int, rule_key: str) -> int:
|
||
"""读取本规则本设备今日累计(绝对值累加,不区分正负)"""
|
||
val = cache.get(rule_daily_key(device_id, rule_key))
|
||
return int(val) if val is not None else 0
|
||
|
||
|
||
def incr_rule_daily(device_id: int, rule_key: str, delta_abs: int) -> int:
|
||
"""原子累加本规则日计数器,返回累加后的值。delta_abs 必须 > 0。
|
||
|
||
首次写入时通过 cache.add 设置 TTL;之后用 cache.incr 原子累加,TTL 不变。
|
||
"""
|
||
if delta_abs <= 0:
|
||
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
|
||
key = rule_daily_key(device_id, rule_key)
|
||
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
|
||
return delta_abs
|
||
try:
|
||
return cache.incr(key, delta_abs)
|
||
except ValueError:
|
||
# cache.add 写入后 race-condition 失败极端情况兜底
|
||
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
|
||
return delta_abs
|
||
|
||
|
||
# ---------- 全局日计数器(仅正向汇总)----------
|
||
|
||
def global_daily_key(device_id: int, date_str: Optional[str] = None) -> str:
|
||
return f'affinity:daily:{device_id}:_global:{date_str or local_today_str()}'
|
||
|
||
|
||
def get_global_daily(device_id: int) -> int:
|
||
"""读取本设备今日全局正向好感度累计(跨规则汇总)"""
|
||
val = cache.get(global_daily_key(device_id))
|
||
return int(val) if val is not None else 0
|
||
|
||
|
||
def incr_global_daily(device_id: int, delta_abs: int) -> int:
|
||
"""原子累加全局日计数器(仅在正向变化时调用)。返回累加后的值。"""
|
||
if delta_abs <= 0:
|
||
raise ValueError(f'delta_abs must be > 0, got {delta_abs}')
|
||
key = global_daily_key(device_id)
|
||
if cache.add(key, delta_abs, timeout=DAILY_COUNTER_TTL):
|
||
return delta_abs
|
||
try:
|
||
return cache.incr(key, delta_abs)
|
||
except ValueError:
|
||
cache.set(key, delta_abs, timeout=DAILY_COUNTER_TTL)
|
||
return delta_abs
|
||
|
||
|
||
# ---------- 事件去重(event_id 60s 缓存)----------
|
||
|
||
def event_seen_key(event_id: str) -> str:
|
||
return f'affinity:event:{event_id}'
|
||
|
||
|
||
def event_already_processed(event_id: str) -> bool:
|
||
"""若 event_id 已在 60s 内处理过,返回 True"""
|
||
if not event_id:
|
||
return False
|
||
return cache.get(event_seen_key(event_id)) is not None
|
||
|
||
|
||
def mark_event_processed(event_id: str, ttl_seconds: int = 60) -> None:
|
||
if not event_id:
|
||
return
|
||
cache.set(event_seen_key(event_id), 1, timeout=ttl_seconds)
|