log-center/repair_agent/agent/task_manager.py
zyc fe62f9ca81
All checks were successful
Build and Deploy Log Center / build-and-deploy (push) Successful in 1m27s
fix bug
2026-02-12 13:23:59 +08:00

155 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Task Manager - 与 Log Center 交互
"""
import httpx
from typing import Optional
from loguru import logger
from ..config import settings
from ..models import Bug, BugStatus, RepairReport
class TaskManager:
"""负责与 Log Center 交互"""
def __init__(self):
self.base_url = settings.log_center_url
self.client = httpx.Client(timeout=30)
def fetch_pending_bugs(self, project_id: Optional[str] = None) -> list[Bug]:
"""
获取待修复的 Bug 列表(包括 NEW 和 PENDING_FIX 状态)
Args:
project_id: 可选,筛选特定项目
Returns:
Bug 列表(已按 id 去重)
"""
all_bugs: dict[int, Bug] = {}
for status in ("NEW", "PENDING_FIX"):
try:
params: dict[str, str] = {"status": status}
if project_id:
params["project_id"] = project_id
response = self.client.get(
f"{self.base_url}/api/v1/bugs",
params=params,
)
response.raise_for_status()
data = response.json()
for item in data.get("items", []):
bug_id = item["id"]
if bug_id in all_bugs:
continue
stack_trace = item.get("stack_trace")
if isinstance(stack_trace, str):
stack_trace = stack_trace.split("\n")
all_bugs[bug_id] = Bug(
id=bug_id,
project_id=item["project_id"],
environment=item.get("environment", "production"),
level=item.get("level", "ERROR"),
error={
"type": item.get("error_type", "Unknown"),
"message": item.get("error_message", ""),
"file_path": item.get("file_path"),
"line_number": item.get("line_number"),
"stack_trace": stack_trace,
},
context=item.get("context"),
status=BugStatus(item.get("status", "NEW")),
retry_count=item.get("retry_count", 0),
)
except httpx.HTTPError as e:
logger.error(f"获取 {status} 状态 Bug 列表失败: {e}")
bugs = list(all_bugs.values())
logger.info(f"获取到 {len(bugs)} 个待修复 BugNEW + PENDING_FIX")
return bugs
def update_status(self, bug_id: int, status: BugStatus, message: str = "") -> bool:
"""
更新 Bug 状态
Args:
bug_id: Bug ID
status: 新状态
message: 状态说明
Returns:
是否成功
"""
try:
response = self.client.put(
f"{self.base_url}/api/v1/tasks/{bug_id}/status",
json={
"status": status.value,
"message": message
}
)
response.raise_for_status()
logger.info(f"Bug #{bug_id} 状态更新为 {status.value}")
return True
except httpx.HTTPError as e:
logger.error(f"更新状态失败: {e}")
return False
def get_bug_detail(self, bug_id: int) -> Optional[Bug]:
"""获取 Bug 详情"""
try:
response = self.client.get(f"{self.base_url}/api/v1/bugs/{bug_id}")
response.raise_for_status()
item = response.json()
# stack_trace 可能是列表或字符串
stack_trace = item.get("stack_trace")
if isinstance(stack_trace, str):
stack_trace = stack_trace.split("\n")
return Bug(
id=item["id"],
project_id=item["project_id"],
environment=item.get("environment", "production"),
level=item.get("level", "ERROR"),
error={
"type": item.get("error_type", "Unknown"),
"message": item.get("error_message", ""),
"file_path": item.get("file_path"),
"line_number": item.get("line_number"),
"stack_trace": stack_trace,
},
context=item.get("context"),
status=BugStatus(item.get("status", "NEW")),
retry_count=item.get("retry_count", 0),
)
except httpx.HTTPError as e:
logger.error(f"获取 Bug 详情失败: {e}")
return None
def upload_report(self, report: RepairReport) -> bool:
"""上传修复报告"""
try:
response = self.client.post(
f"{self.base_url}/api/v1/repair/reports",
json=report.model_dump()
)
response.raise_for_status()
logger.info(f"Bug #{report.error_log_id} 修复报告已上传")
return True
except httpx.HTTPError as e:
logger.error(f"上传修复报告失败: {e}")
return False
def close(self):
"""关闭连接"""
self.client.close()