""" Task Manager - 与 Log Center 交互 """ import httpx from typing import Optional from loguru import logger from ..config import settings from ..models import Bug, BugStatus, RepairReport class TaskManager: """负责与 Log Center 交互""" def __init__(self): self.base_url = settings.log_center_url self.client = httpx.Client(timeout=30) def fetch_pending_bugs(self, project_id: Optional[str] = None) -> list[Bug]: """ 获取待修复的 Bug 列表(仅 NEW 状态) PENDING_FIX 表示已修复、等待 PR 审核,不应被重新拉取修复。 Args: project_id: 可选,筛选特定项目 Returns: Bug 列表(已按 id 去重) """ all_bugs: dict[int, Bug] = {} for source in ("runtime", "cicd", "deployment", "code_review"): for status in ("NEW",): try: params: dict[str, str] = {"status": status, "source": source} if project_id: params["project_id"] = project_id response = self.client.get( f"{self.base_url}/api/v1/bugs", params=params, ) response.raise_for_status() data = response.json() for item in data.get("items", []): bug_id = item["id"] if bug_id in all_bugs: continue stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") elif isinstance(stack_trace, dict): stack_trace = [f"{k}: {v}" for k, v in stack_trace.items()] all_bugs[bug_id] = Bug( id=bug_id, project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus(item.get("status", "NEW")), retry_count=item.get("retry_count", 0), severity=item.get("severity"), ) except httpx.HTTPError as e: logger.error(f"获取 {source}/{status} 状态 Bug 列表失败: {e}") bugs = list(all_bugs.values()) logger.info(f"获取到 {len(bugs)} 个待修复 Bug") return bugs def fetch_failed_bugs(self, project_id: Optional[str] = None) -> list[Bug]: """ 获取修复失败的 Bug 列表(FIX_FAILED 状态,所有来源) """ all_bugs: dict[int, Bug] = {} for source in ("runtime", "cicd", "deployment", "code_review"): try: params: dict[str, str] = {"status": "FIX_FAILED", "source": source} if project_id: params["project_id"] = project_id response = self.client.get( f"{self.base_url}/api/v1/bugs", params=params, ) response.raise_for_status() data = response.json() for item in data.get("items", []): bug_id = item["id"] if bug_id in all_bugs: continue stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") elif isinstance(stack_trace, dict): stack_trace = [f"{k}: {v}" for k, v in stack_trace.items()] all_bugs[bug_id] = Bug( id=item["id"], project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus.FIX_FAILED, retry_count=item.get("retry_count", 0), severity=item.get("severity"), ) except httpx.HTTPError as e: logger.error(f"获取 {source}/FIX_FAILED Bug 列表失败: {e}") bugs = list(all_bugs.values()) logger.info(f"获取到 {len(bugs)} 个 FIX_FAILED Bug") return bugs def update_status(self, bug_id: int, status: BugStatus, message: str = "") -> bool: """ 更新 Bug 状态 Args: bug_id: Bug ID status: 新状态 message: 状态说明 Returns: 是否成功 """ try: response = self.client.put( f"{self.base_url}/api/v1/tasks/{bug_id}/status", json={ "status": status.value, "message": message } ) response.raise_for_status() logger.info(f"Bug #{bug_id} 状态更新为 {status.value}") return True except httpx.HTTPError as e: logger.error(f"更新状态失败: {e}") return False def get_bug_detail(self, bug_id: int) -> Optional[Bug]: """获取 Bug 详情""" try: response = self.client.get(f"{self.base_url}/api/v1/bugs/{bug_id}") response.raise_for_status() item = response.json() # stack_trace 可能是列表、字符串或字典 stack_trace = item.get("stack_trace") if isinstance(stack_trace, str): stack_trace = stack_trace.split("\n") elif isinstance(stack_trace, dict): stack_trace = [f"{k}: {v}" for k, v in stack_trace.items()] return Bug( id=item["id"], project_id=item["project_id"], environment=item.get("environment", "production"), level=item.get("level", "ERROR"), error={ "type": item.get("error_type", "Unknown"), "message": item.get("error_message", ""), "file_path": item.get("file_path"), "line_number": item.get("line_number"), "stack_trace": stack_trace, }, context=item.get("context"), status=BugStatus(item.get("status", "NEW")), retry_count=item.get("retry_count", 0), severity=item.get("severity"), ) except httpx.HTTPError as e: logger.error(f"获取 Bug 详情失败: {e}") return None def upload_report(self, report: RepairReport) -> bool: """上传修复报告""" try: response = self.client.post( f"{self.base_url}/api/v1/repair/reports", json=report.model_dump() ) response.raise_for_status() logger.info(f"修复报告已上传 (bugs: {report.error_log_ids})") return True except httpx.HTTPError as e: logger.error(f"上传修复报告失败: {e}") return False def update_pr_info(self, bug_id: int, pr_number: int, pr_url: str, branch_name: str) -> bool: """回写 PR 信息到 Bug""" try: response = self.client.put( f"{self.base_url}/api/v1/bugs/{bug_id}/pr-info", json={ "pr_number": pr_number, "pr_url": pr_url, "branch_name": branch_name, }, ) response.raise_for_status() logger.info(f"Bug #{bug_id} PR 信息已更新: #{pr_number}") return True except httpx.HTTPError as e: logger.error(f"更新 PR 信息失败: {e}") return False def update_severity(self, bug_id: int, severity: int, severity_reason: str = "") -> bool: """回写 Bug 严重等级""" try: response = self.client.put( f"{self.base_url}/api/v1/bugs/{bug_id}/severity", json={"severity": severity, "severity_reason": severity_reason}, ) response.raise_for_status() logger.info(f"Bug #{bug_id} 严重等级已更新: {severity}/10") return True except httpx.HTTPError as e: logger.error(f"更新严重等级失败: {e}") return False def get_project_info(self, project_id: str) -> Optional[dict]: """从 Log Center API 获取项目配置(repo_url + local_path)""" try: response = self.client.get(f"{self.base_url}/api/v1/projects/{project_id}") response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.warning(f"获取项目 {project_id} 配置失败: {e}") return None def close(self): """关闭连接""" self.client.close()