""" Core Engine - 核心修复引擎 """ from datetime import datetime from typing import Optional from loguru import logger from ..config import settings from ..models import Bug, BugStatus, FixResult, BatchFixResult, RepairReport from .task_manager import TaskManager from .git_manager import GitManager from .claude_service import ClaudeService from .test_runner import TestRunner class RepairEngine: """核心修复引擎 - 编排整个修复流程""" def __init__(self): self.task_manager = TaskManager() self.claude_service = ClaudeService() def fix_project( self, project_id: str, run_tests: bool = True, auto_commit: bool = False, ) -> BatchFixResult: """ 修复指定项目的所有待修复 Bug Args: project_id: 项目ID run_tests: 是否运行测试 auto_commit: 是否自动提交 Returns: BatchFixResult """ logger.info(f"开始修复项目: {project_id}") # 获取项目路径 project_path = settings.get_project_path(project_id) if not project_path: logger.error(f"未找到项目路径配置: {project_id}") return BatchFixResult( project_id=project_id, total=0, success_count=0, failed_count=0, results=[], ) # 获取待修复的 Bug bugs = self.task_manager.fetch_pending_bugs(project_id) if not bugs: logger.info("没有待修复的 Bug") return BatchFixResult( project_id=project_id, total=0, success_count=0, failed_count=0, results=[], ) logger.info(f"获取到 {len(bugs)} 个待修复 Bug") # 初始化 Git 管理器 git_manager = GitManager(project_path) # 拉取最新代码 git_manager.pull() # 创建修复分支 branch_name = f"fix/auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}" if auto_commit: git_manager.create_branch(branch_name) # 更新所有 Bug 状态为 FIXING for bug in bugs: self.task_manager.update_status(bug.id, BugStatus.FIXING) # 批量修复 success, output = self.claude_service.batch_fix_bugs(bugs, project_path) results = [] if success: # 获取修改的文件 modified_files = git_manager.get_modified_files() diff = git_manager.get_diff() logger.info(f"修复完成,修改了 {len(modified_files)} 个文件") # 安全检查 if not self._safety_check(modified_files, diff): logger.warning("安全检查未通过,回滚更改") git_manager.reset_hard() for bug in bugs: self.task_manager.update_status( bug.id, BugStatus.FIX_FAILED, "安全检查未通过" ) results.append(FixResult( bug_id=bug.id, success=False, message="安全检查未通过", )) return BatchFixResult( project_id=project_id, total=len(bugs), success_count=0, failed_count=len(bugs), results=results, ) # 运行测试 test_result = None if run_tests: test_runner = TestRunner(project_path, project_id) test_result = test_runner.run_full_suite() if not test_result.success: logger.error("测试未通过,回滚更改") git_manager.reset_hard() for bug in bugs: self.task_manager.update_status( bug.id, BugStatus.FIX_FAILED, f"测试未通过: {test_result.output[:200]}" ) results.append(FixResult( bug_id=bug.id, success=False, message="测试未通过", )) return BatchFixResult( project_id=project_id, total=len(bugs), success_count=0, failed_count=len(bugs), results=results, ) # 标记成功并上传报告 for bug in bugs: self.task_manager.update_status(bug.id, BugStatus.FIXED) # 上传修复报告 try: report = RepairReport( error_log_id=bug.id, status=BugStatus.FIXED, project_id=project_id, ai_analysis=output, fix_plan="See AI Analysis", code_diff=diff, modified_files=modified_files, test_output=test_result.output if test_result else "Tests skipped", test_passed=test_result.success if test_result else True ) self.task_manager.upload_report(report) except Exception as e: logger.error(f"上传报告失败: {e}") results.append(FixResult( bug_id=bug.id, success=True, message="修复成功", modified_files=modified_files, diff=diff, )) # 自动提交 if auto_commit and modified_files: bug_ids = ", ".join([f"#{b.id}" for b in bugs]) git_manager.commit(f"fix: auto repair bugs {bug_ids}") logger.info("代码已提交") else: logger.error(f"修复失败: {output}") for bug in bugs: self.task_manager.update_status( bug.id, BugStatus.FIX_FAILED, output[:200] ) results.append(FixResult( bug_id=bug.id, success=False, message=output[:200], )) success_count = sum(1 for r in results if r.success) return BatchFixResult( project_id=project_id, total=len(bugs), success_count=success_count, failed_count=len(bugs) - success_count, results=results, ) def _safety_check(self, modified_files: list[str], diff: str) -> bool: """ 安全检查 Args: modified_files: 修改的文件列表 diff: Git diff Returns: 是否通过检查 """ # 检查修改文件数量 if len(modified_files) > settings.max_modified_files: logger.warning(f"修改文件数超限: {len(modified_files)} > {settings.max_modified_files}") return False # 检查修改行数 added_lines = diff.count("\n+") - diff.count("\n+++") deleted_lines = diff.count("\n-") - diff.count("\n---") total_lines = added_lines + deleted_lines if total_lines > settings.max_modified_lines: logger.warning(f"修改行数超限: {total_lines} > {settings.max_modified_lines}") return False # 检查核心文件 critical_keywords = settings.get_critical_files() for file_path in modified_files: for keyword in critical_keywords: if keyword.lower() in file_path.lower(): logger.warning(f"修改了核心文件: {file_path}") return False return True def fix_single_bug( self, bug_id: int, run_tests: bool = True, ) -> FixResult: """修复单个 Bug""" bug = self.task_manager.get_bug_detail(bug_id) if not bug: return FixResult( bug_id=bug_id, success=False, message="Bug 不存在", ) project_path = settings.get_project_path(bug.project_id) if not project_path: return FixResult( bug_id=bug_id, success=False, message=f"未找到项目路径: {bug.project_id}", ) # 单个 Bug 也使用批量修复接口 success, output = self.claude_service.batch_fix_bugs([bug], project_path) if success: git_manager = GitManager(project_path) modified_files = git_manager.get_modified_files() if run_tests: test_runner = TestRunner(project_path, bug.project_id) test_result = test_runner.run_full_suite() if not test_result.success: git_manager.reset_hard() self.task_manager.update_status( bug_id, BugStatus.FIX_FAILED, "测试未通过" ) return FixResult( bug_id=bug_id, success=False, message="测试未通过", ) self.task_manager.update_status(bug_id, BugStatus.FIXED) return FixResult( bug_id=bug_id, success=True, message="修复成功", modified_files=modified_files, ) else: self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, output[:200]) return FixResult( bug_id=bug_id, success=False, message=output[:200], ) def close(self): """关闭资源""" self.task_manager.close()