log-center/repair_agent/agent/task_manager.py
zyc 0d4b2d634c
All checks were successful
Build and Deploy Log Center / build-and-deploy (push) Successful in 2m24s
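feat: extend log collection to support CI/CD build errors and K8s deployment errors
Adds two new log sources (cicd / deployment) so the log center covers the full "build → deploy → run" chain: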

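Backend changes:
- models.py: add the LogSource enum and a source field; file_path/line_number become optional
- main.py: generate fingerprints with a different strategy per source; all query endpoints support filtering by source; the dashboard adds source-distribution statistics
- database.py: add 4 migration SQL statements (source column, index, nullable columns)
- task_manager.py: fix the Agent so it only fetches bugs from the runtime source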
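The backend changes above can be illustrated with a minimal sketch of a source enum and a source-aware fingerprint. The enum values `runtime`, `cicd` and `deployment` come from this commit; the fingerprint inputs (in particular the hypothetical `step_or_reason` parameter) and the choice of SHA-256 are assumptions, not the actual `main.py` logic.

```python
import hashlib
from enum import Enum
from typing import Optional


class LogSource(str, Enum):
    """Log sources named in this commit."""
    RUNTIME = "runtime"
    CICD = "cicd"
    DEPLOYMENT = "deployment"


def fingerprint(
    source: str,
    project_id: str,
    error_type: str,
    file_path: Optional[str] = None,
    line_number: Optional[int] = None,
    step_or_reason: Optional[str] = None,  # hypothetical: CI step name or K8s failure reason
) -> str:
    """Hypothetical per-source fingerprint used to group repeated errors."""
    src = LogSource(source)  # accepts "cicd" or LogSource.CICD; raises ValueError otherwise
    if src is LogSource.RUNTIME:
        # Runtime errors carry a code location, so group by where they were raised.
        parts = [src.value, project_id, error_type, file_path or "", str(line_number or "")]
    else:
        # cicd/deployment errors may lack file_path/line_number (now optional),
        # so group by the pipeline step or the K8s failure reason instead.
        parts = [src.value, project_id, error_type, step_or_reason or ""]
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
```

Including the source value in the hashed parts keeps a CI failure and a runtime exception that share an `error_type` from collapsing into a single bug.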

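New components:
- k8s-monitor/: K8s Pod health monitoring script (Python) that checks for abnormal Pods every 5 minutes and reports them
- k8s/monitor-cronjob.yaml: CronJob + RBAC deployment manifest
- scripts/report-cicd-error.sh: Bash script that reports CI/CD errors
- scripts/gitea-actions-example.yaml: Gitea Actions integration example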
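The k8s-monitor check can be sketched as a pure function over Pod objects in the shape the Kubernetes API returns them (for example the `items` of `kubectl get pods -o json`). Which reasons count as abnormal, and the report payload fields (`pod_name`, `container`, `restart_count`, and so on), are assumptions for illustration; the real script and the Log Center's ingest schema may differ.

```python
from typing import Any

# Assumed set of container waiting reasons treated as abnormal.
UNHEALTHY_WAITING_REASONS = {
    "CrashLoopBackOff",
    "ImagePullBackOff",
    "ErrImagePull",
    "CreateContainerConfigError",
}


def find_abnormal_pods(pods: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return one report payload per abnormal container (hypothetical schema)."""
    reports: list[dict[str, Any]] = []
    for pod in pods:
        meta = pod.get("metadata") or {}
        status = pod.get("status") or {}
        # Pods that have not started any container yet have no containerStatuses.
        for cs in status.get("containerStatuses") or []:
            waiting = (cs.get("state") or {}).get("waiting") or {}
            last_terminated = (cs.get("lastState") or {}).get("terminated") or {}
            if waiting.get("reason") in UNHEALTHY_WAITING_REASONS:
                reason = waiting["reason"]
            elif last_terminated.get("reason") == "OOMKilled":
                # The container was restarted after running out of memory.
                reason = "OOMKilled"
            else:
                continue
            reports.append({
                "source": "deployment",
                "namespace": meta.get("namespace"),
                "pod_name": meta.get("name"),
                "container": cs.get("name"),
                "error_type": reason,
                "restart_count": cs.get("restartCount", 0),
            })
    return reports
```

A container's `lastState` keeps its last termination record until the next termination, so an OOMKilled container is reported again on every 5-minute run; grouping reports by fingerprint on the server is one way to absorb those repeats.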

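Frontend changes:
- api.ts: type definitions updated to support the source field
- BugList.tsx: add source filter tabs and a source column
- BugDetail.tsx: render conditionally by source (CI/CD info, deployment info); disable the repair button for non-runtime sources
- Dashboard.tsx: add a source-distribution table
- index.css: source badge styles (source-badge)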

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 10:20:16 +08:00

"""
Task Manager - 与 Log Center 交互
"""
import httpx
from typing import Optional
from loguru import logger
from ..config import settings
from ..models import Bug, BugStatus, RepairReport


class TaskManager:
    """Handles all communication with the Log Center."""

    def __init__(self):
        self.base_url = settings.log_center_url
        self.client = httpx.Client(timeout=30)

    def fetch_pending_bugs(self, project_id: Optional[str] = None) -> list[Bug]:
        """
        Fetch bugs awaiting repair (both NEW and PENDING_FIX statuses).

        Only bugs from the runtime source are requested; cicd and deployment
        bugs are not repaired by the Agent.

        Args:
            project_id: optional, restrict results to a single project

        Returns:
            List of bugs, de-duplicated by id
        """
        all_bugs: dict[int, Bug] = {}
        for status in ("NEW", "PENDING_FIX"):
            try:
                params: dict[str, str] = {"status": status, "source": "runtime"}
                if project_id:
                    params["project_id"] = project_id
                response = self.client.get(
                    f"{self.base_url}/api/v1/bugs",
                    params=params,
                )
                response.raise_for_status()
                data = response.json()
                for item in data.get("items", []):
                    bug_id = item["id"]
                    # Keep the first copy of a bug returned by more than one status query
                    if bug_id in all_bugs:
                        continue
                    # stack_trace may be either a list or a newline-separated string
                    stack_trace = item.get("stack_trace")
                    if isinstance(stack_trace, str):
                        stack_trace = stack_trace.split("\n")
                    all_bugs[bug_id] = Bug(
                        id=bug_id,
                        project_id=item["project_id"],
                        environment=item.get("environment", "production"),
                        level=item.get("level", "ERROR"),
                        error={
                            "type": item.get("error_type", "Unknown"),
                            "message": item.get("error_message", ""),
                            "file_path": item.get("file_path"),
                            "line_number": item.get("line_number"),
                            "stack_trace": stack_trace,
                        },
                        context=item.get("context"),
                        status=BugStatus(item.get("status", "NEW")),
                        retry_count=item.get("retry_count", 0),
                    )
            except httpx.HTTPError as e:
                # A failed query for one status does not discard the other's results
                logger.error(f"Failed to fetch bugs with status {status}: {e}")
        bugs = list(all_bugs.values())
        logger.info(f"Fetched {len(bugs)} bugs pending repair (NEW + PENDING_FIX)")
        return bugs

    def update_status(self, bug_id: int, status: BugStatus, message: str = "") -> bool:
        """
        Update a bug's status.

        Args:
            bug_id: Bug ID
            status: new status
            message: note explaining the status change

        Returns:
            True on success, False if the request failed
        """
        try:
            response = self.client.put(
                f"{self.base_url}/api/v1/tasks/{bug_id}/status",
                json={
                    "status": status.value,
                    "message": message,
                },
            )
            response.raise_for_status()
            logger.info(f"Bug #{bug_id} status updated to {status.value}")
            return True
        except httpx.HTTPError as e:
            logger.error(f"Failed to update status of bug #{bug_id}: {e}")
            return False

    def get_bug_detail(self, bug_id: int) -> Optional[Bug]:
        """Fetch a single bug's details; returns None if the request fails."""
        try:
            response = self.client.get(f"{self.base_url}/api/v1/bugs/{bug_id}")
            response.raise_for_status()
            item = response.json()
            # stack_trace may be either a list or a newline-separated string
            stack_trace = item.get("stack_trace")
            if isinstance(stack_trace, str):
                stack_trace = stack_trace.split("\n")
            return Bug(
                id=item["id"],
                project_id=item["project_id"],
                environment=item.get("environment", "production"),
                level=item.get("level", "ERROR"),
                error={
                    "type": item.get("error_type", "Unknown"),
                    "message": item.get("error_message", ""),
                    "file_path": item.get("file_path"),
                    "line_number": item.get("line_number"),
                    "stack_trace": stack_trace,
                },
                context=item.get("context"),
                status=BugStatus(item.get("status", "NEW")),
                retry_count=item.get("retry_count", 0),
            )
        except httpx.HTTPError as e:
            logger.error(f"Failed to fetch details for bug #{bug_id}: {e}")
            return None

    def upload_report(self, report: RepairReport) -> bool:
        """Upload a repair report; returns True on success."""
        try:
            response = self.client.post(
                f"{self.base_url}/api/v1/repair/reports",
                json=report.model_dump(),
            )
            response.raise_for_status()
            logger.info(f"Repair report for bug #{report.error_log_id} uploaded")
            return True
        except httpx.HTTPError as e:
            logger.error(f"Failed to upload repair report: {e}")
            return False

    def close(self):
        """Close the underlying HTTP client."""
        self.client.close()
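`fetch_pending_bugs` and `get_bug_detail` convert a Log Center response item into `Bug` fields with identical code. A minimal sketch of a shared helper follows; the names `normalize_stack_trace` and `bug_fields_from_item` are hypothetical, and the helper returns a plain dict (with `status` left as a raw string) so it can be shown without the project's `Bug` and `BugStatus` models.

```python
from typing import Any, Optional


def normalize_stack_trace(value: Any) -> Optional[list[str]]:
    """stack_trace may arrive as a list or as one newline-separated string."""
    if isinstance(value, str):
        return value.split("\n")
    return value


def bug_fields_from_item(item: dict[str, Any]) -> dict[str, Any]:
    """Map one Log Center bug item to Bug keyword arguments (hypothetical helper)."""
    return {
        "id": item["id"],
        "project_id": item["project_id"],
        "environment": item.get("environment", "production"),
        "level": item.get("level", "ERROR"),
        "error": {
            "type": item.get("error_type", "Unknown"),
            "message": item.get("error_message", ""),
            "file_path": item.get("file_path"),
            "line_number": item.get("line_number"),
            "stack_trace": normalize_stack_trace(item.get("stack_trace")),
        },
        "context": item.get("context"),
        # Left as a string here; the caller would wrap it in BugStatus(...)
        "status": item.get("status", "NEW"),
        "retry_count": item.get("retry_count", 0),
    }
```

Both methods could then build the model with `Bug(**{**fields, "status": BugStatus(fields["status"])})`, so a future change to the field mapping only has to be made once.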