diff --git a/qy_lty/common/logging/__init__.py b/qy_lty/common/logging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qy_lty/common/logging/filters.py b/qy_lty/common/logging/filters.py new file mode 100644 index 0000000..607935a --- /dev/null +++ b/qy_lty/common/logging/filters.py @@ -0,0 +1,111 @@ +"""通用日志脱敏 Filter 集合。 + +本模块挂载到 settings.LOGGING.filters,由 LOGGING.handlers.{aliyun|console} 引用, +覆盖所有 logger → handler 路径,对 record 内 access_token 字段值脱敏(保留末 4 位)。 + +设计动机(per CONTEXT.md / RESEARCH.md): +- 当前仓库代码没有 logger 输出 CredentialSlot.access_token 明文的路径(已实证) +- 但 Phase 1 + Phase 2 + Phase 3 的 view 已让 access_token 进入「内存中可被随手 dump」的状态 +- 任何后续开发者写 logger.info(f"PUT body: {request.data}") 类型代码就会泄露 +- 本 filter 是「防御性兜底」:在 handler 层面统一兜住,不依赖每个 view 自律 +""" +import logging +import re + +from common.utils import mask_token + + +class AccessTokenMaskFilter(logging.Filter): + """识别日志记录中的 access_token 明文值并脱敏(保留末 4 位)。 + + 覆盖 4 种序列化形态: + 1. JSON 字符串(双引号): '"access_token": "VALUE"' + 2. Python dict repr(单引号):"'access_token': 'VALUE'" + 3. URL query: 'access_token=VALUE&...' + 4. 兜底(等号或冒号 + 空格): 'access_token: VALUE' / 'access_token = VALUE' + + 挂载点: + settings.LOGGING.filters['access_token_mask'] + → settings.LOGGING.handlers.{aliyun|console}.filters + + 设计要点: + - 仅识别 access_token 字段名为前缀锚点;不脱敏裸 token / Bearer / Authorization header + (那是另一类敏感数据,留 v2.x 候选优先级 #1 / #3 处理;详见 RESEARCH Pitfall 3) + - 同时改 record.msg 与 record.args,避免 Formatter 阶段再用 % 拼接出明文 + (详见 RESEARCH Pitfall 2) + - filter() 永远 return True,不丢弃 record(详见 RESEARCH Pitfall 1) + """ + + # 4 种序列化形态对应的正则模式 + _PATTERNS = ( + # 1) JSON 字符串:双引号;group 顺序 (前缀, 值, 后缀) + re.compile(r'("access_token"\s*:\s*")([^"]+)(")'), + # 2) Python dict repr:单引号;group 顺序 (前缀, 值, 后缀) + re.compile(r"('access_token'\s*:\s*')([^']+)(')"), + # 3) URL query:以 & / 空格 / 引号结尾;group 顺序 (前缀, 值) + re.compile(r'(access_token=)([^&\s"\']+)'), + # 4) 兜底:等号 / 冒号 + 可选空格;group 顺序 (前缀, 值) + # 用 [^\s,;)\]\}"\'&=]+ 作为终止符以避免吃到下一个字段; + # `&` / `=` 是 URL query 分隔符,必须排除,否则 Pattern 3 已脱敏的尾部 + # (形如 `access_token=********1234&u=1`)会被 Pattern 4 把 `********1234&u=1` + # 整段当 value,再次 mask 把末 4 位 `1234` 吃掉变 `&u=1` + re.compile(r'(access_token\s*[:=]\s*)([^\s,;)\]\}"\'&=]+)'), + ) + + # 快速短路:record.msg / args 中没有 access_token 字面量时直接返回,避免 4 次正则扫描 + _NEEDLE = 'access_token' + + def _sub(self, match: 're.Match') -> str: + """根据 group 数(2 或 3)调用 mask_token 重组匹配段。""" + groups = match.groups() + if len(groups) == 3: + # 模式 1 / 2:('"access_token":"', VALUE, '"') + return groups[0] + mask_token(groups[1]) + groups[2] + if len(groups) == 2: + # 模式 3 / 4:('access_token=', VALUE) + return groups[0] + mask_token(groups[1]) + return match.group(0) # 防御:未来若加新模式 group 数变了,原样返回避免崩溃 + + def _mask_in_text(self, text): + """对单个字符串依次应用 4 个正则;非字符串原样返回。""" + if not isinstance(text, str): + return text + if self._NEEDLE not in text.lower(): + return text + for pattern in self._PATTERNS: + text = pattern.sub(self._sub, text) + return text + + def filter(self, record: logging.LogRecord) -> bool: + # 策略:如果 record 同时含 msg + args(即 logger.info("...%s...", value) 形态), + # 先用 record.getMessage() 拼成最终字符串,再统一脱敏;脱敏后清空 args, + # 避免 Formatter 阶段重新用 % 拼接(一旦脱敏吃掉了 %s 占位符就会触发 TypeError)。 + # 否则(msg 是纯字符串、无 args / dict args / 非字符串 msg):分别处理 msg 和 args。 + if record.args and isinstance(record.msg, str) and isinstance(record.args, tuple): + # 1) tuple 形态 args:先用 getMessage 拼出最终文本,再整体脱敏,args 清空 + try: + expanded = record.getMessage() + except Exception: + # 拼接失败(如 args 与占位符不匹配)则退化到不脱敏,避免影响后续 handler 链路 + return True + record.msg = self._mask_in_text(expanded) + record.args = None + else: + # 2) record.msg 字符串脱敏(无 args 的纯字符串、或 dict args) + if isinstance(record.msg, str): + record.msg = self._mask_in_text(record.msg) + + # 3) dict 形态 args:按 key 直接脱敏 access_token;其它 string value 走通用扫描 + if record.args and isinstance(record.args, dict): + new_args = {} + for k, v in record.args.items(): + if k == 'access_token' and isinstance(v, str): + new_args[k] = mask_token(v) + elif isinstance(v, str): + new_args[k] = self._mask_in_text(v) + else: + new_args[k] = v + record.args = new_args + + # 永远不丢弃 record;filter 仅做改写 + return True