log-center/insert_new_bugs.py
zyc b178d24e73
Some checks failed
Build and Deploy Log Center / build-and-deploy (push) Failing after 5m9s
fix pr
2026-02-25 16:35:28 +08:00

132 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
插入 3 个新 bug 到 errorlog 表,对应 rtc_backend 中引入的 3 个代码 bug
"""
import asyncio
import json
import hashlib
from datetime import datetime, timezone
import asyncpg
DB_URL = "postgresql://log_center:JogNQdtrd3WY8CBCAiYfYEGx@pgm-7xv4811oj11j86htzo.pg.rds.aliyuncs.com:5432/log_center"
def make_fingerprint(project_id, error_type, file_path, line_number):
raw = f"{project_id}:{error_type}:{file_path}:{line_number}"
return hashlib.md5(raw.encode()).hexdigest()
BUGS = [
{
"project_id": "rtc_backend",
"environment": "production",
"level": "CRITICAL",
"source": "code_review",
"error_type": "SecurityVulnerability",
"error_message": "PaymentService.calc_refund_amount 退款比例校验缺失上限refund_ratio > 1 时可超额退款(如 200% 退款),造成严重资金损失",
"file_path": "app/services/payment_service.py",
"line_number": 139,
"stack_trace": json.dumps({
"file": "app/services/payment_service.py",
"line": 139,
"function": "calc_refund_amount",
"code": "if refund_ratio < 0:",
"detail": "原校验为 if not (0 < refund_ratio <= 1),现改为仅检查 < 0允许 refund_ratio > 1 通过"
}),
"context": json.dumps({
"expected": "if not (0 < refund_ratio <= 1): raise ValueError(...)",
"actual": "if refund_ratio < 0: raise ValueError(...)",
"impact": "攻击者可传入 refund_ratio=2.0 实现 200% 退款,直接导致资金损失"
}),
"severity": 10,
"severity_reason": "支付核心逻辑漏洞,可直接导致资金损失,攻击者可利用超额退款窃取资金",
},
{
"project_id": "rtc_backend",
"environment": "production",
"level": "ERROR",
"source": "code_review",
"error_type": "LogicError",
"error_message": "list_all_devices_admin 管理员设备列表分页偏移量计算错误page=1 时跳过前 page_size 条记录",
"file_path": "app/api/device_api.py",
"line_number": 177,
"stack_trace": json.dumps({
"file": "app/api/device_api.py",
"line": 177,
"function": "list_all_devices_admin",
"code": "start = page * page_size",
"detail": "应为 start = (page - 1) * page_size当前 page=1 时 start=20 跳过首页数据"
}),
"context": json.dumps({
"expected": "start = (page - 1) * page_size",
"actual": "start = page * page_size",
"impact": "管理员查看设备列表第一页会跳过前20条记录数据展示错误"
}),
"severity": 5,
"severity_reason": "分页逻辑错误导致管理端数据展示不完整,但不涉及数据泄露或安全问题",
},
{
"project_id": "rtc_backend",
"environment": "production",
"level": "ERROR",
"source": "code_review",
"error_type": "LogicError",
"error_message": "UserService.search_users 搜索用户时 is_active=False 过滤条件不生效,无法搜索已停用用户",
"file_path": "app/services/user_service.py",
"line_number": 104,
"stack_trace": json.dumps({
"file": "app/services/user_service.py",
"line": 104,
"function": "search_users",
"code": "if is_active:",
"detail": "应为 if is_active is not None:,当传入 is_active=False 时条件为假不会执行过滤"
}),
"context": json.dumps({
"expected": "if is_active is not None:",
"actual": "if is_active:",
"impact": "管理员搜索停用用户时过滤不生效,返回所有用户而非仅停用用户"
}),
"severity": 4,
"severity_reason": "布尔条件判断错误导致特定搜索场景失效,影响范围有限,不涉及安全问题",
},
]
async def main():
conn = await asyncpg.connect(DB_URL)
try:
for bug in BUGS:
fp = make_fingerprint(
bug["project_id"], bug["error_type"],
bug["file_path"], bug["line_number"]
)
row = await conn.fetchrow(
"""
INSERT INTO errorlog (
project_id, environment, level, source,
error_type, error_message, file_path, line_number,
stack_trace, context, fingerprint, status,
severity, severity_reason, timestamp,
retry_count, rejection_count
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8,
$9::jsonb, $10::jsonb, $11, $12,
$13, $14, $15,
$16, $17
)
RETURNING id
""",
bug["project_id"], bug["environment"], bug["level"], bug["source"],
bug["error_type"], bug["error_message"], bug["file_path"], bug["line_number"],
bug["stack_trace"], bug["context"], fp, "NEW",
bug["severity"], bug["severity_reason"], datetime.utcnow(),
0, 0,
)
print(f"Inserted bug #{row['id']}: severity={bug['severity']} - {bug['error_message'][:60]}...")
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(main())