zyc b178d24e73
Some checks failed
Build and Deploy Log Center / build-and-deploy (push) Failing after 5m9s
fix pr
2026-02-25 16:35:28 +08:00

632 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Core Engine - 核心修复引擎
"""
import os
import re
import subprocess
from datetime import datetime
from typing import Optional
from loguru import logger
from ..config import settings
from ..models import Bug, BugStatus, FixResult, BatchFixResult, RepairReport
from .task_manager import TaskManager
from .git_manager import GitManager
from .claude_service import ClaudeService
def run_repair_test_file(project_path: str, test_file: str, timeout: int = 120) -> str:
"""
运行 Claude 生成的测试文件,返回测试输出。
Args:
project_path: 项目根目录
test_file: 测试文件名(如 repair_test_bug_28.py
timeout: 超时秒数
Returns:
测试输出文本(包含命令和结果)
"""
test_path = os.path.join(project_path, test_file)
if not os.path.exists(test_path):
logger.warning(f"测试文件不存在: {test_path}")
return ""
# 检测项目类型,选择运行方式
manage_py = os.path.join(project_path, "manage.py")
if os.path.exists(manage_py):
# Django 项目:用 --pattern 精确匹配,只运行该文件中的测试
cmd = [
"python", "manage.py", "test",
"--pattern", test_file,
"--top-level-directory", ".",
"--keepdb", "-v", "2",
]
else:
# 其他项目:直接用 pytest 运行指定文件
cmd = ["python", "-m", "pytest", test_file, "-v"]
cmd_str = " ".join(cmd)
logger.info(f"运行测试: {cmd_str}")
try:
result = subprocess.run(
cmd,
cwd=project_path,
capture_output=True,
text=True,
timeout=timeout,
)
output = f"$ {cmd_str}\n{result.stdout}{result.stderr}"
if result.returncode == 0:
logger.info(f"测试通过: {test_file}")
else:
logger.warning(f"测试失败 (returncode={result.returncode}): {test_file}")
return output.strip()
except subprocess.TimeoutExpired:
logger.error(f"测试超时 ({timeout}s): {test_file}")
return f"$ {cmd_str}\n测试执行超时 ({timeout}秒)"
except Exception as e:
logger.error(f"测试执行异常: {e}")
return f"$ {cmd_str}\n测试执行异常: {e}"
def cleanup_repair_test_file(project_path: str, test_file: str):
"""删除 Claude 生成的临时测试文件"""
test_path = os.path.join(project_path, test_file)
try:
if os.path.exists(test_path):
os.remove(test_path)
logger.debug(f"已清理测试文件: {test_file}")
except Exception as e:
logger.warning(f"清理测试文件失败: {e}")
class RepairEngine:
"""核心修复引擎 - 编排整个修复流程"""
def __init__(self):
self.task_manager = TaskManager()
self.claude_service = ClaudeService()
def fix_project(
self,
project_id: str,
run_tests: bool = True,
auto_commit: bool = False,
) -> BatchFixResult:
"""
修复指定项目的所有待修复 Bug
Args:
project_id: 项目ID
run_tests: 是否运行测试
auto_commit: 是否自动提交
Returns:
BatchFixResult
"""
logger.info(f"开始修复项目: {project_id}")
# 从 API 获取项目配置(优先),回退到 .env
project_info = self.task_manager.get_project_info(project_id)
project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(project_id)
api_repo_url = (project_info and project_info.get("repo_url")) or ""
if not project_path:
logger.error(f"未找到项目路径配置: {project_id}")
return BatchFixResult(
project_id=project_id,
total=0,
success_count=0,
failed_count=0,
results=[],
)
# 获取待修复的 Bug
bugs = self.task_manager.fetch_pending_bugs(project_id)
if not bugs:
logger.info("没有待修复的 Bug")
return BatchFixResult(
project_id=project_id,
total=0,
success_count=0,
failed_count=0,
results=[],
)
logger.info(f"获取到 {len(bugs)} 个待修复 Bug")
# 检查是否启用 GitAPI 优先,回退 .env
github_repo = api_repo_url or settings.get_github_repo(project_id)
git_enabled = bool(github_repo)
git_manager = None
if git_enabled:
git_manager = GitManager(project_path, github_repo=github_repo)
git_manager.pull()
branch_name = f"fix/auto-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
if auto_commit:
git_manager.create_branch(branch_name)
else:
logger.info(f"项目 {project_id} 未配置仓库地址,跳过 Git 操作")
# 更新所有 Bug 状态为 FIXING
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIXING)
# 多轮修复循环
max_rounds = settings.max_retry_count # 默认 3
results = []
last_test_output = ""
last_diff = ""
try:
for round_num in range(1, max_rounds + 1):
logger.info(f"=== 第 {round_num}/{max_rounds} 轮修复 ===")
# Step 1: 调用 Claude 修复
if round_num == 1:
success, output = self.claude_service.batch_fix_bugs(bugs, project_path)
else:
success, output = self.claude_service.retry_fix_bugs(
bugs, project_path,
previous_diff=last_diff,
previous_test_output=last_test_output,
round_num=round_num,
)
if not success:
# Claude CLI 本身执行失败,不重试
failure_reason = f"Claude CLI 执行失败: {output[:500]}"
logger.error(f"{failure_reason} (round {round_num})")
self._upload_round_report(
bugs=bugs, project_id=project_id, round_num=round_num,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason=failure_reason,
status=BugStatus.FIX_FAILED,
)
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
break
# Step 2: 获取变更
modified_files = []
diff = ""
if git_manager:
modified_files = git_manager.get_modified_files()
diff = git_manager.get_diff()
logger.info(f"{round_num} 轮修复完成,修改了 {len(modified_files)} 个文件")
# Step 3: 安全检查(软性警告,不阻断修复流程)
if git_manager and not self._safety_check(modified_files, diff):
logger.warning(f"安全检查警告 (round {round_num}):修改涉及核心文件或超出限制,继续修复流程")
# Step 4: 运行 Claude 生成的测试文件
bug_ids_str = "_".join(str(b.id) for b in bugs)
test_file = f"repair_test_bug_{bug_ids_str}.py"
test_output = run_repair_test_file(project_path, test_file)
test_passed = bool(test_output) and "FAILED" not in test_output and "Error" not in test_output.split("\n")[-5:].__repr__()
if not test_output:
test_output = "Claude 未生成测试文件"
logger.warning(f"测试文件 {test_file} 不存在,跳过测试验证")
# 没有测试文件时视为通过(无法验证)
test_passed = True
# 清理临时测试文件
cleanup_repair_test_file(project_path, test_file)
# Step 5: 测试失败 → 记录本轮结果,进入下一轮重试
if not test_passed:
logger.warning(f"{round_num} 轮测试未通过,{'进入下一轮重试' if round_num < max_rounds else '已达最大轮次'}")
last_test_output = test_output
last_diff = diff
self._upload_round_report(
bugs=bugs, project_id=project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_output, test_passed=False,
failure_reason=f"{round_num} 轮测试失败",
status=BugStatus.FIXING,
)
# 重置代码变更,让下一轮从干净状态开始
if git_manager:
git_manager.reset_hard()
# 最后一轮还失败 → 标记为 FIX_FAILED
if round_num == max_rounds:
failure_reason = f"经过 {max_rounds} 轮修复,测试仍未通过"
for bug in bugs:
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
continue # 进入下一轮
# Step 6: 判断是否需要强制 PR 审核severity >= 8 的 bug 必须走 PR
max_severity = max((b.severity or 0) for b in bugs)
needs_pr_review = max_severity >= 8
if needs_pr_review:
logger.info(f"批次中存在高严重等级 bug (max severity={max_severity}),强制走 PR 审核流程")
# Step 7: 测试通过 → 提交代码并创建 PR
pr_info = None
if git_enabled and auto_commit and modified_files and git_manager:
bug_ids = ", ".join([f"#{b.id}" for b in bugs])
git_manager.commit(f"fix: auto repair bugs {bug_ids}")
logger.info("代码已提交")
git_manager.push()
logger.info("fix 分支已推送")
# 创建 PR
bug_summary = "\n".join([
f"- #{b.id}: {b.error.type} - {b.error.message[:80]}"
for b in bugs
])
severity_note = f"\n\n⚠️ **包含高严重等级 Bug (max={max_severity}/10),需人工审核**" if needs_pr_review else ""
pr_title = f"fix: auto repair bugs {bug_ids}"
pr_body = f"## 自动修复报告\n\n修复的 Bug:\n{bug_summary}{severity_note}\n\n修改文件:\n" + \
"\n".join([f"- `{f}`" for f in modified_files])
success, pr_info = git_manager.create_pr(pr_title, pr_body)
if success and pr_info:
logger.info(f"PR 创建成功: {pr_info['pr_url']}")
else:
logger.warning("创建 PR 失败,修复已提交到 fix 分支")
elif not git_enabled and auto_commit:
logger.info("未配置仓库地址,跳过自动提交")
# 根据是否有 PR 或是否需要强制审核来决定 Bug 状态
# 高严重等级 bug即使 PR 创建失败也不能直接标记 FIXED保持 PENDING_FIX 等待人工处理
if pr_info:
final_status = BugStatus.PENDING_FIX
elif needs_pr_review:
final_status = BugStatus.PENDING_FIX
logger.warning("高严重等级 bug PR 创建失败,状态设为 PENDING_FIX 等待人工处理")
else:
final_status = BugStatus.FIXED
# 上传一条批量修复报告(含 PR 信息)
self._upload_round_report(
bugs=bugs, project_id=project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_output,
test_passed=True,
failure_reason=None,
status=final_status,
pr_url=pr_info["pr_url"] if pr_info else None,
pr_number=pr_info["pr_number"] if pr_info else None,
branch_name=pr_info["branch_name"] if pr_info else None,
)
for bug in bugs:
self.task_manager.update_status(bug.id, final_status)
# 回写 PR 信息到 Bug
if pr_info:
self.task_manager.update_pr_info(
bug.id,
pr_info["pr_number"],
pr_info["pr_url"],
pr_info["branch_name"],
)
results.append(FixResult(
bug_id=bug.id, success=True,
message=f"修复成功 (第 {round_num} 轮)" + (f", PR #{pr_info['pr_number']}" if pr_info else ""),
modified_files=modified_files, diff=diff,
))
break # 测试通过,退出循环
except Exception as e:
# 兜底:标记为 FIX_FAILED防止死循环可通过 retry 命令重新处理)
failure_reason = f"修复流程异常终止: {str(e)[:500]}"
logger.exception(failure_reason)
for bug in bugs:
if bug.id not in {r.bug_id for r in results}:
self.task_manager.update_status(bug.id, BugStatus.FIX_FAILED, failure_reason)
results.append(FixResult(bug_id=bug.id, success=False, message=failure_reason))
success_count = sum(1 for r in results if r.success)
return BatchFixResult(
project_id=project_id,
total=len(bugs),
success_count=success_count,
failed_count=len(bugs) - success_count,
results=results,
)
def retry_failed_project(
self,
project_id: Optional[str] = None,
run_tests: bool = True,
auto_commit: bool = False,
) -> BatchFixResult:
"""
处理 FIX_FAILED 状态的 Bug先分诊再修复。
流程:
1. 获取所有 FIX_FAILED Bug
2. 逐个分诊triage判断是否为可修复的代码缺陷
3. 不可修复的标记为 CANNOT_REPRODUCE
4. 可修复的重置为 PENDING_FIX 后调用 fix_project 修复
"""
logger.info(f"开始处理 FIX_FAILED Bug{f' (项目: {project_id})' if project_id else ''}")
failed_bugs = self.task_manager.fetch_failed_bugs(project_id)
if not failed_bugs:
logger.info("没有 FIX_FAILED 的 Bug")
return BatchFixResult(
project_id=project_id or "all",
total=0, success_count=0, failed_count=0, results=[],
)
results: list[FixResult] = []
bugs_to_fix: dict[str, list[Bug]] = {} # project_id → bugs
# Step 1: 逐个分诊
for bug in failed_bugs:
logger.info(f"分诊 Bug #{bug.id} ({bug.error.type}: {bug.error.message[:60]})")
project_info = self.task_manager.get_project_info(bug.project_id)
project_path = (
(project_info and project_info.get("local_path"))
or settings.get_project_path(bug.project_id)
)
if not project_path:
logger.warning(f"Bug #{bug.id}: 未找到项目路径 {bug.project_id},跳过")
results.append(FixResult(
bug_id=bug.id, success=False,
message=f"未找到项目路径: {bug.project_id}",
))
continue
# 调用 Claude 分诊
self.task_manager.update_status(bug.id, BugStatus.VERIFYING)
success, output = self.claude_service.triage_bug(bug, project_path)
if not success:
logger.warning(f"Bug #{bug.id}: 分诊执行失败,保留 FIX_FAILED")
self.task_manager.update_status(
bug.id, BugStatus.FIX_FAILED, f"分诊失败: {output[:200]}"
)
results.append(FixResult(
bug_id=bug.id, success=False, message="分诊执行失败",
))
continue
# 解析判决
if "VERDICT:CANNOT_REPRODUCE" in output:
logger.info(f"Bug #{bug.id}: 判定为无法复现")
self.task_manager.update_status(
bug.id, BugStatus.CANNOT_REPRODUCE,
"AI 分诊判定:非代码缺陷或无法复现",
)
self._upload_round_report(
bugs=[bug], project_id=bug.project_id, round_num=0,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason="AI 分诊:无法复现",
status=BugStatus.CANNOT_REPRODUCE,
)
results.append(FixResult(
bug_id=bug.id, success=True,
message="标记为 CANNOT_REPRODUCE",
))
elif "VERDICT:FIX" in output:
logger.info(f"Bug #{bug.id}: 判定为可修复,加入修复队列")
self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
bugs_to_fix.setdefault(bug.project_id, []).append(bug)
else:
logger.warning(f"Bug #{bug.id}: 分诊输出无 VERDICT 标记,默认加入修复队列")
self.task_manager.update_status(bug.id, BugStatus.PENDING_FIX)
bugs_to_fix.setdefault(bug.project_id, []).append(bug)
# Step 2: 按项目批量修复
for pid, bugs in bugs_to_fix.items():
logger.info(f"开始修复项目 {pid}{len(bugs)} 个 Bug")
fix_result = self.fix_project(
project_id=pid,
run_tests=run_tests,
auto_commit=auto_commit,
)
results.extend(fix_result.results)
success_count = sum(1 for r in results if r.success)
total = len(failed_bugs)
return BatchFixResult(
project_id=project_id or "all",
total=total,
success_count=success_count,
failed_count=total - success_count,
results=results,
)
def _upload_round_report(
self,
bugs: list[Bug],
project_id: str,
round_num: int,
ai_analysis: str,
diff: str,
modified_files: list[str],
test_output: str,
test_passed: bool,
failure_reason: Optional[str],
status: BugStatus,
pr_url: Optional[str] = None,
pr_number: Optional[int] = None,
branch_name: Optional[str] = None,
):
"""上传某一轮的修复报告(一次批量修复 = 一条报告)"""
try:
report = RepairReport(
error_log_ids=[b.id for b in bugs],
status=status,
project_id=project_id,
ai_analysis=ai_analysis,
fix_plan="See AI Analysis",
code_diff=diff,
modified_files=modified_files,
test_output=test_output,
test_passed=test_passed,
repair_round=round_num,
failure_reason=failure_reason,
pr_url=pr_url,
pr_number=pr_number,
branch_name=branch_name,
)
self.task_manager.upload_report(report)
except Exception as e:
logger.error(f"上传第 {round_num} 轮报告失败: {e}")
def _safety_check(self, modified_files: list[str], diff: str) -> bool:
"""
安全检查
Args:
modified_files: 修改的文件列表
diff: Git diff
Returns:
是否通过检查
"""
# 检查修改文件数量
if len(modified_files) > settings.max_modified_files:
logger.warning(f"修改文件数超限: {len(modified_files)} > {settings.max_modified_files}")
return False
# 检查修改行数
added_lines = diff.count("\n+") - diff.count("\n+++")
deleted_lines = diff.count("\n-") - diff.count("\n---")
total_lines = added_lines + deleted_lines
if total_lines > settings.max_modified_lines:
logger.warning(f"修改行数超限: {total_lines} > {settings.max_modified_lines}")
return False
# 检查核心文件
critical_keywords = settings.get_critical_files()
for file_path in modified_files:
for keyword in critical_keywords:
if keyword.lower() in file_path.lower():
logger.warning(f"修改了核心文件: {file_path}")
return False
return True
def fix_single_bug(
self,
bug_id: int,
run_tests: bool = True,
) -> FixResult:
"""修复单个 Bug带多轮重试"""
bug = self.task_manager.get_bug_detail(bug_id)
if not bug:
return FixResult(
bug_id=bug_id,
success=False,
message="Bug 不存在",
)
# 从 API 获取项目配置(优先),回退到 .env
project_info = self.task_manager.get_project_info(bug.project_id)
project_path = (project_info and project_info.get("local_path")) or settings.get_project_path(bug.project_id)
github_repo = (project_info and project_info.get("repo_url")) or settings.get_github_repo(bug.project_id)
if not project_path:
return FixResult(
bug_id=bug_id,
success=False,
message=f"未找到项目路径: {bug.project_id}",
)
self.task_manager.update_status(bug_id, BugStatus.FIXING)
max_rounds = settings.max_retry_count
git_manager = GitManager(project_path, github_repo=github_repo) if github_repo else GitManager(project_path)
last_test_output = ""
last_diff = ""
try:
for round_num in range(1, max_rounds + 1):
logger.info(f"=== Bug #{bug_id}{round_num}/{max_rounds} 轮修复 ===")
# 调用 Claude
if round_num == 1:
success, output = self.claude_service.batch_fix_bugs([bug], project_path)
else:
success, output = self.claude_service.retry_fix_bugs(
[bug], project_path,
previous_diff=last_diff,
previous_test_output=last_test_output,
round_num=round_num,
)
if not success:
failure_reason = f"Claude CLI 执行失败: {output[:500]}"
self._upload_round_report(
bugs=[bug], project_id=bug.project_id, round_num=round_num,
ai_analysis=output, diff="", modified_files=[],
test_output="", test_passed=False,
failure_reason=failure_reason, status=BugStatus.FIX_FAILED,
)
self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason)
return FixResult(bug_id=bug_id, success=False, message=failure_reason)
modified_files = git_manager.get_modified_files()
diff = git_manager.get_diff()
# 运行 Claude 生成的测试文件
test_file = f"repair_test_bug_{bug_id}.py"
test_output = run_repair_test_file(project_path, test_file)
test_passed = bool(test_output) and "FAILED" not in test_output
if not test_output:
test_output = "Claude 未生成测试文件"
logger.warning(f"测试文件 {test_file} 不存在,跳过测试验证")
# 清理临时测试文件
cleanup_repair_test_file(project_path, test_file)
self.task_manager.update_status(bug_id, BugStatus.FIXED)
self._upload_round_report(
bugs=[bug], project_id=bug.project_id, round_num=round_num,
ai_analysis=output, diff=diff, modified_files=modified_files,
test_output=test_output,
test_passed=test_passed, failure_reason=None, status=BugStatus.FIXED,
)
return FixResult(
bug_id=bug_id, success=True,
message=f"修复成功 (第 {round_num} 轮)",
modified_files=modified_files, diff=diff,
)
except Exception as e:
# 兜底:标记为 FIX_FAILED防止死循环可通过 retry 命令重新处理)
failure_reason = f"修复流程异常终止: {str(e)[:500]}"
logger.exception(failure_reason)
self.task_manager.update_status(bug_id, BugStatus.FIX_FAILED, failure_reason)
return FixResult(bug_id=bug_id, success=False, message=failure_reason)
# 不应到达这里,但做安全兜底
return FixResult(bug_id=bug_id, success=False, message="修复流程异常结束")
def close(self):
"""关闭资源"""
self.task_manager.close()