zyc d58ca4b131
All checks were successful
Build and Deploy Log Center / build-and-deploy (push) Successful in 4m15s
feat(repair-agent): add triage and retry flow for FIX_FAILED bugs
- Add fetch_failed_bugs() to task_manager
- Add triage_bug() to claude_service for AI-based bug classification
- Add retry_failed_project() to core with triage→fix pipeline
- Add retry CLI command to __main__.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:01:27 +08:00

513 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Core Engine - 核心修复引擎
"""
from datetime import datetime
from typing import Optional
from loguru import logger
from ..config import settings
from ..models import Bug, BugStatus, FixResult, BatchFixResult, RepairReport
from .task_manager import TaskManager
from .git_manager import GitManager
from .claude_service import ClaudeService
from .test_runner import TestRunner
class RepairEngine:
"""核心修复引擎 - 编排整个修复流程"""
def __init__(self):
self.task_manager = TaskManager()
self.claude_service = ClaudeService()
def fix_project(
self,
project_id: str,
run_tests: bool = True,
auto_commit: bool = False,
) -> BatchFixResult:
"""
修复指定项目的所有待修复 Bug
Args:
project_id: 项目ID
run_tests: 是否运行测试
auto_commit: 是否自动提交
Returns:
BatchFixResult
"""
logger.info(f"开始修复项目: {project_id}")
# 从 API 获取项目配置(优先),回退到 .env
project_info = self.task_manager.get_project_info(project_id)
project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(project_id)
api_repo_url = (project_info and project_info.get("repo_url")) or ""
if not project_path:
logger.error(f"未找到项目路径配置: {project_id}")
return BatchFixResult(
project_id=project_id,
total=0,
success_count=0,
failed_count=0,
results=[],
)
# 获取待修复的 Bug
bugs = self.task_manager.fetch_pending_bugs(project_id)
if not bugs:
logger.info("没有待修复的 Bug")
return BatchFixResult(
project_id=project_id,
total=0,
success_count=0,
failed_count=0,
results=[],
)
logger.info(f"获取到 {len(bugs)} 个待修复 Bug")
# 检查是否启用 GitAPI 优先,回退 .env
github_repo = api_repo_url or settings.get_github_repo(project_id)
git_enabled = bool(github_repo)
git_manager = None
if git_enabled:
git_manager = GitManager(project_path, github_repo=github_repo)
git_manager.pull()
branch_name = f"fix/auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
if auto_commit:
git_manager.create_branch(branch_name)
else:
logger.info(f"项目 {project_id} 未配置仓库地址,跳过 Git 操作")
# 更新所有 Bug 状态为 FIXING
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIXING)
# 多轮修复循环
max_rounds = settings.max_retry_count # 默认 3
results = []
last_test_output = ""
last_diff = ""
for round_num in range(1, max_rounds + 1):
logger.info(f"=== 第 {round_num}/{max_rounds} 轮修复 ===")
# Step 1: 调用 Claude 修复
if round_num == 1:
success, output = self.claude_service.batch_fix_bugs(bugs, project_path)
else:
success, output = self.claude_service.retry_fix_bugs(
bugs, project_path,
previous_diff=last_diff,
previous_test_output=last_test_output,
round_num=round_num,
)
if not success:
# Claude CLI 本身执行失败,不重试
failure_reason = f"Claude CLI 执行失败: {output[:500]}"
logger.error(f"{failure_reason} (round {round_num})")
for bug in bugs:
self._upload_round_report(
bug=bug, project_id=project_id, round_num=round_num,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason=failure_reason,
status=BugStatus.FIX_FAILED,
)
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
break
# Step 2: 获取变更
modified_files = []
diff = ""
if git_manager:
modified_files = git_manager.get_modified_files()
diff = git_manager.get_diff()
logger.info(f"{round_num} 轮修复完成,修改了 {len(modified_files)} 个文件")
# Step 3: 安全检查(仅在 Git 启用时,不重试)
if git_manager and not self._safety_check(modified_files, diff):
failure_reason = "安全检查未通过"
logger.warning(f"{failure_reason} (round {round_num})")
git_manager.reset_hard()
for bug in bugs:
self._upload_round_report(
bug=bug, project_id=project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output="", test_passed=False,
failure_reason=failure_reason,
status=BugStatus.FIX_FAILED,
)
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
break
# Step 4: 运行测试
test_result = None
if run_tests:
test_runner = TestRunner(project_path, project_id)
test_result = test_runner.run_full_suite()
if not test_result.success:
last_test_output = test_result.output
last_diff = diff
is_last_round = (round_num == max_rounds)
failure_reason = f"测试未通过 (第 {round_num}/{max_rounds} 轮)"
# 上传本轮报告
for bug in bugs:
self._upload_round_report(
bug=bug, project_id=project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_result.output, test_passed=False,
failure_reason=failure_reason,
status=BugStatus.FIX_FAILED if is_last_round else BugStatus.FIXING,
)
# 回滚准备下一轮或最终失败
if git_manager:
git_manager.reset_hard()
if is_last_round:
final_msg = f"经过 {max_rounds} 轮修复仍未通过测试"
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, final_msg)
results.append(FixResult(bug_id=bug.id, success=False, message=final_msg))
else:
logger.info(f"{round_num} 轮测试未通过,准备第 {round_num + 1} 轮重试...")
continue # 进入下一轮
# Step 5: 测试通过(或跳过测试)— 成功!
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIXED)
self._upload_round_report(
bug=bug, project_id=project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_result.output if test_result else "Tests skipped",
test_passed=test_result.success if test_result else True,
failure_reason=None,
status=BugStatus.FIXED,
)
results.append(FixResult(
bug_id=bug.id, success=True,
message=f"修复成功 (第 {round_num} 轮)",
modified_files=modified_files, diff=diff,
))
# 自动提交(仅在 Git 启用时)
if git_enabled and auto_commit and modified_files and git_manager:
bug_ids = ", ".join([f"#{b.id}" for b in bugs])
git_manager.commit(f"fix: auto repair bugs {bug_ids}")
logger.info("代码已提交")
elif not git_enabled and auto_commit:
logger.info("未配置 GitHub 仓库,跳过自动提交")
break # 成功,退出循环
success_count = sum(1 for r in results if r.success)
return BatchFixResult(
project_id=project_id,
total=len(bugs),
success_count=success_count,
failed_count=len(bugs) - success_count,
results=results,
)
def retry_failed_project(
self,
project_id: Optional[str] = None,
run_tests: bool = True,
auto_commit: bool = False,
) -> BatchFixResult:
"""
处理 FIX_FAILED 状态的 Bug先分诊再修复。
流程:
1. 获取所有 FIX_FAILED Bug
2. 逐个分诊triage判断是否为可修复的代码缺陷
3. 不可修复的标记为 CANNOT_REPRODUCE
4. 可修复的重置为 PENDING_FIX 后调用 fix_project 修复
"""
logger.info(f"开始处理 FIX_FAILED Bug{f' (项目: {project_id})' if project_id else ''}")
failed_bugs = self.task_manager.fetch_failed_bugs(project_id)
if not failed_bugs:
logger.info("没有 FIX_FAILED 的 Bug")
return BatchFixResult(
project_id=project_id or "all",
total=0, success_count=0, failed_count=0, results=[],
)
results: list[FixResult] = []
bugs_to_fix: dict[str, list[Bug]] = {} # project_id → bugs
# Step 1: 逐个分诊
for bug in failed_bugs:
logger.info(f"分诊 Bug #{bug.id} ({bug.error.type}: {bug.error.message[:60]})")
project_info = self.task_manager.get_project_info(bug.project_id)
project_path = (
(project_info and project_info.get("local_path"))
or settings.get_project_path(bug.project_id)
)
if not project_path:
logger.warning(f"Bug #{bug.id}: 未找到项目路径 {bug.project_id},跳过")
results.append(FixResult(
bug_id=bug.id, success=False,
message=f"未找到项目路径: {bug.project_id}",
))
continue
# 调用 Claude 分诊
self.task_manager.update_status(bug.id, BugStatus.VERIFYING)
success, output = self.claude_service.triage_bug(bug, project_path)
if not success:
logger.warning(f"Bug #{bug.id}: 分诊执行失败,保留 FIX_FAILED")
self.task_manager.update_status(
bug.id, BugStatus.FIX_FAILED, f"分诊失败: {output[:200]}"
)
results.append(FixResult(
bug_id=bug.id, success=False, message="分诊执行失败",
))
continue
# 解析判决
if "VERDICT:CANNOT_REPRODUCE" in output:
logger.info(f"Bug #{bug.id}: 判定为无法复现")
self.task_manager.update_status(
bug.id, BugStatus.CANNOT_REPRODUCE,
"AI 分诊判定:非代码缺陷或无法复现",
)
self._upload_round_report(
bug=bug, project_id=bug.project_id, round_num=0,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason="AI 分诊:无法复现",
status=BugStatus.CANNOT_REPRODUCE,
)
results.append(FixResult(
bug_id=bug.id, success=True,
message="标记为 CANNOT_REPRODUCE",
))
elif "VERDICT:FIX" in output:
logger.info(f"Bug #{bug.id}: 判定为可修复,加入修复队列")
self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
bugs_to_fix.setdefault(bug.project_id, []).append(bug)
else:
logger.warning(f"Bug #{bug.id}: 分诊输出无 VERDICT 标记,默认加入修复队列")
self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
bugs_to_fix.setdefault(bug.project_id, []).append(bug)
# Step 2: 按项目批量修复
for pid, bugs in bugs_to_fix.items():
logger.info(f"开始修复项目 {pid}{len(bugs)} 个 Bug")
fix_result = self.fix_project(
project_id=pid,
run_tests=run_tests,
auto_commit=auto_commit,
)
results.extend(fix_result.results)
success_count = sum(1 for r in results if r.success)
total = len(failed_bugs)
return BatchFixResult(
project_id=project_id or "all",
total=total,
success_count=success_count,
failed_count=total - success_count,
results=results,
)
def _upload_round_report(
self,
bug: Bug,
project_id: str,
round_num: int,
ai_analysis: str,
diff: str,
modified_files: list[str],
test_output: str,
test_passed: bool,
failure_reason: Optional[str],
status: BugStatus,
):
"""上传某一轮的修复报告"""
try:
report = RepairReport(
error_log_id=bug.id,
status=status,
project_id=project_id,
ai_analysis=ai_analysis,
fix_plan="See AI Analysis",
code_diff=diff,
modified_files=modified_files,
test_output=test_output,
test_passed=test_passed,
repair_round=round_num,
failure_reason=failure_reason,
)
self.task_manager.upload_report(report)
except Exception as e:
logger.error(f"上传第 {round_num} 轮报告失败: {e}")
def _safety_check(self, modified_files: list[str], diff: str) -> bool:
"""
安全检查
Args:
modified_files: 修改的文件列表
diff: Git diff
Returns:
是否通过检查
"""
# 检查修改文件数量
if len(modified_files) > settings.max_modified_files:
logger.warning(f"修改文件数超限: {len(modified_files)} > {settings.max_modified_files}")
return False
# 检查修改行数
added_lines = diff.count("\n+") - diff.count("\n+++")
deleted_lines = diff.count("\n-") - diff.count("\n---")
total_lines = added_lines + deleted_lines
if total_lines > settings.max_modified_lines:
logger.warning(f"修改行数超限: {total_lines} > {settings.max_modified_lines}")
return False
# 检查核心文件
critical_keywords = settings.get_critical_files()
for file_path in modified_files:
for keyword in critical_keywords:
if keyword.lower() in file_path.lower():
logger.warning(f"修改了核心文件: {file_path}")
return False
return True
def fix_single_bug(
self,
bug_id: int,
run_tests: bool = True,
) -> FixResult:
"""修复单个 Bug带多轮重试"""
bug = self.task_manager.get_bug_detail(bug_id)
if not bug:
return FixResult(
bug_id=bug_id,
success=False,
message="Bug 不存在",
)
# 从 API 获取项目配置(优先),回退到 .env
project_info = self.task_manager.get_project_info(bug.project_id)
project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(bug.project_id)
github_repo = (project_info and project_info.get("repo_url")) or settings.get_github_repo(bug.project_id)
if not project_path:
return FixResult(
bug_id=bug_id,
success=False,
message=f"未找到项目路径: {bug.project_id}",
)
self.task_manager.update_status(bug_id, BugStatus.FIXING)
max_rounds = settings.max_retry_count
git_manager = GitManager(project_path, github_repo=github_repo) if github_repo else GitManager(project_path)
last_test_output = ""
last_diff = ""
for round_num in range(1, max_rounds + 1):
logger.info(f"=== Bug #{bug_id}{round_num}/{max_rounds} 轮修复 ===")
# 调用 Claude
if round_num == 1:
success, output = self.claude_service.batch_fix_bugs([bug], project_path)
else:
success, output = self.claude_service.retry_fix_bugs(
[bug], project_path,
previous_diff=last_diff,
previous_test_output=last_test_output,
round_num=round_num,
)
if not success:
failure_reason = f"Claude CLI 执行失败: {output[:500]}"
self._upload_round_report(
bug=bug, project_id=bug.project_id, round_num=round_num,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason=failure_reason, status=BugStatus.FIX_FAILED,
)
self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason)
return FixResult(bug_id=bug_id, success=False, message=failure_reason)
modified_files = git_manager.get_modified_files()
diff = git_manager.get_diff()
# 运行测试
test_result = None
if run_tests:
test_runner = TestRunner(project_path, bug.project_id)
test_result = test_runner.run_full_suite()
if not test_result.success:
last_test_output = test_result.output
last_diff = diff
is_last_round = (round_num == max_rounds)
failure_reason = f"测试未通过 (第 {round_num}/{max_rounds} 轮)"
self._upload_round_report(
bug=bug, project_id=bug.project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_result.output, test_passed=False,
failure_reason=failure_reason,
status=BugStatus.FIX_FAILED if is_last_round else BugStatus.FIXING,
)
git_manager.reset_hard()
if is_last_round:
final_msg = f"经过 {max_rounds} 轮修复仍未通过测试"
self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, final_msg)
return FixResult(bug_id=bug_id, success=False, message=final_msg)
logger.info(f"{round_num} 轮测试未通过,准备第 {round_num + 1} 轮重试...")
continue
# 测试通过 — 成功
self.task_manager.update_status(bug_id, BugStatus.FIXED)
self._upload_round_report(
bug=bug, project_id=bug.project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_result.output if test_result else "Tests skipped",
test_passed=True, failure_reason=None, status=BugStatus.FIXED,
)
return FixResult(
bug_id=bug_id, success=True,
message=f"修复成功 (第 {round_num} 轮)",
modified_files=modified_files, diff=diff,
)
# 不应到达这里,但做安全兜底
return FixResult(bug_id=bug_id, success=False, message="修复流程异常结束")
def close(self):
"""关闭资源"""
self.task_manager.close()