feat(03-02): 新建 common/logging/ 包 + AccessTokenMaskFilter

- common/logging/__init__.py 空文件 (package marker)
- common/logging/filters.py 含 AccessTokenMaskFilter(logging.Filter)
- 4 个正则模式:JSON / Python dict repr / URL query / 等号或冒号兜底
- 调 common.utils.mask_token 替换捕获组,保留末 4 位明文
- 兼容 logger.info('...%s...', value) tuple args 形态
- 不误伤 Authorization header / Bearer 字段(field-name 锚点)
- filter() 永远 return True 不丢弃 record
This commit is contained in:
pmc 2026-05-08 10:25:01 +08:00
parent a58980fd73
commit 891a5ead7c
2 changed files with 111 additions and 0 deletions

View File

View File

@ -0,0 +1,111 @@
"""通用日志脱敏 Filter 集合。
本模块挂载到 settings.LOGGING.filters LOGGING.handlers.{aliyun|console} 引用
覆盖所有 logger handler 路径 record access_token 字段值脱敏保留末 4
设计动机per CONTEXT.md / RESEARCH.md
- 当前仓库代码没有 logger 输出 CredentialSlot.access_token 明文的路径已实证
- Phase 1 + Phase 2 + Phase 3 view 已让 access_token 进入内存中可被随手 dump的状态
- 任何后续开发者写 logger.info(f"PUT body: {request.data}") 类型代码就会泄露
- filter 防御性兜底 handler 层面统一兜住不依赖每个 view 自律
"""
import logging
import re
from common.utils import mask_token
class AccessTokenMaskFilter(logging.Filter):
"""识别日志记录中的 access_token 明文值并脱敏(保留末 4 位)。
覆盖 4 种序列化形态
1. JSON 字符串双引号 '"access_token": "VALUE"'
2. Python dict repr单引号"'access_token': 'VALUE'"
3. URL query 'access_token=VALUE&...'
4. 兜底等号或冒号 + 空格 'access_token: VALUE' / 'access_token = VALUE'
挂载点
settings.LOGGING.filters['access_token_mask']
settings.LOGGING.handlers.{aliyun|console}.filters
设计要点
- 仅识别 access_token 字段名为前缀锚点不脱敏裸 token / Bearer / Authorization header
那是另一类敏感数据 v2.x 候选优先级 #1 / #3 处理;详见 RESEARCH Pitfall 3
- 同时改 record.msg record.args避免 Formatter 阶段再用 % 拼接出明文
详见 RESEARCH Pitfall 2
- filter() 永远 return True不丢弃 record详见 RESEARCH Pitfall 1
"""
# 4 种序列化形态对应的正则模式
_PATTERNS = (
# 1) JSON 字符串双引号group 顺序 (前缀, 值, 后缀)
re.compile(r'("access_token"\s*:\s*")([^"]+)(")'),
# 2) Python dict repr单引号group 顺序 (前缀, 值, 后缀)
re.compile(r"('access_token'\s*:\s*')([^']+)(')"),
# 3) URL query以 & / 空格 / 引号结尾group 顺序 (前缀, 值)
re.compile(r'(access_token=)([^&\s"\']+)'),
# 4) 兜底:等号 / 冒号 + 可选空格group 顺序 (前缀, 值)
# 用 [^\s,;)\]\}"\'&=]+ 作为终止符以避免吃到下一个字段;
# `&` / `=` 是 URL query 分隔符,必须排除,否则 Pattern 3 已脱敏的尾部
# (形如 `access_token=********1234&u=1`)会被 Pattern 4 把 `********1234&u=1`
# 整段当 value再次 mask 把末 4 位 `1234` 吃掉变 `&u=1`
re.compile(r'(access_token\s*[:=]\s*)([^\s,;)\]\}"\'&=]+)'),
)
# 快速短路record.msg / args 中没有 access_token 字面量时直接返回,避免 4 次正则扫描
_NEEDLE = 'access_token'
def _sub(self, match: 're.Match') -> str:
"""根据 group 数2 或 3调用 mask_token 重组匹配段。"""
groups = match.groups()
if len(groups) == 3:
# 模式 1 / 2('"access_token":"', VALUE, '"')
return groups[0] + mask_token(groups[1]) + groups[2]
if len(groups) == 2:
# 模式 3 / 4('access_token=', VALUE)
return groups[0] + mask_token(groups[1])
return match.group(0) # 防御:未来若加新模式 group 数变了,原样返回避免崩溃
def _mask_in_text(self, text):
"""对单个字符串依次应用 4 个正则;非字符串原样返回。"""
if not isinstance(text, str):
return text
if self._NEEDLE not in text.lower():
return text
for pattern in self._PATTERNS:
text = pattern.sub(self._sub, text)
return text
def filter(self, record: logging.LogRecord) -> bool:
# 策略:如果 record 同时含 msg + args即 logger.info("...%s...", value) 形态),
# 先用 record.getMessage() 拼成最终字符串,再统一脱敏;脱敏后清空 args
# 避免 Formatter 阶段重新用 % 拼接(一旦脱敏吃掉了 %s 占位符就会触发 TypeError
# 否则msg 是纯字符串、无 args / dict args / 非字符串 msg分别处理 msg 和 args。
if record.args and isinstance(record.msg, str) and isinstance(record.args, tuple):
# 1) tuple 形态 args先用 getMessage 拼出最终文本再整体脱敏args 清空
try:
expanded = record.getMessage()
except Exception:
# 拼接失败(如 args 与占位符不匹配)则退化到不脱敏,避免影响后续 handler 链路
return True
record.msg = self._mask_in_text(expanded)
record.args = None
else:
# 2) record.msg 字符串脱敏(无 args 的纯字符串、或 dict args
if isinstance(record.msg, str):
record.msg = self._mask_in_text(record.msg)
# 3) dict 形态 args按 key 直接脱敏 access_token其它 string value 走通用扫描
if record.args and isinstance(record.args, dict):
new_args = {}
for k, v in record.args.items():
if k == 'access_token' and isinstance(v, str):
new_args[k] = mask_token(v)
elif isinstance(v, str):
new_args[k] = self._mask_in_text(v)
else:
new_args[k] = v
record.args = new_args
# 永远不丢弃 recordfilter 仅做改写
return True