feat(03-02): 新建 common/logging/ 包 + AccessTokenMaskFilter
- common/logging/__init__.py 空文件 (package marker)
- common/logging/filters.py 含 AccessTokenMaskFilter(logging.Filter)
- 4 个正则模式:JSON / Python dict repr / URL query / 等号或冒号兜底
- 调 common.utils.mask_token 替换捕获组,保留末 4 位明文
- 兼容 logger.info('...%s...', value) tuple args 形态
- 不误伤 Authorization header / Bearer 字段(field-name 锚点)
- filter() 永远 return True 不丢弃 record
This commit is contained in:
parent
a58980fd73
commit
891a5ead7c
0
qy_lty/common/logging/__init__.py
Normal file
0
qy_lty/common/logging/__init__.py
Normal file
111
qy_lty/common/logging/filters.py
Normal file
111
qy_lty/common/logging/filters.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""通用日志脱敏 Filter 集合。
|
||||
|
||||
本模块挂载到 settings.LOGGING.filters,由 LOGGING.handlers.{aliyun|console} 引用,
|
||||
覆盖所有 logger → handler 路径,对 record 内 access_token 字段值脱敏(保留末 4 位)。
|
||||
|
||||
设计动机(per CONTEXT.md / RESEARCH.md):
|
||||
- 当前仓库代码没有 logger 输出 CredentialSlot.access_token 明文的路径(已实证)
|
||||
- 但 Phase 1 + Phase 2 + Phase 3 的 view 已让 access_token 进入「内存中可被随手 dump」的状态
|
||||
- 任何后续开发者写 logger.info(f"PUT body: {request.data}") 类型代码就会泄露
|
||||
- 本 filter 是「防御性兜底」:在 handler 层面统一兜住,不依赖每个 view 自律
|
||||
"""
|
||||
import logging
|
||||
import re
|
||||
|
||||
from common.utils import mask_token
|
||||
|
||||
|
||||
class AccessTokenMaskFilter(logging.Filter):
|
||||
"""识别日志记录中的 access_token 明文值并脱敏(保留末 4 位)。
|
||||
|
||||
覆盖 4 种序列化形态:
|
||||
1. JSON 字符串(双引号): '"access_token": "VALUE"'
|
||||
2. Python dict repr(单引号):"'access_token': 'VALUE'"
|
||||
3. URL query: 'access_token=VALUE&...'
|
||||
4. 兜底(等号或冒号 + 空格): 'access_token: VALUE' / 'access_token = VALUE'
|
||||
|
||||
挂载点:
|
||||
settings.LOGGING.filters['access_token_mask']
|
||||
→ settings.LOGGING.handlers.{aliyun|console}.filters
|
||||
|
||||
设计要点:
|
||||
- 仅识别 access_token 字段名为前缀锚点;不脱敏裸 token / Bearer / Authorization header
|
||||
(那是另一类敏感数据,留 v2.x 候选优先级 #1 / #3 处理;详见 RESEARCH Pitfall 3)
|
||||
- 同时改 record.msg 与 record.args,避免 Formatter 阶段再用 % 拼接出明文
|
||||
(详见 RESEARCH Pitfall 2)
|
||||
- filter() 永远 return True,不丢弃 record(详见 RESEARCH Pitfall 1)
|
||||
"""
|
||||
|
||||
# 4 种序列化形态对应的正则模式
|
||||
_PATTERNS = (
|
||||
# 1) JSON 字符串:双引号;group 顺序 (前缀, 值, 后缀)
|
||||
re.compile(r'("access_token"\s*:\s*")([^"]+)(")'),
|
||||
# 2) Python dict repr:单引号;group 顺序 (前缀, 值, 后缀)
|
||||
re.compile(r"('access_token'\s*:\s*')([^']+)(')"),
|
||||
# 3) URL query:以 & / 空格 / 引号结尾;group 顺序 (前缀, 值)
|
||||
re.compile(r'(access_token=)([^&\s"\']+)'),
|
||||
# 4) 兜底:等号 / 冒号 + 可选空格;group 顺序 (前缀, 值)
|
||||
# 用 [^\s,;)\]\}"\'&=]+ 作为终止符以避免吃到下一个字段;
|
||||
# `&` / `=` 是 URL query 分隔符,必须排除,否则 Pattern 3 已脱敏的尾部
|
||||
# (形如 `access_token=********1234&u=1`)会被 Pattern 4 把 `********1234&u=1`
|
||||
# 整段当 value,再次 mask 把末 4 位 `1234` 吃掉变 `&u=1`
|
||||
re.compile(r'(access_token\s*[:=]\s*)([^\s,;)\]\}"\'&=]+)'),
|
||||
)
|
||||
|
||||
# 快速短路:record.msg / args 中没有 access_token 字面量时直接返回,避免 4 次正则扫描
|
||||
_NEEDLE = 'access_token'
|
||||
|
||||
def _sub(self, match: 're.Match') -> str:
|
||||
"""根据 group 数(2 或 3)调用 mask_token 重组匹配段。"""
|
||||
groups = match.groups()
|
||||
if len(groups) == 3:
|
||||
# 模式 1 / 2:('"access_token":"', VALUE, '"')
|
||||
return groups[0] + mask_token(groups[1]) + groups[2]
|
||||
if len(groups) == 2:
|
||||
# 模式 3 / 4:('access_token=', VALUE)
|
||||
return groups[0] + mask_token(groups[1])
|
||||
return match.group(0) # 防御:未来若加新模式 group 数变了,原样返回避免崩溃
|
||||
|
||||
def _mask_in_text(self, text):
|
||||
"""对单个字符串依次应用 4 个正则;非字符串原样返回。"""
|
||||
if not isinstance(text, str):
|
||||
return text
|
||||
if self._NEEDLE not in text.lower():
|
||||
return text
|
||||
for pattern in self._PATTERNS:
|
||||
text = pattern.sub(self._sub, text)
|
||||
return text
|
||||
|
||||
def filter(self, record: logging.LogRecord) -> bool:
|
||||
# 策略:如果 record 同时含 msg + args(即 logger.info("...%s...", value) 形态),
|
||||
# 先用 record.getMessage() 拼成最终字符串,再统一脱敏;脱敏后清空 args,
|
||||
# 避免 Formatter 阶段重新用 % 拼接(一旦脱敏吃掉了 %s 占位符就会触发 TypeError)。
|
||||
# 否则(msg 是纯字符串、无 args / dict args / 非字符串 msg):分别处理 msg 和 args。
|
||||
if record.args and isinstance(record.msg, str) and isinstance(record.args, tuple):
|
||||
# 1) tuple 形态 args:先用 getMessage 拼出最终文本,再整体脱敏,args 清空
|
||||
try:
|
||||
expanded = record.getMessage()
|
||||
except Exception:
|
||||
# 拼接失败(如 args 与占位符不匹配)则退化到不脱敏,避免影响后续 handler 链路
|
||||
return True
|
||||
record.msg = self._mask_in_text(expanded)
|
||||
record.args = None
|
||||
else:
|
||||
# 2) record.msg 字符串脱敏(无 args 的纯字符串、或 dict args)
|
||||
if isinstance(record.msg, str):
|
||||
record.msg = self._mask_in_text(record.msg)
|
||||
|
||||
# 3) dict 形态 args:按 key 直接脱敏 access_token;其它 string value 走通用扫描
|
||||
if record.args and isinstance(record.args, dict):
|
||||
new_args = {}
|
||||
for k, v in record.args.items():
|
||||
if k == 'access_token' and isinstance(v, str):
|
||||
new_args[k] = mask_token(v)
|
||||
elif isinstance(v, str):
|
||||
new_args[k] = self._mask_in_text(v)
|
||||
else:
|
||||
new_args[k] = v
|
||||
record.args = new_args
|
||||
|
||||
# 永远不丢弃 record;filter 仅做改写
|
||||
return True
|
||||
Loading…
x
Reference in New Issue
Block a user