log-center/k8s-monitor/monitor.py
zyc 0d4b2d634c
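feat: extend log collection to cover CI/CD build errors and K8s deployment errors
Add two new log sources (cicd / deployment) so that the log center covers the full "build → deploy → run" pipeline: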

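Backend changes:
- models.py: add a LogSource enum and a source field; make file_path/line_number optional
- main.py: generate fingerprints with a different strategy per source; all query endpoints support filtering by source; the dashboard adds source-distribution statistics
- database.py: add 4 SQL migrations (source column, index, nullable columns)
- task_manager.py: fix a defect so that the Agent pulls only runtime-source logs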
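The per-source fingerprinting in main.py is not part of this file. A minimal sketch of the idea follows, assuming a `LogSource` enum whose string values match the source names used in this commit (`runtime`, `cicd`, `deployment`). Which fields feed each fingerprint is a hypothetical choice: runtime errors are keyed by code location, deployment errors by the `namespace` and `deployment_name` context fields that monitor.py sends, and CI/CD errors by assumed `pipeline`/`step` context keys. This is not the project's actual strategy.

```python
import hashlib
from enum import Enum
from typing import Optional, Union


class LogSource(str, Enum):
    # The three sources named in this commit; the string values are assumed
    RUNTIME = "runtime"
    CICD = "cicd"
    DEPLOYMENT = "deployment"


def fingerprint(
    project_id: str,
    source: Union[LogSource, str],
    error_type: str,
    file_path: Optional[str] = None,
    line_number: Optional[int] = None,
    context: Optional[dict] = None,
) -> str:
    """Hypothetical per-source fingerprint: choose the fields that identify
    "the same problem" for each source, then hash them."""
    source = LogSource(source)  # accepts the enum member or its string value
    context = context or {}
    if source is LogSource.RUNTIME:
        # Runtime errors carry a code location, so key on it
        key = [file_path or "", "" if line_number is None else str(line_number)]
    elif source is LogSource.DEPLOYMENT:
        # No code location: key on the workload rather than the Pod name,
        # which changes whenever the Pod is recreated
        key = [str(context.get("namespace") or ""), str(context.get("deployment_name") or "")]
    else:
        # CI/CD: key on assumed pipeline/step names
        key = [str(context.get("pipeline") or ""), str(context.get("step") or "")]
    parts = [project_id, source.value, error_type, *key]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
```

With this choice, the same CrashLoopBackOff reported by successive CronJob runs, or by different Pods of one Deployment, maps to a single fingerprint instead of one entry per Pod.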

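New components:
- k8s-monitor/: K8s Pod health-monitoring script (Python) that checks for abnormal Pods every 5 minutes and reports them
- k8s/monitor-cronjob.yaml: CronJob + RBAC deployment manifest
- scripts/report-cicd-error.sh: Bash script for reporting CI/CD errors
- scripts/gitea-actions-example.yaml: Gitea Actions integration example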
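scripts/report-cicd-error.sh is not shown here. As a rough illustration of what a CI/CD report could carry, here is a hypothetical Python helper, `build_cicd_payload`, that builds a body in the same shape monitor.py posts to `/api/v1/logs/report`, with `source` set to `cicd`. The `BuildError` type, the `ERROR` level, the `production` environment, and the `pipeline`/`step`/`commit` context keys are all assumptions; the real Bash script may send different fields.

```python
from typing import Optional


def build_cicd_payload(
    project_id: str,
    step: str,
    message: str,
    log_tail: str,
    pipeline: Optional[str] = None,
    commit: Optional[str] = None,
    max_lines: int = 50,
) -> dict:
    """Hypothetical CI/CD report body, mirroring the shape used by monitor.py."""
    return {
        "project_id": project_id,
        "environment": "production",  # assumption: same value as monitor.py
        "level": "ERROR",              # assumption
        "source": "cicd",
        "error": {
            "type": "BuildError",      # hypothetical type name
            "message": f"{step}: {message}",
            # Build failures have no code location; these fields are
            # optional after this commit's model change
            "file_path": None,
            "line_number": None,
            # Keep only the last max_lines lines of build output
            "stack_trace": log_tail.splitlines()[-max_lines:],
        },
        "context": {"pipeline": pipeline, "step": step, "commit": commit},
    }
```

The resulting dict could then be sent the same way monitor.py does it, with `requests.post(url, json=payload, timeout=10)`.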

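Frontend changes:
- api.ts: update type definitions to support the source field
- BugList.tsx: add source filter tabs and a source column
- BugDetail.tsx: render conditionally by source (CI/CD info, deployment info); disable the fix button for non-runtime sources
- Dashboard.tsx: add a source-distribution table
- index.css: styles for source labels (source-badge)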

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 10:20:16 +08:00


"""
K8s Pod 健康监控 - 扫描异常 Pod 并上报到 Log Center
作为 K8s CronJob 每 5 分钟运行一次
"""
import os
import logging
import requests
from kubernetes import client, config
LOG_CENTER_URL = os.getenv("LOG_CENTER_URL", "https://qiyuan-log-center-api.airlabs.art")
NAMESPACE = os.getenv("MONITOR_NAMESPACE", "default")

# Abnormal container states (waiting/terminated reasons) to report
ABNORMAL_STATES = {
    "CrashLoopBackOff",
    "ImagePullBackOff",
    "ErrImagePull",
    "OOMKilled",
    "Error",
    "CreateContainerConfigError",
    "InvalidImageName",
    "RunContainerError",
}

# Maps a Pod's `app` label to its Log Center project_id
APP_TO_PROJECT = {
    "rtc-backend": "rtc_backend",
    "rtc-backend-dev": "rtc_backend",
    "rtc-web": "rtc_web",
    "rtc-web-dev": "rtc_web",
    "log-center-api": "log_center",
    "log-center-web": "log_center",
}

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def get_project_id(pod_labels: dict) -> str:
    """Map a Pod's `app` label to a project_id, falling back to the label itself."""
    app_name = pod_labels.get("app", "unknown")
    return APP_TO_PROJECT.get(app_name, app_name)


def check_pod_health(pod) -> list[dict]:
    """Check a single Pod's containers for abnormal states and return a list of errors."""
    errors = []
    pod_name = pod.metadata.name
    namespace = pod.metadata.namespace
    labels = pod.metadata.labels or {}

    # Pods that have not been scheduled yet may have no container statuses
    if not pod.status or not pod.status.container_statuses:
        return errors

    for cs in pod.status.container_statuses:
        state = cs.state
        if state is None:
            continue
        reason = None
        message = None
        if state.waiting:
            reason = state.waiting.reason
            message = state.waiting.message or ""
        elif state.terminated:
            reason = state.terminated.reason
            message = state.terminated.message or ""

        # Only report reasons on the watch list (skips e.g. ContainerCreating, Completed)
        if reason in ABNORMAL_STATES:
            errors.append({
                "project_id": get_project_id(labels),
                "reason": reason,
                "message": message,
                "pod_name": pod_name,
                "container_name": cs.name,
                "namespace": namespace,
                "deployment_name": labels.get("app", pod_name),
                "restart_count": cs.restart_count,
                "node_name": pod.spec.node_name,
            })
    return errors
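

def get_container_logs(v1, pod_name: str, namespace: str, container: str, lines: int = 50) -> str:
    """Fetch the last `lines` lines of container logs, preferring the previous
    (pre-crash) container instance and falling back to the current one."""
    try:
        # previous=True reads the last terminated instance; the API returns an
        # error if the container has not restarted yet
        return v1.read_namespaced_pod_log(
            name=pod_name, namespace=namespace, container=container,
            tail_lines=lines, previous=True,
        )
    except Exception:
        try:
            return v1.read_namespaced_pod_log(
                name=pod_name, namespace=namespace, container=container,
                tail_lines=lines,
            )
        except Exception as e:
            return f"Failed to fetch logs: {e}"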


def report_error(error: dict, logs: str) -> None:
    """Report a single container error to Log Center."""
    payload = {
        "project_id": error["project_id"],
        "environment": "production",
        "level": "CRITICAL",
        "source": "deployment",
        "error": {
            "type": error["reason"],
            "message": f"{error['reason']}: {error['message']} (pod: {error['pod_name']}, container: {error['container_name']})",
            # Deployment errors have no source-code location
            "file_path": None,
            "line_number": None,
            # Send at most the last 50 log lines in place of a stack trace
            "stack_trace": logs.splitlines()[-50:] if logs else [],
        },
        "context": {
            "namespace": error["namespace"],
            "pod_name": error["pod_name"],
            "container_name": error["container_name"],
            "deployment_name": error["deployment_name"],
            "restart_count": error["restart_count"],
            "node_name": error["node_name"],
        },
    }
    try:
        resp = requests.post(
            f"{LOG_CENTER_URL}/api/v1/logs/report",
            json=payload,
            timeout=10,
        )
        # Treat HTTP 4xx/5xx as failures instead of logging them as successful reports
        resp.raise_for_status()
        logger.info(f"Reported {error['reason']} ({error['pod_name']}): {resp.json()}")
    except Exception as e:
        logger.error(f"Failed to report {error['reason']} ({error['pod_name']}): {e}")


def main():
    # Inside the cluster use the ServiceAccount; otherwise fall back to the local kubeconfig
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace=NAMESPACE)

    # Counts abnormal containers; one Pod can contribute several
    total_errors = 0
    for pod in pods.items:
        for error in check_pod_health(pod):
            logs = get_container_logs(v1, error["pod_name"], error["namespace"], error["container_name"])
            report_error(error, logs)
            total_errors += 1

    logger.info(f"Scan complete: found {total_errors} abnormal container(s) in namespace '{NAMESPACE}'")
if __name__ == "__main__":
main()
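One detection gap is worth knowing about. When a container under restartPolicy Always (the only policy Deployments allow) is OOM-killed, Kubernetes restarts it. During back-off, its current `state` is then usually `waiting` with reason `CrashLoopBackOff`, and `OOMKilled` appears only in `last_state.terminated`. Because check_pod_health reads only `state`, such a Pod is reported as CrashLoopBackOff. A minimal sketch of a helper that also reads `last_state` follows. `find_abnormal_reason` is a hypothetical name, and `types.SimpleNamespace` objects stand in for the kubernetes client's `V1ContainerStatus` so the sketch runs without a cluster.

```python
from types import SimpleNamespace
from typing import Optional, Tuple

# Copied from monitor.py
ABNORMAL_STATES = {
    "CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull", "OOMKilled",
    "Error", "CreateContainerConfigError", "InvalidImageName", "RunContainerError",
}


def find_abnormal_reason(cs) -> Tuple[Optional[str], Optional[str]]:
    """Return (reason, root_cause) for a container status, or (None, None).

    `reason` follows monitor.py's rule (the current waiting/terminated state);
    `root_cause` is the terminated reason recorded in `last_state`, if any.
    """
    state = cs.state
    if state is None:
        return None, None
    reason = None
    if state.waiting:
        reason = state.waiting.reason
    elif state.terminated:
        reason = state.terminated.reason
    if reason not in ABNORMAL_STATES:
        return None, None
    last = getattr(cs, "last_state", None)
    root_cause = last.terminated.reason if last is not None and last.terminated else None
    return reason, root_cause


if __name__ == "__main__":
    # Stand-in for the status of an OOM-killed container that is now in back-off
    oom_looping = SimpleNamespace(
        name="app",
        state=SimpleNamespace(
            waiting=SimpleNamespace(reason="CrashLoopBackOff"), terminated=None
        ),
        last_state=SimpleNamespace(terminated=SimpleNamespace(reason="OOMKilled")),
    )
    print(find_abnormal_reason(oom_looping))  # ('CrashLoopBackOff', 'OOMKilled')
```

The root cause could then be added to the report's `context` under a hypothetical key such as `last_termination_reason`, so that the Log Center entry shows OOMKilled and not only CrashLoopBackOff.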