""" Task Manager - 与 Log Center 交互 """ import httpx from typing import Optional from loguru import logger from ..config import settings from ..models import Bug, BugStatus, RepairReport class TaskManager: """负责与 Log Center 交互""" def __init__(self): self.base_url = settings.log_center_url self.client = httpx.Client(timeout=30) def fetch_pending_bugs(self, project_id: Optional[str] = None) -> list[Bug]: """ 获取待修复的 Bug 列表(包括 NEW 和 PENDING_FIX 状态) Args: project_id: 可选,筛选特定项目 Returns: Bug 列表(已按 id 去重) """ all_bugs: dict[int, Bug] = {} for source in ("runtime", "cicd", "deployment"): for status in ("NEW", "PENDING_FIX"): try: params: dict[str, str] = {"status": status, "source": source} if project_id: params["project_id"] = project_id response = self.client.get( f"{self.base_url}/api/v1/bugs", params=params, ) response.raise_for_status() data = response.json() for item in data.get("items", []): bug_id = item["id"] if bug_id in all_bugs: continue stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") all_bugs[bug_id] = Bug( id=bug_id, project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus(item.get("status", "NEW")), retry_count=item.get("retry_count", 0), ) except httpx.HTTPError as e: logger.error(f"获取 {source}/{status} 状态 Bug 列表失败: {e}") bugs = list(all_bugs.values()) logger.info(f"获取到 {len(bugs)} 个待修复 Bug(runtime + cicd + deployment)") return bugs def fetch_failed_bugs(self, project_id: Optional[str] = None) -> list[Bug]: """ 获取修复失败的 Bug 列表(FIX_FAILED 状态,所有来源) """ all_bugs: dict[int, Bug] = {} for source in ("runtime", "cicd", "deployment"): try: params: dict[str, str] = {"status": "FIX_FAILED", "source": source} if project_id: params["project_id"] = project_id response = self.client.get( f"{self.base_url}/api/v1/bugs", params=params, ) response.raise_for_status() data = response.json() for item in data.get("items", []): bug_id = item["id"] if bug_id in all_bugs: continue stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") all_bugs[bug_id] = Bug( id=item["id"], project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus.FIX_FAILED, retry_count=item.get("retry_count", 0), ) except httpx.HTTPError as e: logger.error(f"获取 {source}/FIX_FAILED Bug 列表失败: {e}") bugs = list(all_bugs.values()) logger.info(f"获取到 {len(bugs)} 个 FIX_FAILED Bug") return bugs def update_status(self, bug_id: int, status: BugStatus, message: str = "") -> bool: """ 更新 Bug 状态 Args: bug_id: Bug ID status: 新状态 message: 状态说明 Returns: 是否成功 """ try: response = self.client.put( f"{self.base_url}/api/v1/tasks/{bug_id}/status", json={ "status": status.value, "message": message } ) response.raise_for_status() logger.info(f"Bug #{bug_id} 状态更新为 {status.value}") return True except httpx.HTTPError as e: logger.error(f"更新状态失败: {e}") return False def get_bug_detail(self, bug_id: int) -> Optional[Bug]: """获取 Bug 详情""" try: response = self.client.get(f"{self.base_url}/api/v1/bugs/{bug_id}") response.raise_for_status() item = response.json() # stack_trace 可能是列表或字符串 stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") return Bug( id=item["id"], project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus(item.get("status", "NEW")), retry_count=item.get("retry_count", 0), ) except httpx.HTTPError as e: logger.error(f"获取 Bug 详情失败: {e}") return None def upload_report(self, report: RepairReport) -> bool: """上传修复报告""" try: response = self.client.post( f"{self.base_url}/api/v1/repair/reports", json=report.model_dump() ) response.raise_for_status() logger.info(f"Bug #{report.error_log_id} 修复报告已上传") return True except httpx.HTTPError as e: logger.error(f"上传修复报告失败: {e}") return False def get_project_info(self, project_id: str) -> Optional[dict]: """从 Log Center API 获取项目配置(repo_url + local_path)""" try: response = self.client.get(f"{self.base_url}/api/v1/projects/{project_id}") response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.warning(f"获取项目 {project_id} 配置失败: {e}") return None def close(self): """关闭连接""" self.client.close()