""" Core Engine - 核心修复引擎 """ from datetime import datetime from typing import Optional from loguru import logger from ..config import settings from ..models import Bug, BugStatus, FixResult, BatchFixResult, RepairReport from .task_manager import TaskManager from .git_manager import GitManager from .claude_service import ClaudeService from .test_runner import TestRunner class RepairEngine: """核心修复引擎 - 编排整个修复流程""" def __init__(self): self.task_manager = TaskManager() self.claude_service = ClaudeService() def fix_project( self, project_id: str, run_tests: bool = True, auto_commit: bool = False, ) -> BatchFixResult: """ 修复指定项目的所有待修复 Bug Args: project_id: 项目ID run_tests: 是否运行测试 auto_commit: 是否自动提交 Returns: BatchFixResult """ logger.info(f"开始修复项目: {project_id}") # 从 API 获取项目配置(优先),回退到 .env project_info = self.task_manager.get_project_info(project_id) project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(project_id) api_repo_url = (project_info and project_info.get("repo_url")) or "" if not project_path: logger.error(f"未找到项目路径配置: {project_id}") return BatchFixResult( project_id=project_id, total=0, success_count=0, failed_count=0, results=[], ) # 获取待修复的 Bug bugs = self.task_manager.fetch_pending_bugs(project_id) if not bugs: logger.info("没有待修复的 Bug") return BatchFixResult( project_id=project_id, total=0, success_count=0, failed_count=0, results=[], ) logger.info(f"获取到 {len(bugs)} 个待修复 Bug") # 检查是否启用 Git(API 优先,回退 .env) github_repo = api_repo_url or settings.get_github_repo(project_id) git_enabled = bool(github_repo) git_manager = None if git_enabled: git_manager = GitManager(project_path, github_repo=github_repo) git_manager.pull() branch_name = f"fix/auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}" if auto_commit: git_manager.create_branch(branch_name) else: logger.info(f"项目 {project_id} 未配置仓库地址,跳过 Git 操作") # 更新所有 Bug 状态为 FIXING for bug in bugs: self.task_manager.update_status(bug.id, BugStatus.FIXING) # 多轮修复循环 max_rounds = settings.max_retry_count # 默认 3 results = [] last_test_output = "" last_diff = "" for round_num in range(1, max_rounds + 1): logger.info(f"=== 第 {round_num}/{max_rounds} 轮修复 ===") # Step 1: 调用 Claude 修复 if round_num == 1: success, output = self.claude_service.batch_fix_bugs(bugs, project_path) else: success, output = self.claude_service.retry_fix_bugs( bugs, project_path, previous_diff=last_diff, previous_test_output=last_test_output, round_num=round_num, ) if not success: # Claude CLI 本身执行失败,不重试 failure_reason = f"Claude CLI 执行失败: {output[:500]}" logger.error(f"{failure_reason} (round {round_num})") for bug in bugs: self._upload_round_report( bug=bug, project_id=project_id, round_num=round_num, ai_analysis=output, diff="", modified_files=[], test_output="", test_passed=False, failure_reason=failure_reason, status=BugStatus.FIX_FAILED, ) self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason) results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason)) break # Step 2: 获取变更 modified_files = [] diff = "" if git_manager: modified_files = git_manager.get_modified_files() diff = git_manager.get_diff() logger.info(f"第 {round_num} 轮修复完成,修改了 {len(modified_files)} 个文件") # Step 3: 安全检查(仅在 Git 启用时,不重试) if git_manager and not self._safety_check(modified_files, diff): failure_reason = "安全检查未通过" logger.warning(f"{failure_reason} (round {round_num})") git_manager.reset_hard() for bug in bugs: self._upload_round_report( bug=bug, project_id=project_id, round_num=round_num, ai_analysis=output, diff=diff, modified_files=modified_files, test_output="", test_passed=False, failure_reason=failure_reason, status=BugStatus.FIX_FAILED, ) self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason) results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason)) break # Step 4: 运行测试 test_result = None if run_tests: test_runner = TestRunner(project_path, project_id) test_result = test_runner.run_full_suite() if not test_result.success: last_test_output = test_result.output last_diff = diff is_last_round = (round_num == max_rounds) failure_reason = f"测试未通过 (第 {round_num}/{max_rounds} 轮)" # 上传本轮报告 for bug in bugs: self._upload_round_report( bug=bug, project_id=project_id, round_num=round_num, ai_analysis=output, diff=diff, modified_files=modified_files, test_output=test_result.output, test_passed=False, failure_reason=failure_reason, status=BugStatus.FIX_FAILED if is_last_round else BugStatus.FIXING, ) # 回滚准备下一轮或最终失败 if git_manager: git_manager.reset_hard() if is_last_round: final_msg = f"经过 {max_rounds} 轮修复仍未通过测试" for bug in bugs: self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, final_msg) results.append(FixResult(bug_id=bug.id, success=False, message=final_msg)) else: logger.info(f"第 {round_num} 轮测试未通过,准备第 {round_num + 1} 轮重试...") continue # 进入下一轮 # Step 5: 测试通过(或跳过测试)— 成功! for bug in bugs: self.task_manager.update_status(bug.id, BugStatus.FIXED) self._upload_round_report( bug=bug, project_id=project_id, round_num=round_num, ai_analysis=output, diff=diff, modified_files=modified_files, test_output=test_result.output if test_result else "Tests skipped", test_passed=test_result.success if test_result else True, failure_reason=None, status=BugStatus.FIXED, ) results.append(FixResult( bug_id=bug.id, success=True, message=f"修复成功 (第 {round_num} 轮)", modified_files=modified_files, diff=diff, )) # 自动提交、合并到 main 并推送(仅在 Git 启用时) if git_enabled and auto_commit and modified_files and git_manager: bug_ids = ", ".join([f"#{b.id}" for b in bugs]) git_manager.commit(f"fix: auto repair bugs {bug_ids}") logger.info("代码已提交") git_manager.push() logger.info("fix 分支已推送") # 合并 fix 分支到 main 并推送,触发 CI/CD if git_manager.merge_to_main_and_push(): logger.info("已合并到 main 并推送") else: logger.warning("合并到 main 失败,请手动合并") elif not git_enabled and auto_commit: logger.info("未配置 GitHub 仓库,跳过自动提交") break # 成功,退出循环 success_count = sum(1 for r in results if r.success) return BatchFixResult( project_id=project_id, total=len(bugs), success_count=success_count, failed_count=len(bugs) - success_count, results=results, ) def retry_failed_project( self, project_id: Optional[str] = None, run_tests: bool = True, auto_commit: bool = False, ) -> BatchFixResult: """ 处理 FIX_FAILED 状态的 Bug:先分诊,再修复。 流程: 1. 获取所有 FIX_FAILED Bug 2. 逐个分诊(triage):判断是否为可修复的代码缺陷 3. 不可修复的标记为 CANNOT_REPRODUCE 4. 可修复的重置为 PENDING_FIX 后调用 fix_project 修复 """ logger.info(f"开始处理 FIX_FAILED Bug{f' (项目: {project_id})' if project_id else ''}") failed_bugs = self.task_manager.fetch_failed_bugs(project_id) if not failed_bugs: logger.info("没有 FIX_FAILED 的 Bug") return BatchFixResult( project_id=project_id or "all", total=0, success_count=0, failed_count=0, results=[], ) results: list[FixResult] = [] bugs_to_fix: dict[str, list[Bug]] = {} # project_id → bugs # Step 1: 逐个分诊 for bug in failed_bugs: logger.info(f"分诊 Bug #{bug.id} ({bug.error.type}: {bug.error.message[:60]})") project_info = self.task_manager.get_project_info(bug.project_id) project_path = ( (project_info and project_info.get("local_path")) or settings.get_project_path(bug.project_id) ) if not project_path: logger.warning(f"Bug #{bug.id}: 未找到项目路径 {bug.project_id},跳过") results.append(FixResult( bug_id=bug.id, success=False, message=f"未找到项目路径: {bug.project_id}", )) continue # 调用 Claude 分诊 self.task_manager.update_status(bug.id, BugStatus.VERIFYING) success, output = self.claude_service.triage_bug(bug, project_path) if not success: logger.warning(f"Bug #{bug.id}: 分诊执行失败,保留 FIX_FAILED") self.task_manager.update_status( bug.id, BugStatus.FIX_FAILED, f"分诊失败: {output[:200]}" ) results.append(FixResult( bug_id=bug.id, success=False, message="分诊执行失败", )) continue # 解析判决 if "VERDICT:CANNOT_REPRODUCE" in output: logger.info(f"Bug #{bug.id}: 判定为无法复现") self.task_manager.update_status( bug.id, BugStatus.CANNOT_REPRODUCE, "AI 分诊判定:非代码缺陷或无法复现", ) self._upload_round_report( bug=bug, project_id=bug.project_id, round_num=0, ai_analysis=output, diff="", modified_files=[], test_output="", test_passed=False, failure_reason="AI 分诊:无法复现", status=BugStatus.CANNOT_REPRODUCE, ) results.append(FixResult( bug_id=bug.id, success=True, message="标记为 CANNOT_REPRODUCE", )) elif "VERDICT:FIX" in output: logger.info(f"Bug #{bug.id}: 判定为可修复,加入修复队列") self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX) bugs_to_fix.setdefault(bug.project_id, []).append(bug) else: logger.warning(f"Bug #{bug.id}: 分诊输出无 VERDICT 标记,默认加入修复队列") self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX) bugs_to_fix.setdefault(bug.project_id, []).append(bug) # Step 2: 按项目批量修复 for pid, bugs in bugs_to_fix.items(): logger.info(f"开始修复项目 {pid} 的 {len(bugs)} 个 Bug") fix_result = self.fix_project( project_id=pid, run_tests=run_tests, auto_commit=auto_commit, ) results.extend(fix_result.results) success_count = sum(1 for r in results if r.success) total = len(failed_bugs) return BatchFixResult( project_id=project_id or "all", total=total, success_count=success_count, failed_count=total - success_count, results=results, ) def _upload_round_report( self, bug: Bug, project_id: str, round_num: int, ai_analysis: str, diff: str, modified_files: list[str], test_output: str, test_passed: bool, failure_reason: Optional[str], status: BugStatus, ): """上传某一轮的修复报告""" try: report = RepairReport( error_log_id=bug.id, status=status, project_id=project_id, ai_analysis=ai_analysis, fix_plan="See AI Analysis", code_diff=diff, modified_files=modified_files, test_output=test_output, test_passed=test_passed, repair_round=round_num, failure_reason=failure_reason, ) self.task_manager.upload_report(report) except Exception as e: logger.error(f"上传第 {round_num} 轮报告失败: {e}") def _safety_check(self, modified_files: list[str], diff: str) -> bool: """ 安全检查 Args: modified_files: 修改的文件列表 diff: Git diff Returns: 是否通过检查 """ # 检查修改文件数量 if len(modified_files) > settings.max_modified_files: logger.warning(f"修改文件数超限: {len(modified_files)} > {settings.max_modified_files}") return False # 检查修改行数 added_lines = diff.count("\n+") - diff.count("\n+++") deleted_lines = diff.count("\n-") - diff.count("\n---") total_lines = added_lines + deleted_lines if total_lines > settings.max_modified_lines: logger.warning(f"修改行数超限: {total_lines} > {settings.max_modified_lines}") return False # 检查核心文件 critical_keywords = settings.get_critical_files() for file_path in modified_files: for keyword in critical_keywords: if keyword.lower() in file_path.lower(): logger.warning(f"修改了核心文件: {file_path}") return False return True def fix_single_bug( self, bug_id: int, run_tests: bool = True, ) -> FixResult: """修复单个 Bug(带多轮重试)""" bug = self.task_manager.get_bug_detail(bug_id) if not bug: return FixResult( bug_id=bug_id, success=False, message="Bug 不存在", ) # 从 API 获取项目配置(优先),回退到 .env project_info = self.task_manager.get_project_info(bug.project_id) project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(bug.project_id) github_repo = (project_info and project_info.get("repo_url")) or settings.get_github_repo(bug.project_id) if not project_path: return FixResult( bug_id=bug_id, success=False, message=f"未找到项目路径: {bug.project_id}", ) self.task_manager.update_status(bug_id, BugStatus.FIXING) max_rounds = settings.max_retry_count git_manager = GitManager(project_path, github_repo=github_repo) if github_repo else GitManager(project_path) last_test_output = "" last_diff = "" for round_num in range(1, max_rounds + 1): logger.info(f"=== Bug #{bug_id} 第 {round_num}/{max_rounds} 轮修复 ===") # 调用 Claude if round_num == 1: success, output = self.claude_service.batch_fix_bugs([bug], project_path) else: success, output = self.claude_service.retry_fix_bugs( [bug], project_path, previous_diff=last_diff, previous_test_output=last_test_output, round_num=round_num, ) if not success: failure_reason = f"Claude CLI 执行失败: {output[:500]}" self._upload_round_report( bug=bug, project_id=bug.project_id, round_num=round_num, ai_analysis=output, diff="", modified_files=[], test_output="", test_passed=False, failure_reason=failure_reason, status=BugStatus.FIX_FAILED, ) self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason) return FixResult(bug_id=bug_id, success=False, message=failure_reason) modified_files = git_manager.get_modified_files() diff = git_manager.get_diff() # 运行测试 test_result = None if run_tests: test_runner = TestRunner(project_path, bug.project_id) test_result = test_runner.run_full_suite() if not test_result.success: last_test_output = test_result.output last_diff = diff is_last_round = (round_num == max_rounds) failure_reason = f"测试未通过 (第 {round_num}/{max_rounds} 轮)" self._upload_round_report( bug=bug, project_id=bug.project_id, round_num=round_num, ai_analysis=output, diff=diff, modified_files=modified_files, test_output=test_result.output, test_passed=False, failure_reason=failure_reason, status=BugStatus.FIX_FAILED if is_last_round else BugStatus.FIXING, ) git_manager.reset_hard() if is_last_round: final_msg = f"经过 {max_rounds} 轮修复仍未通过测试" self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, final_msg) return FixResult(bug_id=bug_id, success=False, message=final_msg) logger.info(f"第 {round_num} 轮测试未通过,准备第 {round_num + 1} 轮重试...") continue # 测试通过 — 成功 self.task_manager.update_status(bug_id, BugStatus.FIXED) self._upload_round_report( bug=bug, project_id=bug.project_id, round_num=round_num, ai_analysis=output, diff=diff, modified_files=modified_files, test_output=test_result.output if test_result else "Tests skipped", test_passed=True, failure_reason=None, status=BugStatus.FIXED, ) return FixResult( bug_id=bug_id, success=True, message=f"修复成功 (第 {round_num} 轮)", modified_files=modified_files, diff=diff, ) # 不应到达这里,但做安全兜底 return FixResult(bug_id=bug_id, success=False, message="修复流程异常结束") def close(self): """关闭资源""" self.task_manager.close()