""" Claude Service - 调用 Claude Code CLI """ import subprocess from typing import Optional from loguru import logger from ..config import settings from ..models import Bug class ClaudeService: """通过 CLI 调用 Claude Code""" def __init__(self): self.cli_path = settings.claude_cli_path self.timeout = settings.claude_timeout def execute_prompt( self, prompt: str, cwd: str, allowed_tools: Optional[str] = None, ) -> tuple[bool, str]: """ 执行 Claude CLI 命令 Args: prompt: 提示词 cwd: 工作目录 allowed_tools: 可选,限制可用工具(逗号分隔)。 为 None 时配合 --dangerously-skip-permissions 允许所有工具。 Returns: (成功与否, 输出内容) """ try: cmd = [ self.cli_path, "-p", prompt, "--dangerously-skip-permissions", ] if allowed_tools: cmd.extend(["--allowedTools", allowed_tools]) logger.debug(f"执行 Claude CLI (cwd={cwd})") logger.debug(f"Prompt 长度: {len(prompt)} 字符") result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=self.timeout, ) output = result.stdout + result.stderr if result.returncode == 0: logger.info("Claude CLI 执行成功") logger.debug(f"输出前 500 字符: {output[:500]}") return True, output.strip() else: logger.error(f"Claude CLI 执行失败 (code={result.returncode}): {output[:500]}") return False, output.strip() except subprocess.TimeoutExpired: logger.error(f"Claude CLI 超时 ({self.timeout}秒)") return False, "执行超时" except FileNotFoundError: logger.error(f"Claude CLI 未找到: {self.cli_path}") return False, "Claude CLI 未安装" except Exception as e: logger.error(f"Claude CLI 执行异常: {e}") return False, str(e) def batch_fix_bugs( self, bugs: list[Bug], project_path: str, ) -> tuple[bool, str]: """ 批量修复多个 Bug Args: bugs: Bug 列表 project_path: 项目路径 Returns: (成功与否, Claude 输出) """ if not bugs: return False, "没有需要修复的 Bug" # 构造批量修复 Prompt prompt_parts = [ f"你是一个自动化 Bug 修复代理。请直接修复以下 {len(bugs)} 个 Bug。", "", "重要:你必须使用工具(Read、Edit、Write 等)直接修改源代码文件来修复 Bug。", "不要只是分析问题或给出建议,你需要实际定位文件并编辑代码。", "", ] for bug in bugs: prompt_parts.append(bug.format_for_prompt()) prompt_parts.append("") prompt_parts.extend([ "## 修复要求", "1. 先用 Grep/Glob 定位相关源代码文件", "2. 用 Read 读取文件内容,理解上下文", "3. 用 Edit 或 Write 直接修改代码来修复 Bug", "4. 每个 Bug 只做最小必要的改动", "5. 确保不破坏现有功能", "6. 修复完成后简要说明每个 Bug 的修复方式", "", "请立即开始修复,直接编辑文件。", ]) prompt = "\n".join(prompt_parts) logger.info(f"开始批量修复 {len(bugs)} 个 Bug...") return self.execute_prompt(prompt, project_path) def retry_fix_bugs( self, bugs: list[Bug], project_path: str, previous_diff: str, previous_test_output: str, round_num: int, ) -> tuple[bool, str]: """ 重试修复 Bug,带上次失败的上下文 Args: bugs: Bug 列表 project_path: 项目路径 previous_diff: 上轮修复产生的 diff previous_test_output: 上轮测试输出 round_num: 当前轮次 Returns: (成功与否, Claude 输出) """ prompt_parts = [ f"你是一个自动化 Bug 修复代理。这是第 {round_num} 次修复尝试。", "", "## 上次修复尝试失败", "", "上次你进行了如下代码修改:", "```diff", previous_diff[:3000], "```", "", "但是测试未通过,测试输出如下:", "```", previous_test_output[:3000], "```", "", "请分析上次修复失败的原因,避免同样的错误,重新修复以下 Bug:", "", ] for bug in bugs: prompt_parts.append(bug.format_for_prompt()) prompt_parts.append("") prompt_parts.extend([ "## 修复要求", "1. 先分析上次测试失败的原因", "2. 用 Grep/Glob 定位相关源代码文件", "3. 用 Read 读取文件内容,理解上下文", "4. 用 Edit 或 Write 直接修改代码来修复 Bug", "5. 每个 Bug 只做最小必要的改动", "6. 确保不破坏现有功能", "7. 修复完成后简要说明每个 Bug 的修复方式和与上次的区别", "", "请立即开始修复,直接编辑文件。", ]) prompt = "\n".join(prompt_parts) logger.info(f"开始第 {round_num} 轮修复 {len(bugs)} 个 Bug...") return self.execute_prompt(prompt, project_path) def analyze_bug(self, bug: Bug, project_path: str) -> tuple[bool, str]: """ 分析单个 Bug(不修复) Args: bug: Bug 对象 project_path: 项目路径 Returns: (成功与否, 分析结果) """ prompt = f""" 请分析以下 Bug,但不要修改任何代码: {bug.format_for_prompt()} 请分析: 1. 错误的根本原因 2. 建议的修复方案 3. 可能影响的其他文件 """ return self.execute_prompt(prompt, project_path, allowed_tools="Read,Grep,Glob") def review_changes(self, diff: str, project_path: str) -> tuple[bool, str]: """ 审核代码变更 Args: diff: Git diff 内容 project_path: 项目路径 Returns: (是否通过, 审核意见) """ prompt = f""" 请审核以下代码变更是否安全: ```diff {diff} ``` 审核要点: 1. 是否修复了目标问题 2. 是否引入了新的 Bug 3. 是否破坏了原有业务逻辑 4. 是否有安全风险 如果审核通过,回复 "APPROVED"。否则说明问题。 """ return self.execute_prompt(prompt, project_path, allowed_tools="Read,Grep,Glob")