Compare commits
1 Commits
main
...
fix/auto-2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a83891b2f |
@ -16,38 +16,16 @@ DB_NAME = os.getenv("DB_NAME")
|
|||||||
|
|
||||||
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
||||||
|
|
||||||
_engine = None
|
engine = create_async_engine(
|
||||||
|
DATABASE_URL,
|
||||||
|
echo=True,
|
||||||
def get_engine():
|
future=True,
|
||||||
"""Lazy engine creation to ensure pool is bound to the current event loop."""
|
pool_pre_ping=True,
|
||||||
global _engine
|
pool_recycle=300,
|
||||||
if _engine is None:
|
)
|
||||||
_engine = create_async_engine(
|
|
||||||
DATABASE_URL,
|
|
||||||
echo=True,
|
|
||||||
future=True,
|
|
||||||
pool_pre_ping=True,
|
|
||||||
pool_recycle=300,
|
|
||||||
)
|
|
||||||
return _engine
|
|
||||||
|
|
||||||
|
|
||||||
async def dispose_engine():
|
|
||||||
"""Dispose engine and reset so next call creates a fresh one."""
|
|
||||||
global _engine
|
|
||||||
if _engine is not None:
|
|
||||||
await _engine.dispose()
|
|
||||||
_engine = None
|
|
||||||
|
|
||||||
|
|
||||||
# Module-level alias for backward compatibility
|
|
||||||
engine = None # Use get_engine() instead
|
|
||||||
|
|
||||||
|
|
||||||
async def init_db():
|
async def init_db():
|
||||||
eng = get_engine()
|
async with engine.begin() as conn:
|
||||||
async with eng.begin() as conn:
|
|
||||||
# await conn.run_sync(SQLModel.metadata.drop_all)
|
# await conn.run_sync(SQLModel.metadata.drop_all)
|
||||||
await conn.run_sync(SQLModel.metadata.create_all)
|
await conn.run_sync(SQLModel.metadata.create_all)
|
||||||
|
|
||||||
@ -81,7 +59,7 @@ async def init_db():
|
|||||||
|
|
||||||
async def get_session() -> AsyncSession:
|
async def get_session() -> AsyncSession:
|
||||||
async_session = sessionmaker(
|
async_session = sessionmaker(
|
||||||
get_engine(), class_=AsyncSession, expire_on_commit=False
|
engine, class_=AsyncSession, expire_on_commit=False
|
||||||
)
|
)
|
||||||
async with async_session() as session:
|
async with async_session() as session:
|
||||||
yield session
|
yield session
|
||||||
|
|||||||
39
app/main.py
39
app/main.py
@ -3,7 +3,8 @@ from fastapi.middleware.cors import CORSMiddleware
|
|||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
from sqlmodel import select, func, text
|
from sqlmodel import select, func, text
|
||||||
from .database import init_db, get_session, get_engine
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
from .database import init_db, get_session, engine
|
||||||
from .models import ErrorLog, ErrorLogCreate, LogStatus, TaskStatusUpdate, RepairTask, RepairTaskCreate, Project, ProjectUpdate
|
from .models import ErrorLog, ErrorLogCreate, LogStatus, TaskStatusUpdate, RepairTask, RepairTaskCreate, Project, ProjectUpdate
|
||||||
from .gitea_client import GiteaClient
|
from .gitea_client import GiteaClient
|
||||||
from .self_report import self_report_error
|
from .self_report import self_report_error
|
||||||
@ -49,7 +50,7 @@ async def _register_self_projects():
|
|||||||
"description": "日志中台 React 管理端",
|
"description": "日志中台 React 管理端",
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
async_session = sa_sessionmaker(get_engine(), class_=AsyncSession, expire_on_commit=False)
|
async_session = sa_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||||
async with async_session() as session:
|
async with async_session() as session:
|
||||||
for proj_data in projects:
|
for proj_data in projects:
|
||||||
stmt = select(Project).where(Project.project_id == proj_data["project_id"])
|
stmt = select(Project).where(Project.project_id == proj_data["project_id"])
|
||||||
@ -163,7 +164,39 @@ async def report_log(log_data: ErrorLogCreate, session: AsyncSession = Depends(g
|
|||||||
)
|
)
|
||||||
|
|
||||||
session.add(new_log)
|
session.add(new_log)
|
||||||
await session.commit()
|
try:
|
||||||
|
await session.commit()
|
||||||
|
except IntegrityError:
|
||||||
|
await session.rollback()
|
||||||
|
# Race condition: another request inserted the same fingerprint concurrently
|
||||||
|
statement = select(ErrorLog).where(ErrorLog.fingerprint == fingerprint)
|
||||||
|
results = await session.exec(statement)
|
||||||
|
existing_log = results.first()
|
||||||
|
if existing_log:
|
||||||
|
if existing_log.status not in [LogStatus.DEPLOYED, LogStatus.FIXED, LogStatus.VERIFIED]:
|
||||||
|
existing_log.error_message = log_data.error.get("message", existing_log.error_message)
|
||||||
|
existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace)
|
||||||
|
existing_log.context = log_data.context or existing_log.context
|
||||||
|
existing_log.timestamp = log_data.timestamp or datetime.utcnow()
|
||||||
|
if log_data.commit_hash:
|
||||||
|
existing_log.commit_hash = log_data.commit_hash
|
||||||
|
session.add(existing_log)
|
||||||
|
await session.commit()
|
||||||
|
await session.refresh(existing_log)
|
||||||
|
return {"message": "Log deduplicated (content updated)", "id": existing_log.id, "status": existing_log.status}
|
||||||
|
existing_log.status = LogStatus.NEW
|
||||||
|
existing_log.error_message = log_data.error.get("message", existing_log.error_message)
|
||||||
|
existing_log.stack_trace = log_data.error.get("stack_trace", existing_log.stack_trace)
|
||||||
|
existing_log.context = log_data.context or existing_log.context
|
||||||
|
existing_log.timestamp = log_data.timestamp or datetime.utcnow()
|
||||||
|
existing_log.retry_count = 0
|
||||||
|
if log_data.commit_hash:
|
||||||
|
existing_log.commit_hash = log_data.commit_hash
|
||||||
|
session.add(existing_log)
|
||||||
|
await session.commit()
|
||||||
|
await session.refresh(existing_log)
|
||||||
|
return {"message": "Regression detected, reopened", "id": existing_log.id}
|
||||||
|
raise
|
||||||
await session.refresh(new_log)
|
await session.refresh(new_log)
|
||||||
|
|
||||||
return {"message": "Log reported", "id": new_log.id}
|
return {"message": "Log reported", "id": new_log.id}
|
||||||
|
|||||||
@ -9,7 +9,7 @@ from sqlmodel import select
|
|||||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
from .database import get_engine
|
from .database import engine
|
||||||
from .models import ErrorLog, LogStatus, Project
|
from .models import ErrorLog, LogStatus, Project
|
||||||
|
|
||||||
PROJECT_ID = "log_center_api"
|
PROJECT_ID = "log_center_api"
|
||||||
@ -35,7 +35,7 @@ async def self_report_error(exc: Exception, context: dict = None):
|
|||||||
raw = f"{PROJECT_ID}|{error_type}|{file_path}|{line_number}"
|
raw = f"{PROJECT_ID}|{error_type}|{file_path}|{line_number}"
|
||||||
fingerprint = hashlib.md5(raw.encode()).hexdigest()
|
fingerprint = hashlib.md5(raw.encode()).hexdigest()
|
||||||
|
|
||||||
async_session = sessionmaker(get_engine(), class_=AsyncSession, expire_on_commit=False)
|
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||||
async with async_session() as session:
|
async with async_session() as session:
|
||||||
# 去重检查
|
# 去重检查
|
||||||
stmt = select(ErrorLog).where(ErrorLog.fingerprint == fingerprint)
|
stmt = select(ErrorLog).where(ErrorLog.fingerprint == fingerprint)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user