"""
Core Engine - the core repair engine.

Orchestrates the automated bug-repair flow: fetch bugs, ask Claude for a
fix, inspect the resulting Git changes, run the Claude-generated test file
and upload a per-round report.
"""
import os
import re
import subprocess
from datetime import datetime
from typing import Optional

from loguru import logger

from ..config import settings
from ..models import Bug, BugStatus, FixResult, BatchFixResult, RepairReport
from .task_manager import TaskManager
from .git_manager import GitManager
from .claude_service import ClaudeService


def _execute_repair_test(
    project_path: str,
    test_file: str,
    timeout: int = 120,
) -> tuple[str, bool]:
    """
    Run a Claude-generated test file and report whether it passed.

    The pass/fail verdict comes from the test process exit code rather than
    from scanning the output text, so timeouts, runner start-up failures and
    collection errors are never mistaken for a passing run.

    Args:
        project_path: Project root directory (used as the working directory).
        test_file: Test file name relative to the project root
            (e.g. repair_test_bug_28.py).
        timeout: Timeout in seconds for the test process.

    Returns:
        ``(output, passed)``. ``output`` is the command line followed by the
        captured stdout/stderr (or a timeout/exception message); it is the
        empty string when the test file does not exist. ``passed`` is True
        only when the process exited with return code 0.
    """
    test_path = os.path.join(project_path, test_file)
    if not os.path.exists(test_path):
        logger.warning(f"测试文件不存在: {test_path}")
        return "", False

    # Pick the runner from the project type: a manage.py marks a Django project.
    if os.path.exists(os.path.join(project_path, "manage.py")):
        # Django: --pattern restricts discovery to exactly this file.
        cmd = [
            "python", "manage.py", "test",
            "--pattern", test_file,
            "--top-level-directory", ".",
            "--keepdb",
            "-v", "2",
        ]
    else:
        # Anything else: run the file directly with pytest.
        cmd = ["python", "-m", "pytest", test_file, "-v"]

    cmd_str = " ".join(cmd)
    logger.info(f"运行测试: {cmd_str}")

    try:
        result = subprocess.run(
            cmd,
            cwd=project_path,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"测试超时 ({timeout}s): {test_file}")
        return f"$ {cmd_str}\n测试执行超时 ({timeout}秒)", False
    except Exception as e:
        # Best-effort boundary: e.g. the interpreter could not be launched.
        logger.error(f"测试执行异常: {e}")
        return f"$ {cmd_str}\n测试执行异常: {e}", False

    passed = result.returncode == 0
    if passed:
        logger.info(f"测试通过: {test_file}")
    else:
        logger.warning(f"测试失败 (returncode={result.returncode}): {test_file}")

    output = f"$ {cmd_str}\n{result.stdout}{result.stderr}"
    return output.strip(), passed


def run_repair_test_file(project_path: str, test_file: str, timeout: int = 120) -> str:
    """
    Run a Claude-generated test file and return the test output.

    Args:
        project_path: Project root directory.
        test_file: Test file name (e.g. repair_test_bug_28.py).
        timeout: Timeout in seconds.

    Returns:
        Test output text (the command followed by its result), or an empty
        string when the test file does not exist.
    """
    output, _passed = _execute_repair_test(project_path, test_file, timeout)
    return output


def cleanup_repair_test_file(project_path: str, test_file: str):
    """Delete the temporary test file generated by Claude (best effort)."""
    test_path = os.path.join(project_path, test_file)
    try:
        os.remove(test_path)
    except FileNotFoundError:
        # Nothing to clean up.
        return
    except OSError as e:
        logger.warning(f"清理测试文件失败: {e}")
        return
    logger.debug(f"已清理测试文件: {test_file}")


class RepairEngine:
    """Core repair engine - orchestrates the whole repair flow."""

    def __init__(self):
        self.task_manager = TaskManager()
        self.claude_service = ClaudeService()

    def fix_project(
        self,
        project_id: str,
        run_tests: bool = True,
        auto_commit: bool = False,
    ) -> BatchFixResult:
        """
        Repair all pending bugs of the given project.

        Args:
            project_id: Project ID.
            run_tests: Whether to run tests.
                NOTE(review): this flag is currently not consulted; the
                generated test file is always run when it exists.
            auto_commit: Whether to create a fix branch, commit, push and
                merge to main after a successful repair.

        Returns:
            BatchFixResult summarising the per-bug outcomes.
        """
        logger.info(f"开始修复项目: {project_id}")

        # Project configuration: API first, falling back to .env settings.
        project_info = self.task_manager.get_project_info(project_id)
        project_path = (
            (project_info and project_info.get("local_path"))
            or settings.get_project_path(project_id)
        )
        api_repo_url = (project_info and project_info.get("repo_url")) or ""

        if not project_path:
            logger.error(f"未找到项目路径配置: {project_id}")
            return BatchFixResult(
                project_id=project_id,
                total=0,
                success_count=0,
                failed_count=0,
                results=[],
            )

        # Fetch the bugs awaiting repair.
        bugs = self.task_manager.fetch_pending_bugs(project_id)
        if not bugs:
            logger.info("没有待修复的 Bug")
            return BatchFixResult(
                project_id=project_id,
                total=0,
                success_count=0,
                failed_count=0,
                results=[],
            )

        logger.info(f"获取到 {len(bugs)} 个待修复 Bug")

        # Git is enabled only when a repository URL is configured
        # (API first, falling back to .env settings).
        github_repo = api_repo_url or settings.get_github_repo(project_id)
        git_enabled = bool(github_repo)
        git_manager = None

        if git_enabled:
            git_manager = GitManager(project_path, github_repo=github_repo)
            git_manager.pull()
            if auto_commit:
                branch_name = f"fix/auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
                git_manager.create_branch(branch_name)
        else:
            logger.info(f"项目 {project_id} 未配置仓库地址,跳过 Git 操作")

        # Mark every bug as FIXING before starting.
        for bug in bugs:
            self.task_manager.update_status(bug.id, BugStatus.FIXING)

        # Multi-round repair loop.
        # NOTE(review): every path below leaves the loop during round 1 and
        # last_diff / last_test_output are never updated, so the
        # retry_fix_bugs branch is currently unreachable. Confirm whether a
        # failed test run is meant to trigger another round.
        max_rounds = settings.max_retry_count
        results: list[FixResult] = []
        last_test_output = ""
        last_diff = ""

        try:
            for round_num in range(1, max_rounds + 1):
                logger.info(f"=== 第 {round_num}/{max_rounds} 轮修复 ===")

                # Step 1: ask Claude for a fix.
                if round_num == 1:
                    success, output = self.claude_service.batch_fix_bugs(bugs, project_path)
                else:
                    success, output = self.claude_service.retry_fix_bugs(
                        bugs, project_path,
                        previous_diff=last_diff,
                        previous_test_output=last_test_output,
                        round_num=round_num,
                    )

                if not success:
                    # The Claude CLI itself failed; do not retry.
                    failure_reason = f"Claude CLI 执行失败: {output[:500]}"
                    logger.error(f"{failure_reason} (round {round_num})")
                    for bug in bugs:
                        results.append(self._fail_bug(
                            bug=bug,
                            project_id=project_id,
                            round_num=round_num,
                            ai_analysis=output,
                            diff="",
                            modified_files=[],
                            failure_reason=failure_reason,
                        ))
                    break

                # Step 2: collect the changes (only available with Git).
                modified_files = []
                diff = ""
                if git_manager:
                    modified_files = git_manager.get_modified_files()
                    diff = git_manager.get_diff()

                logger.info(f"第 {round_num} 轮修复完成,修改了 {len(modified_files)} 个文件")

                # Step 3: safety check (only with Git; no retry on failure).
                if git_manager and not self._safety_check(modified_files, diff):
                    failure_reason = "安全检查未通过"
                    logger.warning(f"{failure_reason} (round {round_num})")
                    git_manager.reset_hard()
                    for bug in bugs:
                        results.append(self._fail_bug(
                            bug=bug,
                            project_id=project_id,
                            round_num=round_num,
                            ai_analysis=output,
                            diff=diff,
                            modified_files=modified_files,
                            failure_reason=failure_reason,
                        ))
                    break

                # Step 4: run the test file generated by Claude. The verdict
                # is taken from the runner's exit code.
                bug_ids_str = "_".join(str(b.id) for b in bugs)
                test_file = f"repair_test_bug_{bug_ids_str}.py"
                test_output, test_passed = _execute_repair_test(project_path, test_file)

                if not test_output:
                    test_output = "Claude 未生成测试文件"
                    logger.warning(f"测试文件 {test_file} 不存在,跳过测试验证")

                # Remove the temporary test file.
                cleanup_repair_test_file(project_path, test_file)

                # The bugs are marked FIXED regardless of test_passed; the
                # test verdict is only recorded in the uploaded report.
                for bug in bugs:
                    self.task_manager.update_status(bug.id, BugStatus.FIXED)
                    self._upload_round_report(
                        bug=bug,
                        project_id=project_id,
                        round_num=round_num,
                        ai_analysis=output,
                        diff=diff,
                        modified_files=modified_files,
                        test_output=test_output,
                        test_passed=test_passed,
                        failure_reason=None,
                        status=BugStatus.FIXED,
                    )
                    results.append(FixResult(
                        bug_id=bug.id,
                        success=True,
                        message=f"修复成功 (第 {round_num} 轮)",
                        modified_files=modified_files,
                        diff=diff,
                    ))

                # Commit, push and merge to main (only when Git is enabled).
                if git_enabled and auto_commit and modified_files and git_manager:
                    bug_ids = ", ".join([f"#{b.id}" for b in bugs])
                    git_manager.commit(f"fix: auto repair bugs {bug_ids}")
                    logger.info("代码已提交")
                    git_manager.push()
                    logger.info("fix 分支已推送")

                    # Merging the fix branch into main is what triggers CI/CD.
                    if git_manager.merge_to_main_and_push():
                        logger.info("已合并到 main 并推送")
                    else:
                        logger.warning("合并到 main 失败,请手动合并")
                elif not git_enabled and auto_commit:
                    logger.info("未配置 GitHub 仓库,跳过自动提交")

                break  # Success: leave the loop.

        except Exception as e:
            # Fallback: mark unhandled bugs FIX_FAILED so they do not stay in
            # FIXING forever (they can be reprocessed via the retry command).
            failure_reason = f"修复流程异常终止: {str(e)[:500]}"
            logger.exception(failure_reason)
            handled_ids = {r.bug_id for r in results}
            for bug in bugs:
                if bug.id in handled_ids:
                    continue
                self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
                results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
                handled_ids.add(bug.id)

        success_count = sum(1 for r in results if r.success)
        return BatchFixResult(
            project_id=project_id,
            total=len(bugs),
            success_count=success_count,
            failed_count=len(bugs) - success_count,
            results=results,
        )

    def retry_failed_project(
        self,
        project_id: Optional[str] = None,
        run_tests: bool = True,
        auto_commit: bool = False,
    ) -> BatchFixResult:
        """
        Process bugs in FIX_FAILED state: triage first, then repair.

        Flow:
        1. Fetch all FIX_FAILED bugs.
        2. Triage each one: decide whether it is a fixable code defect.
        3. Mark the non-fixable ones as CANNOT_REPRODUCE.
        4. Reset the fixable ones to PENDING_FIX and call fix_project.

        Args:
            project_id: Restrict processing to this project; None means all.
            run_tests: Forwarded to fix_project.
            auto_commit: Forwarded to fix_project.

        Returns:
            BatchFixResult combining the triage and repair outcomes.
        """
        logger.info(f"开始处理 FIX_FAILED Bug{f' (项目: {project_id})' if project_id else ''}")

        failed_bugs = self.task_manager.fetch_failed_bugs(project_id)
        if not failed_bugs:
            logger.info("没有 FIX_FAILED 的 Bug")
            return BatchFixResult(
                project_id=project_id or "all",
                total=0,
                success_count=0,
                failed_count=0,
                results=[],
            )

        results: list[FixResult] = []
        bugs_to_fix: dict[str, list[Bug]] = {}  # project_id -> bugs

        # Step 1: triage each bug individually.
        for bug in failed_bugs:
            logger.info(f"分诊 Bug #{bug.id} ({bug.error.type}: {bug.error.message[:60]})")

            project_info = self.task_manager.get_project_info(bug.project_id)
            project_path = (
                (project_info and project_info.get("local_path"))
                or settings.get_project_path(bug.project_id)
            )
            if not project_path:
                logger.warning(f"Bug #{bug.id}: 未找到项目路径 {bug.project_id},跳过")
                results.append(FixResult(
                    bug_id=bug.id,
                    success=False,
                    message=f"未找到项目路径: {bug.project_id}",
                ))
                continue

            # Ask Claude to triage the bug.
            self.task_manager.update_status(bug.id, BugStatus.VERIFYING)
            success, output = self.claude_service.triage_bug(bug, project_path)

            if not success:
                logger.warning(f"Bug #{bug.id}: 分诊执行失败,保留 FIX_FAILED")
                self.task_manager.update_status(
                    bug.id, BugStatus.FIX_FAILED, f"分诊失败: {output[:200]}"
                )
                results.append(FixResult(
                    bug_id=bug.id,
                    success=False,
                    message="分诊执行失败",
                ))
                continue

            # Parse the verdict marker from the triage output.
            if "VERDICT:CANNOT_REPRODUCE" in output:
                logger.info(f"Bug #{bug.id}: 判定为无法复现")
                self.task_manager.update_status(
                    bug.id,
                    BugStatus.CANNOT_REPRODUCE,
                    "AI 分诊判定:非代码缺陷或无法复现",
                )
                self._upload_round_report(
                    bug=bug,
                    project_id=bug.project_id,
                    round_num=0,
                    ai_analysis=output,
                    diff="",
                    modified_files=[],
                    test_output="",
                    test_passed=False,
                    failure_reason="AI 分诊:无法复现",
                    status=BugStatus.CANNOT_REPRODUCE,
                )
                results.append(FixResult(
                    bug_id=bug.id,
                    success=True,
                    message="标记为 CANNOT_REPRODUCE",
                ))
            elif "VERDICT:FIX" in output:
                logger.info(f"Bug #{bug.id}: 判定为可修复,加入修复队列")
                self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
                bugs_to_fix.setdefault(bug.project_id, []).append(bug)
            else:
                # No verdict marker: default to queueing the bug for repair.
                logger.warning(f"Bug #{bug.id}: 分诊输出无 VERDICT 标记,默认加入修复队列")
                self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
                bugs_to_fix.setdefault(bug.project_id, []).append(bug)

        # Step 2: batch-repair per project. fix_project re-fetches the
        # project's pending bugs itself, so its results are not limited to
        # the bugs queued here.
        for pid, bugs in bugs_to_fix.items():
            logger.info(f"开始修复项目 {pid} 的 {len(bugs)} 个 Bug")
            fix_result = self.fix_project(
                project_id=pid,
                run_tests=run_tests,
                auto_commit=auto_commit,
            )
            results.extend(fix_result.results)

        success_count = sum(1 for r in results if r.success)
        total = len(failed_bugs)
        return BatchFixResult(
            project_id=project_id or "all",
            total=total,
            success_count=success_count,
            failed_count=total - success_count,
            results=results,
        )

    def _upload_round_report(
        self,
        bug: Bug,
        project_id: str,
        round_num: int,
        ai_analysis: str,
        diff: str,
        modified_files: list[str],
        test_output: str,
        test_passed: bool,
        failure_reason: Optional[str],
        status: BugStatus,
    ):
        """
        Upload the repair report for one round.

        Upload failures are logged and swallowed so that reporting problems
        never abort the repair flow.
        """
        try:
            report = RepairReport(
                error_log_id=bug.id,
                status=status,
                project_id=project_id,
                ai_analysis=ai_analysis,
                fix_plan="See AI Analysis",
                code_diff=diff,
                modified_files=modified_files,
                test_output=test_output,
                test_passed=test_passed,
                repair_round=round_num,
                failure_reason=failure_reason,
            )
            self.task_manager.upload_report(report)
        except Exception as e:
            logger.error(f"上传第 {round_num} 轮报告失败: {e}")

    def _fail_bug(
        self,
        bug: Bug,
        project_id: str,
        round_num: int,
        ai_analysis: str,
        diff: str,
        modified_files: list[str],
        failure_reason: str,
    ) -> FixResult:
        """Upload a FIX_FAILED report for *bug*, update its status and return the failed result."""
        self._upload_round_report(
            bug=bug,
            project_id=project_id,
            round_num=round_num,
            ai_analysis=ai_analysis,
            diff=diff,
            modified_files=modified_files,
            test_output="",
            test_passed=False,
            failure_reason=failure_reason,
            status=BugStatus.FIX_FAILED,
        )
        self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
        return FixResult(bug_id=bug.id, success=False, message=failure_reason)

    def _safety_check(self, modified_files: list[str], diff: str) -> bool:
        """
        Safety check on the changes produced by a repair round.

        Args:
            modified_files: List of modified file paths.
            diff: Git diff text.

        Returns:
            True when the change stays within the configured limits and
            touches no critical file; False otherwise.
        """
        # Limit on the number of modified files.
        if len(modified_files) > settings.max_modified_files:
            logger.warning(f"修改文件数超限: {len(modified_files)} > {settings.max_modified_files}")
            return False

        # Limit on the number of changed lines. Lines starting with "+"/"-"
        # are counted, minus the "+++"/"---" file-header lines.
        added_lines = diff.count("\n+") - diff.count("\n+++")
        deleted_lines = diff.count("\n-") - diff.count("\n---")
        total_lines = added_lines + deleted_lines

        if total_lines > settings.max_modified_lines:
            logger.warning(f"修改行数超限: {total_lines} > {settings.max_modified_lines}")
            return False

        # Reject changes to critical files (case-insensitive substring match).
        critical_keywords = settings.get_critical_files()
        for file_path in modified_files:
            for keyword in critical_keywords:
                if keyword.lower() in file_path.lower():
                    logger.warning(f"修改了核心文件: {file_path}")
                    return False

        return True

    def fix_single_bug(
        self,
        bug_id: int,
        run_tests: bool = True,
    ) -> FixResult:
        """
        Repair a single bug.

        NOTE(review): the loop below is written for multiple rounds, but
        every path returns during round 1, so retry_fix_bugs is currently
        unreachable. Unlike fix_project, this path performs no safety check
        on the modified files. ``run_tests`` is currently not consulted.

        Args:
            bug_id: ID of the bug to repair.
            run_tests: Whether to run tests (see note above).

        Returns:
            FixResult describing the outcome.
        """
        bug = self.task_manager.get_bug_detail(bug_id)
        if not bug:
            return FixResult(
                bug_id=bug_id,
                success=False,
                message="Bug 不存在",
            )

        # Project configuration: API first, falling back to .env settings.
        project_info = self.task_manager.get_project_info(bug.project_id)
        project_path = (
            (project_info and project_info.get("local_path"))
            or settings.get_project_path(bug.project_id)
        )
        github_repo = (
            (project_info and project_info.get("repo_url"))
            or settings.get_github_repo(bug.project_id)
        )

        if not project_path:
            return FixResult(
                bug_id=bug_id,
                success=False,
                message=f"未找到项目路径: {bug.project_id}",
            )

        self.task_manager.update_status(bug_id, BugStatus.FIXING)

        max_rounds = settings.max_retry_count
        if github_repo:
            git_manager = GitManager(project_path, github_repo=github_repo)
        else:
            git_manager = GitManager(project_path)
        last_test_output = ""
        last_diff = ""

        try:
            for round_num in range(1, max_rounds + 1):
                logger.info(f"=== Bug #{bug_id} 第 {round_num}/{max_rounds} 轮修复 ===")

                # Ask Claude for a fix.
                if round_num == 1:
                    success, output = self.claude_service.batch_fix_bugs([bug], project_path)
                else:
                    success, output = self.claude_service.retry_fix_bugs(
                        [bug], project_path,
                        previous_diff=last_diff,
                        previous_test_output=last_test_output,
                        round_num=round_num,
                    )

                if not success:
                    failure_reason = f"Claude CLI 执行失败: {output[:500]}"
                    self._upload_round_report(
                        bug=bug,
                        project_id=bug.project_id,
                        round_num=round_num,
                        ai_analysis=output,
                        diff="",
                        modified_files=[],
                        test_output="",
                        test_passed=False,
                        failure_reason=failure_reason,
                        status=BugStatus.FIX_FAILED,
                    )
                    self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason)
                    return FixResult(bug_id=bug_id, success=False, message=failure_reason)

                modified_files = git_manager.get_modified_files()
                diff = git_manager.get_diff()

                # Run the test file generated by Claude. The verdict is
                # taken from the runner's exit code.
                test_file = f"repair_test_bug_{bug_id}.py"
                test_output, test_passed = _execute_repair_test(project_path, test_file)

                if not test_output:
                    test_output = "Claude 未生成测试文件"
                    logger.warning(f"测试文件 {test_file} 不存在,跳过测试验证")

                # Remove the temporary test file.
                cleanup_repair_test_file(project_path, test_file)

                # The bug is marked FIXED regardless of test_passed; the
                # test verdict is only recorded in the uploaded report.
                self.task_manager.update_status(bug_id, BugStatus.FIXED)
                self._upload_round_report(
                    bug=bug,
                    project_id=bug.project_id,
                    round_num=round_num,
                    ai_analysis=output,
                    diff=diff,
                    modified_files=modified_files,
                    test_output=test_output,
                    test_passed=test_passed,
                    failure_reason=None,
                    status=BugStatus.FIXED,
                )
                return FixResult(
                    bug_id=bug_id,
                    success=True,
                    message=f"修复成功 (第 {round_num} 轮)",
                    modified_files=modified_files,
                    diff=diff,
                )

        except Exception as e:
            # Fallback: mark FIX_FAILED so the bug does not stay in FIXING
            # forever (it can be reprocessed via the retry command).
            failure_reason = f"修复流程异常终止: {str(e)[:500]}"
            logger.exception(failure_reason)
            self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason)
            return FixResult(bug_id=bug_id, success=False, message=failure_reason)

        # Only reached when the loop body never runs (max_rounds < 1).
        return FixResult(bug_id=bug_id, success=False, message="修复流程异常结束")

    def close(self):
        """Release resources held by the engine."""
        self.task_manager.close()