""" Git Manager - Git 操作管理 """ import os from typing import Optional, Tuple import requests from git import Repo, InvalidGitRepositoryError from loguru import logger from ..config import settings class GitManager: """负责 Git 操作""" def __init__(self, project_path: str, github_repo: str = ""): self.project_path = project_path self.github_repo = github_repo.strip() self.repo: Optional[Repo] = None self._init_repo() def _init_repo(self): """初始化 Git 仓库""" try: self.repo = Repo(self.project_path) # 配置 Git 用户 with self.repo.config_writer() as config: config.set_value("user", "name", settings.git_user_name) config.set_value("user", "email", settings.git_user_email) # 如果指定了 GitHub 仓库地址,确保 origin 指向正确的仓库 if self.github_repo: self._ensure_remote(self.github_repo) logger.info(f"Git 仓库初始化成功: {self.project_path}") except InvalidGitRepositoryError: logger.error(f"无效的 Git 仓库: {self.project_path}") self.repo = None def _ensure_remote(self, repo_url: str): """确保 origin remote 指向指定的仓库地址""" if not self.repo: return try: if "origin" in [r.name for r in self.repo.remotes]: current_url = self.repo.remotes.origin.url if current_url != repo_url: self.repo.remotes.origin.set_url(repo_url) logger.info(f"更新 origin 地址: {repo_url}") else: self.repo.create_remote("origin", repo_url) logger.info(f"添加 origin 地址: {repo_url}") except Exception as e: logger.error(f"配置 remote 失败: {e}") def _parse_owner_repo(self) -> Tuple[str, str]: """ 从 github_repo 解析出 owner 和 repo 名称。 支持格式: - "owner/repo" - "https://gitea.airlabs.art/owner/repo.git" - "https://gitea.airlabs.art/owner/repo" - "git@gitea.airlabs.art:owner/repo.git" """ raw = self.github_repo.strip() # 去掉 .git 后缀 if raw.endswith(".git"): raw = raw[:-4] # HTTP(S) URL if raw.startswith("http://") or raw.startswith("https://"): parts = raw.rstrip("/").split("/") if len(parts) >= 2: return parts[-2], parts[-1] # SSH 格式 git@host:owner/repo elif ":" in raw and "@" in raw: path = raw.split(":")[-1] segments = path.split("/") if len(segments) == 2: return segments[0], segments[1] # 简单 owner/repo 格式 else: segments = raw.split("/") if len(segments) == 2: return segments[0], segments[1] return "", "" def pull(self) -> bool: """拉取最新代码(自动切回 main/master 分支)""" if not self.repo: return False try: # 先切回 main/master,避免在无 upstream 的 fix 分支上 pull 失败 current = self.repo.active_branch.name if current.startswith("fix/"): main_name = "main" if "main" in self.repo.heads else "master" if main_name in [h.name for h in self.repo.heads]: self.repo.heads[main_name].checkout() logger.info(f"从 {current} 切回 {main_name}") origin = self.repo.remotes.origin origin.pull() logger.info("代码拉取成功") return True except Exception as e: logger.error(f"拉取代码失败: {e}") return False def create_branch(self, branch_name: str) -> bool: """ 创建并切换到新分支 Args: branch_name: 分支名称 Returns: 是否成功 """ if not self.repo: return False try: # 先切换到 main 分支 if "main" in self.repo.heads: self.repo.heads.main.checkout() elif "master" in self.repo.heads: self.repo.heads.master.checkout() # 创建新分支 new_branch = self.repo.create_head(branch_name) new_branch.checkout() logger.info(f"创建并切换到分支: {branch_name}") return True except Exception as e: logger.error(f"创建分支失败: {e}") return False def checkout(self, branch_name: str) -> bool: """切换分支""" if not self.repo: return False try: if branch_name in self.repo.heads: self.repo.heads[branch_name].checkout() logger.info(f"切换到分支: {branch_name}") return True else: logger.error(f"分支不存在: {branch_name}") return False except Exception as e: logger.error(f"切换分支失败: {e}") return False def get_diff(self) -> str: """获取当前修改的 diff""" if not self.repo: return "" try: return self.repo.git.diff() except Exception as e: logger.error(f"获取 diff 失败: {e}") return "" def get_modified_files(self) -> list[str]: """获取已修改的文件列表""" if not self.repo: return [] try: return [item.a_path for item in self.repo.index.diff(None)] except Exception as e: logger.error(f"获取修改文件失败: {e}") return [] def commit(self, message: str) -> bool: """提交更改""" if not self.repo: return False try: # 添加所有修改 self.repo.git.add(A=True) # 检查是否有更改 if not self.repo.is_dirty(): logger.warning("没有需要提交的更改") return False self.repo.index.commit(message) logger.info(f"提交成功: {message}") return True except Exception as e: logger.error(f"提交失败: {e}") return False def push(self, branch_name: Optional[str] = None) -> bool: """推送到远程(自动设置上游跟踪分支)""" if not self.repo: return False try: branch = branch_name or self.repo.active_branch.name origin = self.repo.remotes.origin origin.push(refspec=f"{branch}:{branch}", set_upstream=True) logger.info(f"推送成功: {branch}") return True except Exception as e: logger.error(f"推送失败: {e}") return False def merge_to_main_and_push(self) -> bool: """将当前 fix 分支合并到 main 并推送,然后删除 fix 分支""" if not self.repo: return False try: fix_branch = self.repo.active_branch.name # 切换到 main(或 master) main_name = "main" if "main" in self.repo.heads else "master" self.repo.heads[main_name].checkout() # 合并 fix 分支 self.repo.git.merge(fix_branch) logger.info(f"合并 {fix_branch} → {main_name}") # 推送 main origin = self.repo.remotes.origin origin.push(refspec=f"{main_name}:{main_name}") logger.info(f"推送 {main_name} 成功") # 删除本地和远程 fix 分支 self.repo.git.branch("-d", fix_branch) try: origin.push(refspec=f":{fix_branch}") logger.info(f"删除远程分支 {fix_branch}") except Exception: pass # 远程分支可能不存在 return True except Exception as e: logger.error(f"合并到 main 失败: {e}") return False def create_pr(self, title: str, body: str, base: str = "main") -> Tuple[bool, Optional[dict]]: """ 通过 Gitea API 创建 Pull Request Returns: (success, pr_info) - pr_info 包含 pr_number, pr_url, branch_name """ if not self.repo or not self.github_repo: logger.error("未配置仓库地址,无法创建 PR") return False, None head_branch = self.repo.active_branch.name gitea_url = settings.gitea_url.rstrip("/") token = settings.gitea_token if not token: logger.error("未配置 Gitea Token,无法创建 PR") return False, None # 从 github_repo 提取 owner/repo,支持完整 URL 和 owner/repo 格式 owner, repo = self._parse_owner_repo() if not owner or not repo: logger.error(f"无法解析仓库 owner/repo: {self.github_repo}") return False, None api_url = f"{gitea_url}/api/v1/repos/{owner}/{repo}/pulls" try: resp = requests.post( api_url, json={ "title": title, "body": body, "head": head_branch, "base": base, }, headers={"Authorization": f"token {token}"}, timeout=30, ) if resp.status_code in (200, 201): data = resp.json() pr_info = { "pr_number": data["number"], "pr_url": data["html_url"], "branch_name": head_branch, } logger.info(f"PR 创建成功: #{data['number']} - {data['html_url']}") return True, pr_info else: logger.error(f"创建 PR 失败 ({resp.status_code}): {resp.text}") return False, None except Exception as e: logger.error(f"创建 PR 请求异常: {e}") return False, None def reset_hard(self) -> bool: """重置所有更改""" if not self.repo: return False try: self.repo.git.reset("--hard") self.repo.git.clean("-fd") logger.info("重置成功") return True except Exception as e: logger.error(f"重置失败: {e}") return False def get_current_branch(self) -> str: """获取当前分支名""" if not self.repo: return "" try: return self.repo.active_branch.name except Exception: return ""