log-center/repair_agent/agent/task_manager.py
zyc b178d24e73
Some checks failed
Build and Deploy Log Center / build-and-deploy (push) Failing after 5m9s
fix pr
2026-02-25 16:35:28 +08:00

259 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Task Manager - client-side interaction with Log Center
"""
import httpx
from typing import Optional
from loguru import logger
from ..config import settings
from ..models import Bug, BugStatus, RepairReport
class TaskManager:
    """HTTP client wrapper for talking to the Log Center API.

    The public methods are best-effort: HTTP/transport failures (and
    undecodable response bodies) are logged and reported through the return
    value (a shorter list, ``None`` or ``False``) instead of being raised.

    The instance owns an ``httpx.Client``. Call :meth:`close` when finished,
    or use the manager as a context manager (``with TaskManager() as tm:``).
    """

    # Bug sources that are queried one by one when listing bugs.
    _BUG_SOURCES = ("runtime", "cicd", "deployment", "code_review")

    def __init__(self):
        self.base_url = settings.log_center_url
        self.client = httpx.Client(timeout=30)

    def __enter__(self):
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying HTTP client when the ``with`` block ends."""
        self.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_stack_trace(stack_trace):
        """Return *stack_trace* as a list of lines where possible.

        A string is split on newlines and a dict becomes ``"key: value"``
        entries; any other value (e.g. a list or ``None``) is returned as is.
        """
        if isinstance(stack_trace, str):
            return stack_trace.split("\n")
        if isinstance(stack_trace, dict):
            return [f"{k}: {v}" for k, v in stack_trace.items()]
        return stack_trace

    @classmethod
    def _bug_from_item(cls, item: dict, status: Optional[BugStatus] = None) -> Bug:
        """Build a ``Bug`` from one bug record returned by the API.

        Args:
            item: Decoded JSON object describing the bug.
            status: Status to assign. When ``None`` it is read from
                ``item["status"]``, falling back to ``"NEW"``.

        Returns:
            The constructed ``Bug``.

        Raises:
            KeyError: If ``id`` or ``project_id`` is missing from *item*.
            ValueError: If ``BugStatus(...)`` rejects the record's status
                (standard ``Enum`` lookup behavior; assumes ``BugStatus`` is
                an ``Enum`` - TODO confirm).
        """
        if status is None:
            status = BugStatus(item.get("status", "NEW"))
        return Bug(
            id=item["id"],
            project_id=item["project_id"],
            environment=item.get("environment", "production"),
            level=item.get("level", "ERROR"),
            error={
                "type": item.get("error_type", "Unknown"),
                "message": item.get("error_message", ""),
                "file_path": item.get("file_path"),
                "line_number": item.get("line_number"),
                "stack_trace": cls._normalize_stack_trace(item.get("stack_trace")),
            },
            context=item.get("context"),
            status=status,
            retry_count=item.get("retry_count", 0),
            severity=item.get("severity"),
        )

    def _query_bugs(self, status: str, source: str, project_id: Optional[str]) -> list:
        """Fetch the raw bug records for one status/source combination.

        Raises:
            httpx.HTTPError: On transport errors or a non-success status code.
            ValueError: If the response body is not valid JSON.
        """
        params: dict[str, str] = {"status": status, "source": source}
        if project_id:
            params["project_id"] = project_id
        response = self.client.get(
            f"{self.base_url}/api/v1/bugs",
            params=params,
        )
        response.raise_for_status()
        return response.json().get("items", [])

    def _merge_items(
        self,
        items: list,
        into: dict,
        status: Optional[BugStatus] = None,
    ) -> None:
        """Convert *items* to ``Bug`` objects and add the new ones to *into*.

        Records whose id is already present in *into* are ignored, so the
        first occurrence wins. A record that cannot be converted is logged
        and skipped so that one bad record does not discard the whole batch.
        """
        for item in items:
            try:
                bug_id = item["id"]
                if bug_id in into:
                    continue
                into[bug_id] = self._bug_from_item(item, status)
            except (KeyError, ValueError) as e:
                logger.warning(f"跳过无法解析的 Bug 记录: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch_pending_bugs(self, project_id: Optional[str] = None) -> list[Bug]:
        """Fetch the bugs waiting to be repaired (``NEW`` status only).

        ``PENDING_FIX`` means the bug has already been fixed and is waiting
        for PR review, so it must not be fetched for repair again.

        Args:
            project_id: Optional project filter.

        Returns:
            Bugs from all sources, de-duplicated by id. Sources whose request
            fails are logged and skipped.
        """
        status = "NEW"
        all_bugs: dict[int, Bug] = {}
        for source in self._BUG_SOURCES:
            try:
                items = self._query_bugs(status, source, project_id)
            except (httpx.HTTPError, ValueError) as e:
                logger.error(f"获取 {source}/{status} 状态 Bug 列表失败: {e}")
                continue
            self._merge_items(items, all_bugs)

        bugs = list(all_bugs.values())
        logger.info(f"获取到 {len(bugs)} 个待修复 Bug")
        return bugs

    def fetch_failed_bugs(self, project_id: Optional[str] = None) -> list[Bug]:
        """Fetch the bugs whose repair failed (``FIX_FAILED``, all sources).

        Args:
            project_id: Optional project filter.

        Returns:
            Bugs from all sources, de-duplicated by id, each with its status
            set to ``BugStatus.FIX_FAILED``. Sources whose request fails are
            logged and skipped.
        """
        all_bugs: dict[int, Bug] = {}
        for source in self._BUG_SOURCES:
            try:
                items = self._query_bugs("FIX_FAILED", source, project_id)
            except (httpx.HTTPError, ValueError) as e:
                logger.error(f"获取 {source}/FIX_FAILED Bug 列表失败: {e}")
                continue
            # The query already filters on FIX_FAILED, so the status is set
            # explicitly instead of being re-read from each record.
            self._merge_items(items, all_bugs, BugStatus.FIX_FAILED)

        bugs = list(all_bugs.values())
        logger.info(f"获取到 {len(bugs)} 个 FIX_FAILED Bug")
        return bugs

    def update_status(self, bug_id: int, status: BugStatus, message: str = "") -> bool:
        """Update the status of a bug.

        Args:
            bug_id: Bug id.
            status: New status.
            message: Free-text explanation sent along with the status.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            response = self.client.put(
                f"{self.base_url}/api/v1/tasks/{bug_id}/status",
                json={
                    "status": status.value,
                    "message": message,
                },
            )
            response.raise_for_status()
            logger.info(f"Bug #{bug_id} 状态更新为 {status.value}")
            return True
        except httpx.HTTPError as e:
            logger.error(f"更新状态失败: {e}")
            return False

    def get_bug_detail(self, bug_id: int) -> Optional[Bug]:
        """Fetch a single bug by id.

        Returns:
            The ``Bug``, or ``None`` if the request failed or the response
            could not be decoded/converted.
        """
        try:
            response = self.client.get(f"{self.base_url}/api/v1/bugs/{bug_id}")
            response.raise_for_status()
            # stack_trace may arrive as a list, a string or a dict; the
            # conversion helper normalizes it.
            return self._bug_from_item(response.json())
        except (httpx.HTTPError, KeyError, ValueError) as e:
            logger.error(f"获取 Bug 详情失败: {e}")
            return None

    def upload_report(self, report: RepairReport) -> bool:
        """Upload a repair report.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            # NOTE(review): model_dump() is presumably pydantic's Python-mode
            # dump; if RepairReport holds values that are not JSON-native
            # (e.g. datetimes), mode="json" may be required - confirm.
            response = self.client.post(
                f"{self.base_url}/api/v1/repair/reports",
                json=report.model_dump(),
            )
            response.raise_for_status()
            logger.info(f"修复报告已上传 (bugs: {report.error_log_ids})")
            return True
        except httpx.HTTPError as e:
            logger.error(f"上传修复报告失败: {e}")
            return False

    def update_pr_info(self, bug_id: int, pr_number: int, pr_url: str, branch_name: str) -> bool:
        """Write the pull-request information back to a bug.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            response = self.client.put(
                f"{self.base_url}/api/v1/bugs/{bug_id}/pr-info",
                json={
                    "pr_number": pr_number,
                    "pr_url": pr_url,
                    "branch_name": branch_name,
                },
            )
            response.raise_for_status()
            logger.info(f"Bug #{bug_id} PR 信息已更新: #{pr_number}")
            return True
        except httpx.HTTPError as e:
            logger.error(f"更新 PR 信息失败: {e}")
            return False

    def update_severity(self, bug_id: int, severity: int, severity_reason: str = "") -> bool:
        """Write the severity level back to a bug.

        Returns:
            ``True`` on success, ``False`` if the request failed.
        """
        try:
            response = self.client.put(
                f"{self.base_url}/api/v1/bugs/{bug_id}/severity",
                json={"severity": severity, "severity_reason": severity_reason},
            )
            response.raise_for_status()
            logger.info(f"Bug #{bug_id} 严重等级已更新: {severity}/10")
            return True
        except httpx.HTTPError as e:
            logger.error(f"更新严重等级失败: {e}")
            return False

    def get_project_info(self, project_id: str) -> Optional[dict]:
        """Fetch a project's configuration (repo_url + local_path) from the API.

        Returns:
            The decoded JSON response, or ``None`` if the request failed or
            the body was not valid JSON.
        """
        try:
            response = self.client.get(f"{self.base_url}/api/v1/projects/{project_id}")
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPError, ValueError) as e:
            logger.warning(f"获取项目 {project_id} 配置失败: {e}")
            return None

    def close(self):
        """Close the underlying HTTP client."""
        self.client.close()