Some checks failed
Build and Deploy Log Center / build-and-deploy (push) Failing after 1m7s
monitor 启动时调用 GET /api/v1/projects 拉取项目列表, 自动生成 app label -> project_id 映射(下划线转短横线 + -dev 变体), 新项目只需在 Log Center 注册即可自动纳入 K8s 监控。 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
178 lines
5.7 KiB
Python
178 lines
5.7 KiB
Python
"""
|
||
K8s Pod health monitor - scans for abnormal Pods and reports them to Log Center.
Runs as a K8s CronJob every 5 minutes.
|
||
"""
|
||
import os
|
||
import logging
|
||
import requests
|
||
from kubernetes import client, config
|
||
|
||
# Base URL of the Log Center API; the LOG_CENTER_URL environment variable
# overrides the built-in default.
LOG_CENTER_URL = os.environ.get(
    "LOG_CENTER_URL",
    "https://qiyuan-log-center-api.airlabs.art",
)
# Namespace whose pods are scanned; MONITOR_NAMESPACE overrides the default.
NAMESPACE = os.environ.get("MONITOR_NAMESPACE", "default")

# Container state reasons that are treated as abnormal and get reported.
ABNORMAL_STATES = {
    "CrashLoopBackOff",
    "CreateContainerConfigError",
    "ErrImagePull",
    "Error",
    "ImagePullBackOff",
    "InvalidImageName",
    "OOMKilled",
    "RunContainerError",
}

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# app label -> project_id lookup table; starts empty and is filled at startup
# by load_project_mapping().
APP_TO_PROJECT: dict[str, str] = {}
|
||
|
||
|
||
def _build_app_label_mapping(projects) -> dict[str, str]:
    """Derive the app label -> project_id table from Log Center project records.

    Each project_id contributes two labels: the id with underscores replaced
    by hyphens, and that label with a ``-dev`` suffix.
    Example: ``"rtc_backend"`` -> ``"rtc-backend"`` and ``"rtc-backend-dev"``.

    Records without a usable string ``project_id`` are skipped with a warning
    instead of aborting the whole load.
    """
    exact: dict[str, str] = {}
    dev_variants: dict[str, str] = {}
    for project in projects:
        pid = project.get("project_id") if isinstance(project, dict) else None
        if not isinstance(pid, str) or not pid:
            logger.warning(f"跳过缺少有效 project_id 的项目条目: {project!r}")
            continue
        app_label = pid.replace("_", "-")
        exact[app_label] = pid
        # First project to claim a "-dev" variant keeps it.
        dev_variants.setdefault(f"{app_label}-dev", pid)
    # Exact labels are merged last so a real project (e.g. "foo_dev") always
    # wins over the derived "-dev" variant of another project ("foo"),
    # regardless of the order the API returned them in.
    return {**dev_variants, **exact}


def load_project_mapping():
    """Fetch the project list from Log Center and fill ``APP_TO_PROJECT``.

    Calls ``GET {LOG_CENTER_URL}/api/v1/projects`` and reads the ``projects``
    list from the JSON body. Any failure (network error, non-2xx status,
    unexpected payload) is logged as a warning and leaves ``APP_TO_PROJECT``
    untouched, in which case ``get_project_id`` falls back to the raw app
    label.
    """
    try:
        resp = requests.get(f"{LOG_CENTER_URL}/api/v1/projects", timeout=10)
        resp.raise_for_status()
        projects = resp.json().get("projects", [])

        mapping = _build_app_label_mapping(projects)

        APP_TO_PROJECT.update(mapping)
        logger.info(f"从 API 加载了 {len(projects)} 个项目,生成 {len(mapping)} 条映射")
    except Exception as e:
        # Best effort: the monitor still runs without the mapping.
        logger.warning(f"从 API 加载项目映射失败: {e},将使用 app label 原值作为 project_id")
|
||
|
||
|
||
def get_project_id(pod_labels: dict) -> str:
    """Resolve the Log Center project_id for a pod from its labels.

    The ``app`` label (``"unknown"`` when absent) is looked up in
    ``APP_TO_PROJECT``; when no mapping exists the label itself is returned.
    """
    label = pod_labels.get("app", "unknown")
    if label in APP_TO_PROJECT:
        return APP_TO_PROJECT[label]
    return label
|
||
|
||
|
||
def check_pod_health(pod) -> list[dict]:
    """Collect the abnormal container states of a single Pod.

    Only the current state of each entry in ``pod.status.container_statuses``
    is inspected. A container is reported when it is waiting or terminated
    with a reason listed in ``ABNORMAL_STATES``.

    Returns:
        One dict per abnormal container (empty list when the Pod has no
        container statuses or nothing is abnormal).
    """
    meta = pod.metadata
    pod_name = meta.name
    namespace = meta.namespace
    labels = meta.labels or {}

    if not (pod.status and pod.status.container_statuses):
        return []

    findings = []
    for cs in pod.status.container_statuses:
        state = cs.state

        # Pick the state detail to inspect; a waiting state takes precedence
        # over a terminated one.
        if state.waiting:
            detail = state.waiting
        elif state.terminated and state.terminated.reason in ABNORMAL_STATES:
            detail = state.terminated
        else:
            continue

        reason = detail.reason
        if not reason or reason not in ABNORMAL_STATES:
            continue

        findings.append({
            "project_id": get_project_id(labels),
            "reason": reason,
            "message": detail.message or "",
            "pod_name": pod_name,
            "container_name": cs.name,
            "namespace": namespace,
            # Falls back to the pod name when the pod has no "app" label.
            "deployment_name": labels.get("app", pod_name),
            "restart_count": cs.restart_count,
            "node_name": pod.spec.node_name,
        })

    return findings
|
||
|
||
|
||
def get_container_logs(v1, pod_name: str, namespace: str, container: str, lines: int = 50) -> str:
    """Fetch the tail of a container's log, preferring the previous instance.

    The first attempt asks for the log of the previous container instance
    (``previous=True``), i.e. the output from before the crash. If that call
    fails for any reason, the current instance's log is requested instead.

    Returns:
        The log text, or a failure message string when both attempts fail.
    """
    request = {
        "name": pod_name,
        "namespace": namespace,
        "container": container,
        "tail_lines": lines,
    }

    try:
        return v1.read_namespaced_pod_log(**request, previous=True)
    except Exception:
        # Fall through to the current instance's log.
        pass

    try:
        return v1.read_namespaced_pod_log(**request)
    except Exception as exc:
        return f"获取日志失败: {exc}"
|
||
|
||
|
||
def report_error(error: dict, logs: str):
    """Send one abnormal-container record to the Log Center report endpoint.

    Args:
        error: Record describing the abnormal container. The keys read here
            are ``project_id``, ``reason``, ``message``, ``pod_name``,
            ``container_name``, ``namespace``, ``deployment_name``,
            ``restart_count`` and ``node_name``.
        logs: Container log text; its last 50 lines are sent as the stack
            trace (an empty list when ``logs`` is empty).

    Any failure (network error, non-2xx response) is logged at ERROR level
    and never raised to the caller.
    """
    payload = {
        "project_id": error["project_id"],
        # NOTE(review): always "production", even for pods whose app label is
        # a "-dev" variant - confirm this is intended.
        "environment": "production",
        "level": "CRITICAL",
        "source": "deployment",
        "error": {
            "type": error["reason"],
            "message": f"{error['reason']}: {error['message']} (pod: {error['pod_name']}, container: {error['container_name']})",
            "file_path": None,
            "line_number": None,
            "stack_trace": logs.split("\n")[-50:] if logs else [],
        },
        "context": {
            "namespace": error["namespace"],
            "pod_name": error["pod_name"],
            "container_name": error["container_name"],
            "deployment_name": error["deployment_name"],
            "restart_count": error["restart_count"],
            "node_name": error["node_name"],
        },
    }

    try:
        resp = requests.post(
            f"{LOG_CENTER_URL}/api/v1/logs/report",
            json=payload,
            timeout=10,
        )
        # Without this check a 4xx/5xx response carrying a JSON body would be
        # logged below as a successful report.
        resp.raise_for_status()
        try:
            result = resp.json()
        except ValueError:
            # The report was accepted; a non-JSON body must not turn it into
            # a logged failure.
            result = resp.text
        logger.info(f"上报 {error['reason']} ({error['pod_name']}): {result}")
    except Exception as e:
        logger.error(f"上报失败: {e}")
|
||
|
||
|
||
def main():
    """Run one scan: load the project mapping, inspect pods, report findings.

    Kubernetes configuration is loaded from the in-cluster environment first,
    falling back to the local kubeconfig when that raises
    ``config.ConfigException``. Every pod in ``NAMESPACE`` is checked; for
    each abnormal container its logs are fetched and a report is sent.
    """
    # Populate the app label -> project_id table from the Log Center API.
    load_project_mapping()

    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    core_api = client.CoreV1Api()
    pod_list = core_api.list_namespaced_pod(namespace=NAMESPACE)

    total_errors = 0
    for pod in pod_list.items:
        for issue in check_pod_health(pod):
            container_logs = get_container_logs(
                core_api,
                issue["pod_name"],
                issue["namespace"],
                issue["container_name"],
            )
            report_error(issue, container_logs)
            total_errors += 1

    logger.info(f"扫描完成,命名空间 '{NAMESPACE}' 发现 {total_errors} 个异常 Pod")


# Script entry point: run a single scan when executed directly.
if __name__ == "__main__":
    main()
|