""" Repair Agent CLI - 命令行入口 """ from typing import List, Optional import typer from loguru import logger from rich.console import Console from rich.table import Table from .agent import RepairEngine, TaskManager from .agent.scheduler import RepairScheduler from .config import settings app = typer.Typer( name="repair-agent", help="自动化 Bug 修复代理", ) console = Console() @app.command() def list( project: str = typer.Option(None, "--project", "-p", help="筛选项目ID"), ): """查看待修复的 Bug 列表""" task_manager = TaskManager() bugs = task_manager.fetch_pending_bugs(project) if not bugs: console.print("[yellow]没有待修复的 Bug[/yellow]") return table = Table(title="待修复 Bug 列表") table.add_column("ID", style="cyan") table.add_column("项目", style="green") table.add_column("错误类型", style="red") table.add_column("消息", style="white") table.add_column("文件", style="blue") table.add_column("状态", style="yellow") for bug in bugs: table.add_row( str(bug.id), bug.project_id, bug.error.type, bug.error.message[:50] + "..." if len(bug.error.message) > 50 else bug.error.message, bug.error.file_path or "-", bug.status.value, ) console.print(table) task_manager.close() @app.command() def fix( project: str = typer.Argument(..., help="项目ID (如 rtc_backend)"), test: bool = typer.Option(True, "--test/--no-test", help="是否运行测试"), commit: bool = typer.Option(False, "--commit", "-c", help="是否自动提交"), ): """修复指定项目的所有 Bug""" console.print(f"[bold blue]开始修复项目: {project}[/bold blue]") engine = RepairEngine() result = engine.fix_project( project_id=project, run_tests=test, auto_commit=commit, ) if result.total == 0: console.print("[yellow]没有需要修复的 Bug[/yellow]") else: console.print(f"\n[bold]修复结果:[/bold]") console.print(f" 总计: {result.total}") console.print(f" [green]成功: {result.success_count}[/green]") console.print(f" [red]失败: {result.failed_count}[/red]") if result.results: console.print("\n[bold]详细结果:[/bold]") for r in result.results: status = "[green]✓[/green]" if r.success else "[red]✗[/red]" console.print(f" {status} Bug #{r.bug_id}: {r.message}") if r.modified_files: console.print(f" 修改文件: {', '.join(r.modified_files)}") engine.close() @app.command("fix-one") def fix_one( bug_id: int = typer.Argument(..., help="Bug ID"), test: bool = typer.Option(True, "--test/--no-test", help="是否运行测试"), ): """修复单个 Bug""" console.print(f"[bold blue]开始修复 Bug #{bug_id}[/bold blue]") engine = RepairEngine() result = engine.fix_single_bug(bug_id, run_tests=test) if result.success: console.print(f"[green]✓ 修复成功![/green]") if result.modified_files: console.print(f" 修改文件: {', '.join(result.modified_files)}") else: console.print(f"[red]✗ 修复失败: {result.message}[/red]") engine.close() @app.command() def retry( project: str = typer.Option(None, "--project", "-p", help="筛选项目ID"), test: bool = typer.Option(True, "--test/--no-test", help="是否运行测试"), commit: bool = typer.Option(False, "--commit", "-c", help="是否自动提交"), ): """ 重新处理所有 FIX_FAILED 的 Bug。 流程:先分诊(triage)判断 Bug 是否为代码缺陷, 不可修复的标记为 CANNOT_REPRODUCE,可修复的重新尝试修复。 示例: python -m repair_agent retry # 处理所有 FIX_FAILED python -m repair_agent retry -p rtc_backend # 只处理 rtc_backend """ console.print("[bold blue]开始处理 FIX_FAILED Bug[/bold blue]") # 先展示待处理列表 task_manager = TaskManager() failed_bugs = task_manager.fetch_failed_bugs(project) task_manager.close() if not failed_bugs: console.print("[yellow]没有 FIX_FAILED 的 Bug[/yellow]") return table = Table(title="FIX_FAILED Bug 列表") table.add_column("ID", style="cyan") table.add_column("项目", style="green") table.add_column("错误类型", style="red") table.add_column("消息", style="white", max_width=50) table.add_column("文件", style="blue") for bug in failed_bugs: table.add_row( str(bug.id), bug.project_id, bug.error.type, bug.error.message[:50] + "..." if len(bug.error.message) > 50 else bug.error.message, bug.error.file_path or "-", ) console.print(table) console.print(f"\n共 {len(failed_bugs)} 个 Bug 待处理,开始分诊...\n") engine = RepairEngine() result = engine.retry_failed_project( project_id=project, run_tests=test, auto_commit=commit, ) console.print(f"\n[bold]处理结果:[/bold]") console.print(f" 总计: {result.total}") console.print(f" [green]成功: {result.success_count}[/green]") console.print(f" [red]失败: {result.failed_count}[/red]") if result.results: console.print("\n[bold]详细结果:[/bold]") for r in result.results: status_icon = "[green]✓[/green]" if r.success else "[red]✗[/red]" console.print(f" {status_icon} Bug #{r.bug_id}: {r.message}") if r.modified_files: console.print(f" 修改文件: {', '.join(r.modified_files)}") engine.close() @app.command() def status(): """查看配置状态""" console.print("[bold]Repair Agent 配置状态[/bold]\n") console.print(f"Log Center URL: [cyan]{settings.log_center_url}[/cyan]") console.print(f"Claude CLI: [cyan]{settings.claude_cli_path}[/cyan]") console.print(f"最大重试次数: [cyan]{settings.max_retry_count}[/cyan]") console.print(f"最大修改行数: [cyan]{settings.max_modified_lines}[/cyan]") console.print(f"最大修改文件数: [cyan]{settings.max_modified_files}[/cyan]") console.print(f"核心文件关键词: [cyan]{settings.critical_files}[/cyan]") console.print("\n[bold]项目配置:[/bold]") for pid in ["rtc_backend", "rtc_web", "airhub_app"]: path = settings.get_project_path(pid) repo = settings.get_github_repo(pid) git_status = f"[green]{repo}[/green]" if repo else "[yellow]未配置(Git 已禁用)[/yellow]" console.print(f" {pid}:") console.print(f" 路径: [blue]{path}[/blue]") console.print(f" GitHub: {git_status}") @app.command() def analyze( bug_id: int = typer.Argument(..., help="Bug ID"), ): """分析单个 Bug(不修复)""" engine = RepairEngine() bug = engine.task_manager.get_bug_detail(bug_id) if not bug: console.print(f"[red]Bug #{bug_id} 不存在[/red]") return project_path = settings.get_project_path(bug.project_id) if not project_path: console.print(f"[red]未找到项目路径: {bug.project_id}[/red]") return console.print(f"[bold blue]分析 Bug #{bug_id}[/bold blue]\n") console.print(bug.format_for_prompt()) console.print("\n[bold]AI 分析结果:[/bold]") success, output = engine.claude_service.analyze_bug(bug, project_path) if success: console.print(output) else: console.print(f"[red]分析失败: {output}[/red]") engine.close() @app.command() def watch( interval: int = typer.Option( None, "--interval", "-i", help="扫描间隔(秒),默认读取 WATCH_INTERVAL 环境变量或 3600", ), project: Optional[List[str]] = typer.Option( None, "--project", "-p", help="监控的项目ID,可多次指定,默认监控所有已配置项目", ), test: bool = typer.Option(True, "--test/--no-test", help="修复后是否运行测试"), commit: bool = typer.Option(False, "--commit", "-c", help="是否自动提交代码"), ): """ 启动定时扫描守护进程,每隔固定时间扫描新 Bug 并自动修复。 示例: python -m repair_agent watch # 默认每小时扫描所有项目 python -m repair_agent watch -i 1800 # 每 30 分钟扫描 python -m repair_agent watch -p rtc_backend # 只监控 rtc_backend python -m repair_agent watch -p rtc_backend -p rtc_web -c # 监控两个项目并自动提交 """ scan_interval = interval or settings.watch_interval projects = list(project) if project else None console.print("[bold blue]启动 Repair Scheduler[/bold blue]") console.print(f" 扫描间隔: [cyan]{scan_interval}s[/cyan]") console.print(f" 监控项目: [cyan]{', '.join(projects) if projects else '全部已配置项目'}[/cyan]") console.print(f" 运行测试: [cyan]{test}[/cyan]") console.print(f" 自动提交: [cyan]{commit}[/cyan]") console.print("\n[dim]按 Ctrl+C 停止[/dim]\n") scheduler = RepairScheduler( interval=scan_interval, run_tests=test, auto_commit=commit, projects=projects, ) scheduler.start() def main(): """入口函数""" app() if __name__ == "__main__": main()