"""Log Center & AIOps control-plane API.

FastAPI application that receives error reports, deduplicates them by
fingerprint, exposes them as repair tasks for an automated repair agent,
stores repair reports, and offers review endpoints (approve / reject / merge
PR / close PR) plus dashboard and project-management queries.
"""
import hashlib
import json
import logging
from datetime import datetime, timedelta  # noqa: F401  (timedelta kept for compatibility)
from typing import List, Optional  # noqa: F401  (List kept for compatibility)

from fastapi import Depends, FastAPI, HTTPException, Query, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.orm import sessionmaker as sa_sessionmaker
from sqlmodel import func, select, text
from sqlmodel.ext.asyncio.session import AsyncSession

from .database import engine, get_session, init_db
from .gitea_client import GiteaClient
from .models import (
    ErrorLog,
    ErrorLogCreate,
    LogSource,
    LogStatus,
    Project,
    ProjectUpdate,
    RepairTask,
    RepairTaskCreate,
    TaskStatusUpdate,
)
from .self_report import self_report_error

logger = logging.getLogger(__name__)

app = FastAPI(title="Log Center & AIOps Control Plane")

# CORS for frontend.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; restrict allow_origins to the real frontend domain in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict to your domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Projects describing the log center itself; registered/refreshed on startup.
_SELF_PROJECTS = [
    {
        "project_id": "log_center_api",
        "name": "Log Center API",
        "repo_url": "https://gitea.airlabs.art/zyc/qy_gitlab.git",
        "local_path": "/Users/maidong/Desktop/zyc/qy_gitlab/log_center",
        "description": "日志中台 FastAPI 后端服务",
    },
    {
        "project_id": "log_center_web",
        "name": "Log Center Web",
        "repo_url": "https://gitea.airlabs.art/zyc/qy_gitlab.git",
        "local_path": "/Users/maidong/Desktop/zyc/qy_gitlab/log_center/web",
        "description": "日志中台 React 管理端",
    },
]

# A new occurrence of a bug in one of these states is treated as a regression.
_REOPENABLE_STATUSES = (LogStatus.DEPLOYED, LogStatus.FIXED, LogStatus.VERIFIED)

# Repair-report statuses that are propagated to the bugs the report covers.
_REPORT_SYNCED_STATUSES = (
    LogStatus.FIXED,
    LogStatus.FIX_FAILED,
    LogStatus.PENDING_FIX,
    LogStatus.FIXING,
)


# ==================== Internal helpers ====================

async def _get_or_404(session: AsyncSession, model, row_id: int, detail: str):
    """Return the ``model`` row whose ``id`` equals ``row_id``.

    Raises:
        HTTPException: 404 with ``detail`` when no such row exists.
    """
    result = await session.exec(select(model).where(model.id == row_id))
    row = result.first()
    if not row:
        raise HTTPException(status_code=404, detail=detail)
    return row


async def _find_bug(session: AsyncSession, bug_id: int) -> Optional[ErrorLog]:
    """Return the ErrorLog with the given id, or ``None`` if it does not exist."""
    result = await session.exec(select(ErrorLog).where(ErrorLog.id == bug_id))
    return result.first()


async def _find_project(session: AsyncSession, project_id: str) -> Optional[Project]:
    """Return the Project with the given ``project_id``, or ``None``."""
    result = await session.exec(select(Project).where(Project.project_id == project_id))
    return result.first()


async def _scalar(session: AsyncSession, query):
    """Execute ``query`` and return its single result (used for COUNT queries)."""
    result = await session.exec(query)
    return result.one()


def _where_all(query, conditions):
    """Apply every condition in ``conditions`` to ``query`` and return it."""
    for condition in conditions:
        query = query.where(condition)
    return query


def _page(items, total: int, page: int, page_size: int) -> dict:
    """Build the pagination envelope shared by the list endpoints."""
    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
        # Ceiling division without floats.
        "total_pages": (total + page_size - 1) // page_size,
    }


def _error_log_ids_contain(error_log_id: int):
    """Return a SQL condition matching repair tasks that cover ``error_log_id``.

    Uses the PostgreSQL JSONB containment operator, e.g.
    ``error_log_ids @> '[28]'``. The JSON array is passed as a bound parameter
    instead of being interpolated into the SQL string.
    """
    return text("error_log_ids @> CAST(:error_log_ids AS jsonb)").bindparams(
        error_log_ids=json.dumps([error_log_id])
    )


def _record_rejection(bug: ErrorLog, reason: str, *, now: Optional[datetime] = None, **extra) -> None:
    """Update the rejection bookkeeping fields of ``bug``.

    Increments ``rejection_count``, stamps ``last_rejected_at`` and stores a
    JSON document (``rejected_at``, ``reason`` and any ``extra`` keys) in
    ``rejection_reason``. The same timestamp is used for both fields.
    """
    if now is None:
        now = datetime.utcnow()
    bug.rejection_count = (bug.rejection_count or 0) + 1
    bug.last_rejected_at = now
    bug.rejection_reason = json.dumps(
        {"rejected_at": now.isoformat(), "reason": reason, **extra},
        ensure_ascii=False,
    )


async def _merge_pr(pr_url: str):
    """Merge the PR at ``pr_url`` through Gitea; returns ``(success, message)``.

    The client call is synchronous (it is not awaited), so it is run in the
    thread pool to avoid blocking the event loop.
    """
    return await run_in_threadpool(GiteaClient().merge_pr_by_url, pr_url)


async def _close_pr(pr_url: str, reason: str):
    """Close the PR at ``pr_url`` with ``reason``; returns ``(success, message)``.

    Runs in the thread pool for the same reason as ``_merge_pr``.
    """
    return await run_in_threadpool(GiteaClient().close_pr_by_url, pr_url, reason)


# ==================== Startup & error handling ====================

@app.on_event("startup")
async def on_startup():
    """Initialise the database, then register the log center's own projects."""
    await init_db()
    await _register_self_projects()


async def _register_self_projects():
    """Register (or refresh) the log center's own project records at startup."""
    async_session = sa_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        for proj_data in _SELF_PROJECTS:
            existing = await _find_project(session, proj_data["project_id"])
            if not existing:
                session.add(Project(**proj_data))
                continue
            # Refresh metadata (repo URL, paths, ... may have changed).
            for key, value in proj_data.items():
                if key != "project_id":
                    setattr(existing, key, value)
            existing.updated_at = datetime.utcnow()
            session.add(existing)
        await session.commit()


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Report any unhandled exception to the log center itself, then return 500.

    A failure while self-reporting is logged and swallowed so that the client
    still receives the 500 response.
    """
    try:
        await self_report_error(exc, context={
            "url": str(request.url),
            "method": request.method,
        })
    except Exception:  # top-level boundary: never let reporting mask the 500
        logger.exception("Failed to self-report unhandled exception")
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal Server Error"},
    )


def generate_fingerprint(log: ErrorLogCreate) -> str:
    """Return the deduplication fingerprint (MD5 hex digest) of an error report.

    The hashed fields depend on ``log.source``:
    - ``cicd``: project, error type, job name and step name
    - ``deployment``: project, error type, namespace and deployment name
    - anything else: project, error type, file path and line number
    """
    source = log.source
    error_type = log.error.get('type')
    if source == "cicd":
        ctx = log.context or {}
        raw = (
            f"{log.project_id}|cicd|{error_type}"
            f"|{ctx.get('job_name', 'unknown')}|{ctx.get('step_name', 'unknown')}"
        )
    elif source == "deployment":
        ctx = log.context or {}
        raw = (
            f"{log.project_id}|deployment|{error_type}"
            f"|{ctx.get('namespace', 'default')}|{ctx.get('deployment_name', 'unknown')}"
        )
    else:
        raw = f"{log.project_id}|{error_type}|{log.error.get('file_path')}|{log.error.get('line_number')}"
    # MD5 is used as a stable identifier only, not for security.
    return hashlib.md5(raw.encode()).hexdigest()


# ==================== Log Reporting ====================

@app.post("/api/v1/logs/report", tags=["Logs"])
async def report_log(log_data: ErrorLogCreate, session: AsyncSession = Depends(get_session)):
    """Ingest an error report.

    - If a log with the same fingerprint exists and is not yet resolved, the
      report is deduplicated and nothing is written.
    - If it exists and was resolved (DEPLOYED / FIXED / VERIFIED), it is
      reopened as NEW (regression) with its retry counter reset.
    - Otherwise the project record is upserted and a new ErrorLog is created.
    """
    fingerprint = generate_fingerprint(log_data)

    # Check deduplication
    result = await session.exec(select(ErrorLog).where(ErrorLog.fingerprint == fingerprint))
    existing_log = result.first()

    if existing_log:
        if existing_log.status not in _REOPENABLE_STATUSES:
            return {"message": "Log deduplicated", "id": existing_log.id, "status": existing_log.status}

        # Resolved earlier but happened again -> regression, reopen it.
        existing_log.status = LogStatus.NEW
        existing_log.timestamp = log_data.timestamp or datetime.utcnow()
        existing_log.retry_count = 0  # Reset retries for the new occurrence
        session.add(existing_log)
        await session.commit()
        await session.refresh(existing_log)
        return {"message": "Regression detected, reopened", "id": existing_log.id}

    # Upsert Project record
    project = await _find_project(session, log_data.project_id)
    if not project:
        project = Project(project_id=log_data.project_id)
    if log_data.repo_url:
        project.repo_url = log_data.repo_url
    project.updated_at = datetime.utcnow()
    session.add(project)

    # Create new
    error = log_data.error
    new_log = ErrorLog(
        project_id=log_data.project_id,
        environment=log_data.environment,
        level=log_data.level,
        source=log_data.source,
        error_type=error.get("type"),
        error_message=error.get("message"),
        file_path=error.get("file_path"),
        line_number=error.get("line_number"),
        stack_trace=error.get("stack_trace"),
        context=log_data.context,
        version=log_data.version,
        commit_hash=log_data.commit_hash,
        fingerprint=fingerprint,
        timestamp=log_data.timestamp or datetime.utcnow(),
    )
    session.add(new_log)
    await session.commit()
    await session.refresh(new_log)

    return {"message": "Log reported", "id": new_log.id}


# ==================== Agent Tasks ====================

@app.get("/api/v1/tasks/pending", tags=["Tasks"])
async def get_pending_tasks(
    project_id: Optional[str] = None,
    source: Optional[str] = None,
    session: AsyncSession = Depends(get_session),
):
    """Return all error logs in status NEW, optionally filtered by project and source."""
    conditions = [ErrorLog.status == LogStatus.NEW]
    if project_id:
        conditions.append(ErrorLog.project_id == project_id)
    if source:
        conditions.append(ErrorLog.source == source)

    results = await session.exec(_where_all(select(ErrorLog), conditions))
    return results.all()


@app.put("/api/v1/tasks/{task_id}/status", tags=["Tasks"])
async def update_task_status(
    task_id: int,
    status_update: TaskStatusUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Set the status of an error log.

    The optional message is stored as ``failure_reason`` only when the new
    status is FIX_FAILED. Responds 404 when the task does not exist.
    """
    task = await _get_or_404(session, ErrorLog, task_id, "Task not found")

    task.status = status_update.status
    if status_update.message and status_update.status == LogStatus.FIX_FAILED:
        task.failure_reason = status_update.message

    session.add(task)
    await session.commit()
    await session.refresh(task)

    return {"message": "Status updated", "id": task.id, "status": task.status}


class PRInfoUpdate(BaseModel):
    """Pull-request details written back after the repair agent opens a PR."""

    pr_number: int
    pr_url: str
    branch_name: str


class SeverityUpdate(BaseModel):
    """Severity level (and optional justification) assessed for a bug."""

    severity: int
    severity_reason: Optional[str] = None


@app.put("/api/v1/bugs/{bug_id}/pr-info", tags=["Tasks"])
async def update_bug_pr_info(
    bug_id: int,
    pr_info: PRInfoUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Store PR information on a bug after the repair agent has created the PR."""
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    bug.pr_number = pr_info.pr_number
    bug.pr_url = pr_info.pr_url
    bug.branch_name = pr_info.branch_name

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {"message": "PR info updated", "bug_id": bug.id}


@app.put("/api/v1/bugs/{bug_id}/severity", tags=["Tasks"])
async def update_bug_severity(
    bug_id: int,
    data: SeverityUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Store the severity level assessed by the repair agent on a bug."""
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    bug.severity = data.severity
    bug.severity_reason = data.severity_reason

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {"message": "Severity updated", "bug_id": bug.id, "severity": bug.severity}


class PRRejectRequest(BaseModel):
    """Rejection request payload."""

    reason: str


# ==================== Repair Reports ====================

@app.post("/api/v1/repair/reports", tags=["Repair"])
async def create_repair_report(report: RepairTaskCreate, session: AsyncSession = Depends(get_session)):
    """Upload a new repair report (one per batch, may cover multiple bugs).

    When the report status is FIXED, FIX_FAILED, PENDING_FIX or FIXING, the
    status is copied to every existing bug listed in ``error_log_ids``; the
    failure reason is copied as well for FIX_FAILED reports.
    """
    repair_task = RepairTask.from_orm(report)
    session.add(repair_task)

    # Update all related bugs' status and failure_reason
    if report.status in _REPORT_SYNCED_STATUSES:
        copy_failure = bool(report.failure_reason) and report.status == LogStatus.FIX_FAILED
        # `or []` keeps this consistent with approve/reject when no ids are given.
        for bug_id in (report.error_log_ids or []):
            error_log = await _find_bug(session, bug_id)
            if not error_log:
                continue
            error_log.status = report.status
            if copy_failure:
                error_log.failure_reason = report.failure_reason
            session.add(error_log)

    await session.commit()
    await session.refresh(repair_task)
    return {"message": "Report uploaded", "id": repair_task.id}


@app.get("/api/v1/repair/reports", tags=["Repair"])
async def get_repair_reports(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    project_id: Optional[str] = None,
    error_log_id: Optional[int] = None,
    session: AsyncSession = Depends(get_session),
):
    """Get repair reports list (newest first), optionally filtered by project or bug."""
    conditions = []
    if project_id:
        conditions.append(RepairTask.project_id == project_id)
    if error_log_id is not None:
        conditions.append(_error_log_ids_contain(error_log_id))

    query = _where_all(select(RepairTask).order_by(RepairTask.created_at.desc()), conditions)
    query = query.offset((page - 1) * page_size).limit(page_size)
    results = await session.exec(query)
    tasks = results.all()

    # Total number of matching reports (same filters, no pagination).
    total = await _scalar(session, _where_all(select(func.count(RepairTask.id)), conditions))

    return _page(tasks, total, page, page_size)


@app.get("/api/v1/repair/reports/{report_id}", tags=["Repair"])
async def get_repair_report_detail(report_id: int, session: AsyncSession = Depends(get_session)):
    """Get detailed repair report; responds 404 when it does not exist."""
    return await _get_or_404(session, RepairTask, report_id, "Report not found")


@app.post("/api/v1/repair/reports/{report_id}/approve", tags=["Repair"])
async def approve_report(report_id: int, session: AsyncSession = Depends(get_session)):
    """Approve a repair report.

    Merges the report's PR (if any) and marks the report and all related,
    existing bugs as FIXED. Responds 404 for an unknown report, 400 when the
    report is not PENDING_FIX, and 500 when the PR merge fails.
    """
    report = await _get_or_404(session, RepairTask, report_id, "Report not found")

    if report.status != LogStatus.PENDING_FIX:
        raise HTTPException(status_code=400, detail=f"报告状态不是等待审核: {report.status}")

    # Merge the PR through the Gitea API, if there is one.
    if report.pr_url:
        success, message = await _merge_pr(report.pr_url)
        if not success:
            raise HTTPException(status_code=500, detail=f"合并 PR 失败: {message}")

    # Update the report status.
    report.status = LogStatus.FIXED
    session.add(report)

    # Update the status of every related bug.
    updated_ids = []
    for bug_id in (report.error_log_ids or []):
        bug = await _find_bug(session, bug_id)
        if not bug:
            continue
        bug.status = LogStatus.FIXED
        bug.merged_at = datetime.utcnow()
        session.add(bug)
        updated_ids.append(bug_id)

    await session.commit()
    return {
        "message": f"修复已批准,{len(updated_ids)} 个缺陷已更新",
        "report_id": report_id,
        "updated_bug_ids": updated_ids,
    }


@app.post("/api/v1/repair/reports/{report_id}/reject", tags=["Repair"])
async def reject_report(
    report_id: int,
    request: PRRejectRequest,
    session: AsyncSession = Depends(get_session),
):
    """Reject a repair report.

    Closes the report's PR (if any), marks the report FIX_FAILED and resets
    all related, existing bugs to NEW while recording the rejection on each.
    Responds 404 for an unknown report, 400 when the report is not
    PENDING_FIX, and 500 when closing the PR fails.
    """
    report = await _get_or_404(session, RepairTask, report_id, "Report not found")

    if report.status != LogStatus.PENDING_FIX:
        raise HTTPException(status_code=400, detail=f"报告状态不是等待审核: {report.status}")

    # Close the PR, if there is one.
    if report.pr_url:
        success, message = await _close_pr(report.pr_url, request.reason)
        if not success:
            raise HTTPException(status_code=500, detail=f"关闭 PR 失败: {message}")

    # Update the report status.
    report.status = LogStatus.FIX_FAILED
    report.failure_reason = f"人工驳回: {request.reason}"
    session.add(report)

    # Reset every related bug to NEW; all share one rejection timestamp.
    rejected_at = datetime.utcnow()
    updated_ids = []
    for bug_id in (report.error_log_ids or []):
        bug = await _find_bug(session, bug_id)
        if not bug:
            continue
        bug.status = LogStatus.NEW
        _record_rejection(bug, request.reason, now=rejected_at, report_id=report_id)
        session.add(bug)
        updated_ids.append(bug_id)

    await session.commit()
    return {
        "message": f"修复已驳回,{len(updated_ids)} 个缺陷将重新修复",
        "report_id": report_id,
        "updated_bug_ids": updated_ids,
    }


# ==================== Dashboard APIs ====================

@app.get("/api/v1/dashboard/stats", tags=["Dashboard"])
async def get_dashboard_stats(source: Optional[str] = None, session: AsyncSession = Depends(get_session)):
    """Get overall statistics for dashboard.

    ``source`` restricts the total, today's count and the status distribution;
    the source distribution always covers every source.
    """
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)

    def _apply_source(q):
        return q.where(ErrorLog.source == source) if source else q

    count_all = select(func.count(ErrorLog.id))

    # Total bugs
    total_bugs = await _scalar(session, _apply_source(count_all))

    # Today's new bugs
    today_bugs = await _scalar(session, _apply_source(count_all.where(ErrorLog.timestamp >= today)))

    # Count by status
    status_counts = {}
    for status in LogStatus:
        status_counts[status.value] = await _scalar(
            session, _apply_source(count_all.where(ErrorLog.status == status))
        )

    # Fix rate = (FIXED + VERIFIED + DEPLOYED + CANNOT_REPRODUCE) / Total
    resolved_count = sum(
        status_counts.get(name, 0)
        for name in ("FIXED", "VERIFIED", "DEPLOYED", "CANNOT_REPRODUCE")
    )
    fix_rate = round((resolved_count / total_bugs * 100), 2) if total_bugs > 0 else 0

    # Source distribution (not restricted by the `source` filter)
    source_counts = {}
    for src in LogSource:
        source_counts[src.value] = await _scalar(
            session, count_all.where(ErrorLog.source == src.value)
        )

    return {
        "total_bugs": total_bugs,
        "today_bugs": today_bugs,
        "fix_rate": fix_rate,
        "status_distribution": status_counts,
        "source_distribution": source_counts,
    }


@app.get("/api/v1/bugs", tags=["Dashboard"])
async def get_bugs_list(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    status: Optional[LogStatus] = None,
    project_id: Optional[str] = None,
    source: Optional[str] = None,
    session: AsyncSession = Depends(get_session),
):
    """Get paginated list of bugs (newest first) with optional filters."""
    conditions = []
    if status:
        conditions.append(ErrorLog.status == status)
    if project_id:
        conditions.append(ErrorLog.project_id == project_id)
    if source:
        conditions.append(ErrorLog.source == source)

    # Pagination
    query = _where_all(select(ErrorLog).order_by(ErrorLog.timestamp.desc()), conditions)
    query = query.offset((page - 1) * page_size).limit(page_size)
    results = await session.exec(query)
    bugs = results.all()

    # Total count for pagination info (same filters, no pagination).
    total = await _scalar(session, _where_all(select(func.count(ErrorLog.id)), conditions))

    return _page(bugs, total, page, page_size)


@app.get("/api/v1/bugs/{bug_id}", tags=["Dashboard"])
async def get_bug_detail(bug_id: int, session: AsyncSession = Depends(get_session)):
    """Get detailed information about a specific bug; 404 when it does not exist."""
    return await _get_or_404(session, ErrorLog, bug_id, "Bug not found")


# ==================== Project Management ====================

@app.get("/api/v1/projects", tags=["Projects"])
async def get_projects(session: AsyncSession = Depends(get_session)):
    """Get list of all projects with full info, most recently updated first."""
    results = await session.exec(select(Project).order_by(Project.updated_at.desc()))
    return {"projects": results.all()}


@app.get("/api/v1/projects/{project_id}", tags=["Projects"])
async def get_project_detail(project_id: str, session: AsyncSession = Depends(get_session)):
    """Get single project detail; 404 when the project does not exist."""
    project = await _find_project(session, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project


@app.put("/api/v1/projects/{project_id}", tags=["Projects"])
async def update_project(project_id: str, data: ProjectUpdate, session: AsyncSession = Depends(get_session)):
    """Update project config (repo_url, local_path, name, description).

    Only fields explicitly present in the request are applied. Responds 404
    when the project does not exist.
    """
    project = await _find_project(session, project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    for key, value in data.model_dump(exclude_unset=True).items():
        setattr(project, key, value)
    project.updated_at = datetime.utcnow()

    session.add(project)
    await session.commit()
    await session.refresh(project)
    return project


@app.get("/", tags=["Health"])
async def health_check():
    """Liveness probe."""
    return {"status": "ok"}


# ==================== PR Operations ====================

@app.post("/api/v1/bugs/{bug_id}/merge-pr", tags=["PR Operations"])
async def merge_pr(
    bug_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Approve and merge the PR attached to a bug.

    Flow:
    1. Merge the PR through the Gitea API.
    2. Set the bug status to FIXED and stamp ``merged_at``.

    Responds 404 for an unknown bug, 400 when the bug is not PENDING_FIX or
    has no PR, and 500 when the merge fails.
    """
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    if bug.status != LogStatus.PENDING_FIX:
        raise HTTPException(
            status_code=400,
            detail=f"Bug 状态不是待修复,当前状态: {bug.status}"
        )

    if not bug.pr_url:
        raise HTTPException(status_code=400, detail="Bug 没有关联的 PR")

    # Merge the PR through the Gitea API.
    success, message = await _merge_pr(bug.pr_url)
    if not success:
        raise HTTPException(status_code=500, detail=f"合并失败: {message}")

    # Update the bug status.
    bug.status = LogStatus.FIXED
    bug.merged_at = datetime.utcnow()

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {
        "message": "PR 已合并",
        "pr_url": bug.pr_url,
        "bug_id": bug.id,
        "new_status": bug.status,
    }


@app.post("/api/v1/bugs/{bug_id}/close-pr", tags=["PR Operations"])
async def close_pr(
    bug_id: int,
    request: PRRejectRequest,
    session: AsyncSession = Depends(get_session),
):
    """Reject a fix and close its PR.

    Flow:
    1. Close the PR through the Gitea API, passing the rejection reason.
    2. Keep the bug status at PENDING_FIX.
    3. Record the rejection (count, timestamp, reason and the closed PR).

    Responds 404 for an unknown bug, 400 when the bug is not PENDING_FIX or
    has no PR, and 500 when closing the PR fails.
    """
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    if bug.status != LogStatus.PENDING_FIX:
        raise HTTPException(
            status_code=400,
            detail=f"Bug 状态不是待修复,当前状态: {bug.status}"
        )

    if not bug.pr_url:
        raise HTTPException(status_code=400, detail="Bug 没有关联的 PR")

    # Close the PR (with the rejection reason).
    success, message = await _close_pr(bug.pr_url, request.reason)
    if not success:
        raise HTTPException(status_code=500, detail=f"关闭 PR 失败: {message}")

    # NOTE(review): the status stays PENDING_FIX here, whereas the other
    # rejection endpoints reset to NEW and /tasks/pending only lists NEW bugs.
    # Confirm the agent really picks these bugs up again.
    bug.status = LogStatus.PENDING_FIX

    # Record the rejection together with the PR that was closed.
    _record_rejection(
        bug,
        request.reason,
        previous_pr={
            "pr_number": bug.pr_number,
            "pr_url": bug.pr_url,
            "branch": bug.branch_name,
        },
    )

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {
        "message": "PR 已拒绝,Bug 将重新修复",
        "rejection_count": bug.rejection_count,
        "bug_id": bug.id,
        "new_status": bug.status,
    }


@app.post("/api/v1/bugs/{bug_id}/retry", tags=["PR Operations"])
async def retry_fix(
    bug_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Retry a failed fix.

    Resets a FIX_FAILED bug to NEW and clears its failure reason so that it is
    listed as a pending task again. Responds 404 for an unknown bug and 400
    when the bug is not FIX_FAILED.
    """
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    if bug.status != LogStatus.FIX_FAILED:
        raise HTTPException(
            status_code=400,
            detail=f"只能重试修复失败的 Bug,当前状态: {bug.status}"
        )

    # Reset to NEW and clear the failure reason for a fresh analysis.
    bug.status = LogStatus.NEW
    bug.failure_reason = None

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {
        "message": "Bug 已重置为新发现状态,repair agent 将重新扫描修复",
        "bug_id": bug.id,
        "new_status": bug.status,
    }


@app.post("/api/v1/bugs/{bug_id}/approve-fix", tags=["PR Operations"])
async def approve_fix(
    bug_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Manually confirm a fix (no PR involved).

    Intended for PENDING_FIX bugs without an associated PR: marks the bug
    FIXED and stamps ``merged_at``. Whether a PR exists is not checked.
    Responds 404 for an unknown bug and 400 when the bug is not PENDING_FIX.
    """
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    if bug.status != LogStatus.PENDING_FIX:
        raise HTTPException(
            status_code=400,
            detail=f"只能确认等待审核的 Bug,当前状态: {bug.status}"
        )

    bug.status = LogStatus.FIXED
    bug.merged_at = datetime.utcnow()

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {
        "message": "修复已确认",
        "bug_id": bug.id,
        "new_status": bug.status,
    }


@app.post("/api/v1/bugs/{bug_id}/reject-fix", tags=["PR Operations"])
async def reject_fix(
    bug_id: int,
    request: PRRejectRequest,
    session: AsyncSession = Depends(get_session),
):
    """Manually reject a fix (no PR involved).

    Intended for PENDING_FIX bugs without an associated PR: resets the bug to
    NEW and records the rejection. Whether a PR exists is not checked.
    Responds 404 for an unknown bug and 400 when the bug is not PENDING_FIX.
    """
    bug = await _get_or_404(session, ErrorLog, bug_id, "Bug not found")

    if bug.status != LogStatus.PENDING_FIX:
        raise HTTPException(
            status_code=400,
            detail=f"只能驳回等待审核的 Bug,当前状态: {bug.status}"
        )

    bug.status = LogStatus.NEW
    _record_rejection(bug, request.reason)

    session.add(bug)
    await session.commit()
    await session.refresh(bug)

    return {
        "message": "修复已驳回,Bug 将重新修复",
        "rejection_count": bug.rejection_count,
        "bug_id": bug.id,
        "new_status": bug.status,
    }