from fastapi import FastAPI, Depends, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel import select, func, text from sqlalchemy.exc import IntegrityError from .database import init_db, get_session, engine from .models import ErrorLog, ErrorLogCreate, LogStatus, TaskStatusUpdate, RepairTask, RepairTaskCreate, Project, ProjectUpdate from .gitea_client import GiteaClient from .self_report import self_report_error from datetime import datetime, timedelta from typing import Optional, List from pydantic import BaseModel import hashlib import json app = FastAPI(title="Log Center & AIOps Control Plane") # CORS for frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, restrict to your domain allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") async def on_startup(): await init_db() await _register_self_projects() async def _register_self_projects(): """启动时注册日志中台自身的项目信息。""" from sqlalchemy.orm import sessionmaker as sa_sessionmaker projects = [ { "project_id": "log_center_api", "name": "Log Center API", "repo_url": "https://gitea.airlabs.art/zyc/log-center.git", "local_path": "/Users/maidong/Desktop/zyc/qy_gitlab/log_center", "description": "日志中台 FastAPI 后端服务", }, { "project_id": "log_center_web", "name": "Log Center Web", "repo_url": "https://gitea.airlabs.art/zyc/log-center.git", "local_path": "/Users/maidong/Desktop/zyc/qy_gitlab/log_center", "description": "日志中台 React 管理端", }, ] async_session = sa_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with async_session() as session: for proj_data in projects: stmt = select(Project).where(Project.project_id == proj_data["project_id"]) result = await session.exec(stmt) existing = result.first() if not existing: session.add(Project(**proj_data)) else: # 更新元信息(仓库地址、路径等可能变更) for key, value in proj_data.items(): if key != "project_id": setattr(existing, key, value) existing.updated_at = datetime.utcnow() session.add(existing) await session.commit() @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): """捕获所有未处理异常,上报到自身数据库后返回 500。""" await self_report_error(exc, context={ "url": str(request.url), "method": request.method, }) return JSONResponse( status_code=500, content={"detail": "Internal Server Error"}, ) def generate_fingerprint(log: ErrorLogCreate) -> str: source = log.source if source == "cicd": ctx = log.context or {} # 加入 error_message 避免同一 job 不同错误被去重 raw = f"{log.project_id}|cicd|{log.error.get('type')}|{ctx.get('job_name', 'unknown')}|{ctx.get('step_name', 'unknown')}|{log.error.get('message', '')}" elif source == "deployment": ctx = log.context or {} raw = f"{log.project_id}|deployment|{log.error.get('type')}|{ctx.get('namespace', 'default')}|{ctx.get('deployment_name', 'unknown')}" else: raw = f"{log.project_id}|{log.error.get('type')}|{log.error.get('file_path')}|{log.error.get('line_number')}" return hashlib.md5(raw.encode()).hexdigest() # ==================== Log Reporting ==================== @app.post("/api/v1/logs/report", tags=["Logs"]) async def report_log(log_data: ErrorLogCreate, session: AsyncSession = Depends(get_session)): fingerprint = generate_fingerprint(log_data) # Check deduplication statement = select(ErrorLog).where(ErrorLog.fingerprint == fingerprint) results = await session.exec(statement) existing_log = results.first() if existing_log: # If exists and not resolved, update error content but keep status if existing_log.status not in [LogStatus.DEPLOYED, LogStatus.FIXED, LogStatus.VERIFIED]: # 更新错误内容,确保始终反映最新的错误信息 existing_log.error_message = log_data.error.get("message", existing_log.error_message) existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace) existing_log.context = log_data.context or existing_log.context existing_log.timestamp = log_data.timestamp or datetime.utcnow() if log_data.commit_hash: existing_log.commit_hash = log_data.commit_hash session.add(existing_log) await session.commit() await session.refresh(existing_log) return {"message": "Log deduplicated (content updated)", "id": existing_log.id, "status": existing_log.status} # If it was resolved but happened again -> Regression! Reset to NEW existing_log.status = LogStatus.NEW existing_log.error_message = log_data.error.get("message", existing_log.error_message) existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace) existing_log.context = log_data.context or existing_log.context existing_log.timestamp = log_data.timestamp or datetime.utcnow() existing_log.retry_count = 0 if log_data.commit_hash: existing_log.commit_hash = log_data.commit_hash session.add(existing_log) await session.commit() await session.refresh(existing_log) return {"message": "Regression detected, reopened", "id": existing_log.id} # Upsert Project record proj_stmt = select(Project).where(Project.project_id == log_data.project_id) proj_result = await session.exec(proj_stmt) project = proj_result.first() if not project: project = Project(project_id=log_data.project_id) if log_data.repo_url: project.repo_url = log_data.repo_url project.updated_at = datetime.utcnow() session.add(project) # Create new new_log = ErrorLog( project_id=log_data.project_id, environment=log_data.environment, level=log_data.level, source=log_data.source, error_type=log_data.error.get("type"), error_message=log_data.error.get("message"), file_path=log_data.error.get("file_path"), line_number=log_data.error.get("line_number"), stack_trace=log_data.error.get("stack_trace"), context=log_data.context, version=log_data.version, commit_hash=log_data.commit_hash, fingerprint=fingerprint, timestamp=log_data.timestamp or datetime.utcnow() ) session.add(new_log) try: await session.commit() except IntegrityError: await session.rollback() # Race condition: another request inserted the same fingerprint concurrently statement = select(ErrorLog).where(ErrorLog.fingerprint == fingerprint) results = await session.exec(statement) existing_log = results.first() if existing_log: if existing_log.status not in [LogStatus.DEPLOYED, LogStatus.FIXED, LogStatus.VERIFIED]: existing_log.error_message = log_data.error.get("message", existing_log.error_message) existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace) existing_log.context = log_data.context or existing_log.context existing_log.timestamp = log_data.timestamp or datetime.utcnow() if log_data.commit_hash: existing_log.commit_hash = log_data.commit_hash session.add(existing_log) await session.commit() await session.refresh(existing_log) return {"message": "Log deduplicated (content updated)", "id": existing_log.id, "status": existing_log.status} existing_log.status = LogStatus.NEW existing_log.error_message = log_data.error.get("message", existing_log.error_message) existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace) existing_log.context = log_data.context or existing_log.context existing_log.timestamp = log_data.timestamp or datetime.utcnow() existing_log.retry_count = 0 if log_data.commit_hash: existing_log.commit_hash = log_data.commit_hash session.add(existing_log) await session.commit() await session.refresh(existing_log) return {"message": "Regression detected, reopened", "id": existing_log.id} raise await session.refresh(new_log) return {"message": "Log reported", "id": new_log.id} # ==================== Agent Tasks ==================== @app.get("/api/v1/tasks/pending", tags=["Tasks"]) async def get_pending_tasks(project_id: str = None, source: Optional[str] = None, session: AsyncSession = Depends(get_session)): query = select(ErrorLog).where(ErrorLog.status == LogStatus.NEW) if project_id: query = query.where(ErrorLog.project_id == project_id) if source: query = query.where(ErrorLog.source == source) results = await session.exec(query) return results.all() @app.put("/api/v1/tasks/{task_id}/status", tags=["Tasks"]) async def update_task_status( task_id: int, status_update: TaskStatusUpdate, session: AsyncSession = Depends(get_session) ): statement = select(ErrorLog).where(ErrorLog.id == task_id) results = await session.exec(statement) task = results.first() if not task: raise HTTPException(status_code=404, detail="Task not found") task.status = status_update.status if status_update.message and status_update.status == LogStatus.FIX_FAILED: task.failure_reason = status_update.message session.add(task) await session.commit() await session.refresh(task) return {"message": "Status updated", "id": task.id, "status": task.status} class PRInfoUpdate(BaseModel): pr_number: int pr_url: str branch_name: str class SeverityUpdate(BaseModel): severity: int severity_reason: Optional[str] = None @app.put("/api/v1/bugs/{bug_id}/pr-info", tags=["Tasks"]) async def update_bug_pr_info( bug_id: int, pr_info: PRInfoUpdate, session: AsyncSession = Depends(get_session), ): """repair agent 创建 PR 后回写 PR 信息""" statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") bug.pr_number = pr_info.pr_number bug.pr_url = pr_info.pr_url bug.branch_name = pr_info.branch_name session.add(bug) await session.commit() await session.refresh(bug) return {"message": "PR info updated", "bug_id": bug.id} @app.put("/api/v1/bugs/{bug_id}/severity", tags=["Tasks"]) async def update_bug_severity( bug_id: int, data: SeverityUpdate, session: AsyncSession = Depends(get_session), ): """repair agent 评估后回写 Bug 严重等级""" statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") bug.severity = data.severity bug.severity_reason = data.severity_reason session.add(bug) await session.commit() await session.refresh(bug) return {"message": "Severity updated", "bug_id": bug.id, "severity": bug.severity} class PRRejectRequest(BaseModel): """拒绝/驳回请求""" reason: str # ==================== Repair Reports ==================== @app.post("/api/v1/repair/reports", tags=["Repair"]) async def create_repair_report(report: RepairTaskCreate, session: AsyncSession = Depends(get_session)): """Upload a new repair report (one per batch, may cover multiple bugs)""" repair_task = RepairTask.from_orm(report) session.add(repair_task) # Update all related bugs' status and failure_reason if report.status in [LogStatus.FIXED, LogStatus.FIX_FAILED, LogStatus.PENDING_FIX, LogStatus.FIXING]: for bug_id in report.error_log_ids: log_stmt = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(log_stmt) error_log = results.first() if error_log: error_log.status = report.status if report.failure_reason and report.status == LogStatus.FIX_FAILED: error_log.failure_reason = report.failure_reason session.add(error_log) await session.commit() await session.refresh(repair_task) return {"message": "Report uploaded", "id": repair_task.id} @app.get("/api/v1/repair/reports", tags=["Repair"]) async def get_repair_reports( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), project_id: Optional[str] = None, error_log_id: Optional[int] = None, session: AsyncSession = Depends(get_session) ): """Get repair reports list, optionally filtered by project or bug""" query = select(RepairTask).order_by(RepairTask.created_at.desc()) if project_id: query = query.where(RepairTask.project_id == project_id) if error_log_id: # PostgreSQL JSONB contains: error_log_ids @> '[28]' query = query.where( text(f"error_log_ids @> '{json.dumps([error_log_id])}'::jsonb") ) offset = (page - 1) * page_size query = query.offset(offset).limit(page_size) results = await session.exec(query) tasks = results.all() # Get total count_query = select(func.count(RepairTask.id)) if project_id: count_query = count_query.where(RepairTask.project_id == project_id) if error_log_id: count_query = count_query.where( text(f"error_log_ids @> '{json.dumps([error_log_id])}'::jsonb") ) count_result = await session.exec(count_query) total = count_result.one() return { "items": tasks, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } @app.get("/api/v1/repair/reports/{report_id}", tags=["Repair"]) async def get_repair_report_detail(report_id: int, session: AsyncSession = Depends(get_session)): """Get detailed repair report""" statement = select(RepairTask).where(RepairTask.id == report_id) results = await session.exec(statement) task = results.first() if not task: raise HTTPException(status_code=404, detail="Report not found") return task @app.post("/api/v1/repair/reports/{report_id}/approve", tags=["Repair"]) async def approve_report(report_id: int, session: AsyncSession = Depends(get_session)): """ 批准修复报告:合并 PR(如有)并将所有关联 Bug 标记为 FIXED """ statement = select(RepairTask).where(RepairTask.id == report_id) results = await session.exec(statement) report = results.first() if not report: raise HTTPException(status_code=404, detail="Report not found") if report.status != LogStatus.PENDING_FIX: raise HTTPException(status_code=400, detail=f"报告状态不是等待审核: {report.status}") # 如有 PR,调用 Gitea API 合并 if report.pr_url: gitea_client = GiteaClient() success, message = gitea_client.merge_pr_by_url(report.pr_url) if not success: raise HTTPException(status_code=502, detail=f"合并 PR 失败: {message}") # 更新报告状态 report.status = LogStatus.FIXED session.add(report) # 更新所有关联 Bug 状态 updated_ids = [] for bug_id in (report.error_log_ids or []): bug_stmt = select(ErrorLog).where(ErrorLog.id == bug_id) bug_results = await session.exec(bug_stmt) bug = bug_results.first() if bug: bug.status = LogStatus.FIXED bug.merged_at = datetime.utcnow() session.add(bug) updated_ids.append(bug_id) await session.commit() return { "message": f"修复已批准,{len(updated_ids)} 个缺陷已更新", "report_id": report_id, "updated_bug_ids": updated_ids, } @app.post("/api/v1/repair/reports/{report_id}/reject", tags=["Repair"]) async def reject_report( report_id: int, request: PRRejectRequest, session: AsyncSession = Depends(get_session), ): """ 驳回修复报告:关闭 PR(如有)并将所有关联 Bug 重置为 NEW """ statement = select(RepairTask).where(RepairTask.id == report_id) results = await session.exec(statement) report = results.first() if not report: raise HTTPException(status_code=404, detail="Report not found") if report.status != LogStatus.PENDING_FIX: raise HTTPException(status_code=400, detail=f"报告状态不是等待审核: {report.status}") # 如有 PR,关闭 if report.pr_url: gitea_client = GiteaClient() success, message = gitea_client.close_pr_by_url(report.pr_url, request.reason) if not success: raise HTTPException(status_code=500, detail=f"关闭 PR 失败: {message}") # 更新报告状态 report.status = LogStatus.FIX_FAILED report.failure_reason = f"人工驳回: {request.reason}" session.add(report) # 重置所有关联 Bug 为 NEW updated_ids = [] rejection_info = json.dumps({ "rejected_at": datetime.utcnow().isoformat(), "reason": request.reason, "report_id": report_id, }, ensure_ascii=False) for bug_id in (report.error_log_ids or []): bug_stmt = select(ErrorLog).where(ErrorLog.id == bug_id) bug_results = await session.exec(bug_stmt) bug = bug_results.first() if bug: bug.status = LogStatus.NEW bug.rejection_count = (bug.rejection_count or 0) + 1 bug.last_rejected_at = datetime.utcnow() bug.rejection_reason = rejection_info session.add(bug) updated_ids.append(bug_id) await session.commit() return { "message": f"修复已驳回,{len(updated_ids)} 个缺陷将重新修复", "report_id": report_id, "updated_bug_ids": updated_ids, } # ==================== Dashboard APIs ==================== @app.get("/api/v1/dashboard/stats", tags=["Dashboard"]) async def get_dashboard_stats(source: Optional[str] = None, session: AsyncSession = Depends(get_session)): """Get overall statistics for dashboard""" today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) def _apply_source(q): return q.where(ErrorLog.source == source) if source else q # Total bugs total_query = _apply_source(select(func.count(ErrorLog.id))) total_result = await session.exec(total_query) total_bugs = total_result.one() # Today's new bugs today_query = _apply_source(select(func.count(ErrorLog.id)).where(ErrorLog.timestamp >= today)) today_result = await session.exec(today_query) today_bugs = today_result.one() # Count by status status_counts = {} for status in LogStatus: count_query = _apply_source(select(func.count(ErrorLog.id)).where(ErrorLog.status == status)) count_result = await session.exec(count_query) status_counts[status.value] = count_result.one() # 修复率 = (FIXED + VERIFIED + DEPLOYED + CANNOT_REPRODUCE) / Total resolved_count = ( status_counts.get("FIXED", 0) + status_counts.get("VERIFIED", 0) + status_counts.get("DEPLOYED", 0) + status_counts.get("CANNOT_REPRODUCE", 0) ) fix_rate = round((resolved_count / total_bugs * 100), 2) if total_bugs > 0 else 0 # Source distribution from .models import LogSource source_counts = {} for src in LogSource: sq = select(func.count(ErrorLog.id)).where(ErrorLog.source == src.value) sr = await session.exec(sq) source_counts[src.value] = sr.one() return { "total_bugs": total_bugs, "today_bugs": today_bugs, "fix_rate": fix_rate, "status_distribution": status_counts, "source_distribution": source_counts, } @app.get("/api/v1/bugs", tags=["Dashboard"]) async def get_bugs_list( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), status: Optional[LogStatus] = None, project_id: Optional[str] = None, source: Optional[str] = None, session: AsyncSession = Depends(get_session) ): """Get paginated list of bugs with optional filters""" query = select(ErrorLog).order_by(ErrorLog.timestamp.desc()) if status: query = query.where(ErrorLog.status == status) if project_id: query = query.where(ErrorLog.project_id == project_id) if source: query = query.where(ErrorLog.source == source) # Pagination offset = (page - 1) * page_size query = query.offset(offset).limit(page_size) results = await session.exec(query) bugs = results.all() # Get total count for pagination info count_query = select(func.count(ErrorLog.id)) if status: count_query = count_query.where(ErrorLog.status == status) if project_id: count_query = count_query.where(ErrorLog.project_id == project_id) if source: count_query = count_query.where(ErrorLog.source == source) count_result = await session.exec(count_query) total = count_result.one() return { "items": bugs, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } @app.get("/api/v1/bugs/{bug_id}", tags=["Dashboard"]) async def get_bug_detail(bug_id: int, session: AsyncSession = Depends(get_session)): """Get detailed information about a specific bug""" statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") return bug # ==================== Project Management ==================== @app.get("/api/v1/projects", tags=["Projects"]) async def get_projects(session: AsyncSession = Depends(get_session)): """Get list of all projects with full info""" query = select(Project).order_by(Project.updated_at.desc()) results = await session.exec(query) projects = results.all() return {"projects": projects} @app.get("/api/v1/projects/{project_id}", tags=["Projects"]) async def get_project_detail(project_id: str, session: AsyncSession = Depends(get_session)): """Get single project detail""" statement = select(Project).where(Project.project_id == project_id) results = await session.exec(statement) project = results.first() if not project: raise HTTPException(status_code=404, detail="Project not found") return project @app.put("/api/v1/projects/{project_id}", tags=["Projects"]) async def update_project(project_id: str, data: ProjectUpdate, session: AsyncSession = Depends(get_session)): """Update project config (repo_url, local_path, name, description)""" statement = select(Project).where(Project.project_id == project_id) results = await session.exec(statement) project = results.first() if not project: raise HTTPException(status_code=404, detail="Project not found") update_data = data.model_dump(exclude_unset=True) for key, value in update_data.items(): setattr(project, key, value) project.updated_at = datetime.utcnow() session.add(project) await session.commit() await session.refresh(project) return project @app.delete("/api/v1/projects/{project_id}", tags=["Projects"]) async def delete_project(project_id: str, session: AsyncSession = Depends(get_session)): """Delete a project by project_id""" statement = select(Project).where(Project.project_id == project_id) results = await session.exec(statement) project = results.first() if not project: raise HTTPException(status_code=404, detail="Project not found") await session.delete(project) await session.commit() return {"message": "Project deleted"} @app.get("/", tags=["Health"]) async def health_check(): return {"status": "ok"} # ==================== PR 操作 ==================== @app.post("/api/v1/bugs/{bug_id}/merge-pr", tags=["PR Operations"]) async def merge_pr( bug_id: int, session: AsyncSession = Depends(get_session), ): """ 批准并合并 PR 流程: 1. 调用 Gitea API 合并 PR 2. 更新 Bug 状态 → MERGED """ statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") if bug.status != LogStatus.PENDING_FIX: raise HTTPException( status_code=400, detail=f"Bug 状态不是待修复,当前状态: {bug.status}" ) if not bug.pr_url: raise HTTPException(status_code=400, detail="Bug 没有关联的 PR") # 调用 Gitea API 合并 PR gitea_client = GiteaClient() success, message = gitea_client.merge_pr_by_url(bug.pr_url) if success: # 更新 Bug 状态 bug.status = LogStatus.FIXED bug.merged_at = datetime.utcnow() session.add(bug) await session.commit() await session.refresh(bug) return { "message": "PR 已合并", "pr_url": bug.pr_url, "bug_id": bug.id, "new_status": bug.status, } else: raise HTTPException(status_code=500, detail=f"合并失败: {message}") @app.post("/api/v1/bugs/{bug_id}/close-pr", tags=["PR Operations"]) async def close_pr( bug_id: int, request: PRRejectRequest, session: AsyncSession = Depends(get_session), ): """ 拒绝修复并关闭 PR 流程: 1. 添加拒绝原因评论到 PR 2. 调用 Gitea API 关闭 PR 3. 更新 Bug 状态 → PENDING_FIX 4. 记录拒绝原因 5. Agent 会自动检测并重新修复 """ statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") if bug.status != LogStatus.PENDING_FIX: raise HTTPException( status_code=400, detail=f"Bug 状态不是待修复,当前状态: {bug.status}" ) if not bug.pr_url: raise HTTPException(status_code=400, detail="Bug 没有关联的 PR") gitea_client = GiteaClient() # 关闭 PR(带拒绝原因) success, message = gitea_client.close_pr_by_url(bug.pr_url, request.reason) if not success: raise HTTPException(status_code=500, detail=f"关闭 PR 失败: {message}") # 更新 Bug 状态 bug.status = LogStatus.PENDING_FIX bug.rejection_count = (bug.rejection_count or 0) + 1 bug.last_rejected_at = datetime.utcnow() # 记录拒绝原因 rejection_info = { "rejected_at": datetime.utcnow().isoformat(), "reason": request.reason, "previous_pr": { "pr_number": bug.pr_number, "pr_url": bug.pr_url, "branch": bug.branch_name, }, } bug.rejection_reason = json.dumps(rejection_info, ensure_ascii=False) session.add(bug) await session.commit() await session.refresh(bug) return { "message": "PR 已拒绝,Bug 将重新修复", "rejection_count": bug.rejection_count, "bug_id": bug.id, "new_status": bug.status, } @app.post("/api/v1/bugs/{bug_id}/retry", tags=["PR Operations"]) async def retry_fix( bug_id: int, session: AsyncSession = Depends(get_session), ): """ 重新尝试修复失败的 Bug 将 FIX_FAILED 状态的 Bug 重置为 NEW,让 repair agent 重新扫描 """ statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") if bug.status != LogStatus.FIX_FAILED: raise HTTPException( status_code=400, detail=f"只能重试修复失败的 Bug,当前状态: {bug.status}" ) # 重置状态为 NEW bug.status = LogStatus.NEW # 清除失败原因,让 agent 重新分析 bug.failure_reason = None session.add(bug) await session.commit() await session.refresh(bug) return { "message": "Bug 已重置为新发现状态,repair agent 将重新扫描修复", "bug_id": bug.id, "new_status": bug.status, } @app.post("/api/v1/bugs/{bug_id}/approve-fix", tags=["PR Operations"]) async def approve_fix( bug_id: int, session: AsyncSession = Depends(get_session), ): """ 人工确认修复(不依赖 PR) 用于 PENDING_FIX 状态但没有关联 PR 的 Bug,人工审核后直接标记为 FIXED。 """ statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") if bug.status != LogStatus.PENDING_FIX: raise HTTPException( status_code=400, detail=f"只能确认等待审核的 Bug,当前状态: {bug.status}" ) bug.status = LogStatus.FIXED bug.merged_at = datetime.utcnow() session.add(bug) await session.commit() await session.refresh(bug) return { "message": "修复已确认", "bug_id": bug.id, "new_status": bug.status, } @app.post("/api/v1/bugs/{bug_id}/reject-fix", tags=["PR Operations"]) async def reject_fix( bug_id: int, request: PRRejectRequest, session: AsyncSession = Depends(get_session), ): """ 人工驳回修复(不依赖 PR) 用于 PENDING_FIX 状态但没有关联 PR 的 Bug,驳回后重置为 NEW 让 agent 重新修复。 """ statement = select(ErrorLog).where(ErrorLog.id == bug_id) results = await session.exec(statement) bug = results.first() if not bug: raise HTTPException(status_code=404, detail="Bug not found") if bug.status != LogStatus.PENDING_FIX: raise HTTPException( status_code=400, detail=f"只能驳回等待审核的 Bug,当前状态: {bug.status}" ) bug.status = LogStatus.NEW bug.rejection_count = (bug.rejection_count or 0) + 1 bug.last_rejected_at = datetime.utcnow() rejection_info = { "rejected_at": datetime.utcnow().isoformat(), "reason": request.reason, } bug.rejection_reason = json.dumps(rejection_info, ensure_ascii=False) session.add(bug) await session.commit() await session.refresh(bug) return { "message": "修复已驳回,Bug 将重新修复", "rejection_count": bug.rejection_count, "bug_id": bug.id, "new_status": bug.status, }