zyc 874c873de9
Some checks failed
Build and Deploy Log Center / build-and-deploy (push) Failing after 1m7s
feat(monitor): K8s Monitor 从 API 动态加载项目映射,移除硬编码
monitor 启动时调用 GET /api/v1/projects 拉取项目列表,
自动生成 app label -> project_id 映射(下划线转短横线 + -dev 变体),
新项目只需在 Log Center 注册即可自动纳入 K8s 监控。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 10:25:04 +08:00

178 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
K8s Pod 健康监控 - 扫描异常 Pod 并上报到 Log Center
作为 K8s CronJob 每 5 分钟运行一次
"""
import os
import logging
import requests
from kubernetes import client, config
# Base URL of the Log Center API. A trailing "/" is stripped so that
# f"{LOG_CENTER_URL}/api/..." never produces a "//api/..." path.
LOG_CENTER_URL = os.getenv("LOG_CENTER_URL", "https://qiyuan-log-center-api.airlabs.art").rstrip("/")
# Namespace whose pods are scanned.
NAMESPACE = os.getenv("MONITOR_NAMESPACE", "default")
# Container state reasons treated as abnormal; compared against
# state.waiting.reason / state.terminated.reason in check_pod_health.
ABNORMAL_STATES = {
    "CrashLoopBackOff",
    "ImagePullBackOff",
    "ErrImagePull",
    "OOMKilled",
    "Error",
    "CreateContainerConfigError",
    "InvalidImageName",
    "RunContainerError",
}
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# app label -> project_id lookup table, filled at startup by load_project_mapping()
# from the Log Center API; empty means "use the raw app label as project_id".
APP_TO_PROJECT: dict[str, str] = {}
def load_project_mapping():
    """Fetch projects from the Log Center API and derive the app label -> project_id map.

    Mapping rule: underscores in ``project_id`` are replaced with hyphens to form
    the app label, and a ``-dev``-suffixed variant is added for the development
    environment. Example: project_id="rtc_backend" -> app labels "rtc-backend"
    and "rtc-backend-dev".

    The derived entries are merged into the module-level ``APP_TO_PROJECT``.
    Best-effort: on failure a warning is logged and nothing is merged, so
    ``get_project_id`` falls back to the raw app label. Malformed project
    records are skipped one by one instead of discarding the whole list.
    """
    try:
        resp = requests.get(f"{LOG_CENTER_URL}/api/v1/projects", timeout=10)
        resp.raise_for_status()
        # `or []` also tolerates an explicit "projects": null in the response.
        projects = resp.json().get("projects") or []
        mapping = {}
        loaded = 0
        for project in projects:
            pid = project.get("project_id") if isinstance(project, dict) else None
            if not isinstance(pid, str) or not pid:
                # One bad record must not cost every other project its mapping.
                logger.warning(f"跳过无效的项目记录: {project!r}")
                continue
            app_label = pid.replace("_", "-")
            mapping[app_label] = pid
            mapping[f"{app_label}-dev"] = pid
            loaded += 1
        APP_TO_PROJECT.update(mapping)
        logger.info(f"从 API 加载了 {loaded} 个项目,生成 {len(mapping)} 条映射")
    except Exception as e:
        # Deliberately broad: this is a best-effort startup step and the scan
        # can still run with raw app labels as project ids.
        logger.warning(f"从 API 加载项目映射失败: {e},将使用 app label 原值作为 project_id")
def get_project_id(pod_labels: dict) -> str:
    """Resolve the Log Center project_id for a pod from its labels.

    The pod's ``app`` label is looked up in ``APP_TO_PROJECT``. When it has no
    entry there, the label value itself is used as the project_id. A pod
    without an ``app`` label resolves via the placeholder "unknown".
    """
    app_label = pod_labels.get("app", "unknown")
    if app_label in APP_TO_PROJECT:
        return APP_TO_PROJECT[app_label]
    return app_label
def check_pod_health(pod) -> list[dict]:
    """Inspect one pod's container statuses and return a record per abnormal container.

    A container is reported when its current state is ``waiting`` or
    ``terminated`` with a reason listed in ``ABNORMAL_STATES``. Init
    containers are checked as well as regular containers.

    Args:
        pod: a pod object as returned in ``CoreV1Api.list_namespaced_pod(...).items``.

    Returns:
        A (possibly empty) list of dicts with the keys consumed by
        ``report_error``: project_id, reason, message, pod_name,
        container_name, namespace, deployment_name, restart_count, node_name.
    """
    errors = []
    status = pod.status
    if not status:
        return errors
    # Init containers have their own status list; a failing init container
    # never appears in container_statuses, so scan both.
    statuses = (status.init_container_statuses or []) + (status.container_statuses or [])
    if not statuses:
        return errors

    pod_name = pod.metadata.name
    namespace = pod.metadata.namespace
    labels = pod.metadata.labels or {}
    # Identical for every container of this pod, so resolve it once.
    project_id = get_project_id(labels)

    for cs in statuses:
        state = cs.state
        # state may be unset (e.g. status not populated yet); skip rather than
        # raising AttributeError and aborting the whole scan.
        if state is None:
            continue
        if state.waiting:
            reason = state.waiting.reason
            message = state.waiting.message or ""
        elif state.terminated:
            reason = state.terminated.reason
            message = state.terminated.message or ""
        else:
            # Running: nothing to report.
            continue
        if reason not in ABNORMAL_STATES:
            continue
        errors.append({
            "project_id": project_id,
            "reason": reason,
            "message": message,
            "pod_name": pod_name,
            "container_name": cs.name,
            "namespace": namespace,
            "deployment_name": labels.get("app", pod_name),
            "restart_count": cs.restart_count,
            "node_name": pod.spec.node_name,
        })
    return errors
def get_container_logs(v1, pod_name: str, namespace: str, container: str, lines: int = 50) -> str:
    """Return the last ``lines`` lines of a container's log.

    The previous container instance (the output from before the crash) is
    requested first. If that request fails, the current instance's log is
    requested instead. Errors are not raised: if both requests fail, a
    string describing the last error is returned in place of the log.
    """
    request = {
        "name": pod_name,
        "namespace": namespace,
        "container": container,
        "tail_lines": lines,
    }
    try:
        return v1.read_namespaced_pod_log(**request, previous=True)
    except Exception:
        # Deliberate fallback: try the current instance's log below.
        pass
    try:
        return v1.read_namespaced_pod_log(**request)
    except Exception as exc:
        return f"获取日志失败: {exc}"
def report_error(error: dict, logs: str):
    """Report one abnormal container to Log Center as a CRITICAL "deployment" log.

    Args:
        error: one record produced by ``check_pod_health``.
        logs: container log text from ``get_container_logs``; at most its
            last 50 lines are sent as ``error.stack_trace``.

    Best-effort: network errors and non-2xx responses are logged and
    swallowed, so one failed report does not stop the scan of other pods.
    """
    payload = {
        "project_id": error["project_id"],
        # NOTE(review): always "production", even for pods whose app label ends
        # in "-dev" (mapped to the same project_id) — confirm this is intended.
        "environment": "production",
        "level": "CRITICAL",
        "source": "deployment",
        "error": {
            "type": error["reason"],
            "message": f"{error['reason']}: {error['message']} (pod: {error['pod_name']}, container: {error['container_name']})",
            "file_path": None,
            "line_number": None,
            # splitlines() drops the trailing empty entry left by a final
            # newline (and handles \r\n), so all 50 kept entries are real lines.
            "stack_trace": logs.splitlines()[-50:] if logs else [],
        },
        "context": {
            "namespace": error["namespace"],
            "pod_name": error["pod_name"],
            "container_name": error["container_name"],
            "deployment_name": error["deployment_name"],
            "restart_count": error["restart_count"],
            "node_name": error["node_name"],
        },
    }
    try:
        resp = requests.post(
            f"{LOG_CENTER_URL}/api/v1/logs/report",
            json=payload,
            timeout=10,
        )
        # Surface 4xx/5xx as failures instead of logging them as successes.
        resp.raise_for_status()
    except Exception as e:
        logger.error(f"上报失败: {e}")
        return
    try:
        result = resp.json()
    except ValueError:
        # Report was accepted but the body is not JSON; log it verbatim.
        result = resp.text
    logger.info(f"上报 {error['reason']} ({error['pod_name']}): {result}")
def main():
    """Run one scan of NAMESPACE and report every abnormal container to Log Center.

    A single pass, meant to be invoked periodically by the CronJob described in
    the module docstring.
    """
    # Build the app label -> project_id map from the Log Center API first.
    load_project_mapping()
    # Prefer the in-cluster service account; fall back to a local kubeconfig
    # when running outside the cluster.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace=NAMESPACE)
    # A pod can have several failing containers, so count pods and containers
    # separately instead of reporting the container count as a pod count.
    abnormal_pods = 0
    abnormal_containers = 0
    for pod in pods.items:
        errors = check_pod_health(pod)
        if errors:
            abnormal_pods += 1
        for error in errors:
            logs = get_container_logs(v1, error["pod_name"], error["namespace"], error["container_name"])
            report_error(error, logs)
            abnormal_containers += 1
    logger.info(
        f"扫描完成,命名空间 '{NAMESPACE}' 发现 {abnormal_pods} 个异常 Pod({abnormal_containers} 个异常容器)"
    )


if __name__ == "__main__":
    main()