""" adapters/vcs/github.py GitHub VCS adapter — Phase 2 implementation. Uses PyGithub (``pip install PyGithub``) to interact with the GitHub REST API. Reads the repository URL and base branch from the team.yaml config dict. Note on commit() signature -------------------------- The base class declares ``commit(files: list[str], message: str)``, which is insufficient for the GitHub Contents API (which requires file *content*, not just paths). This implementation extends the signature to accept either: * ``dict[str, str]`` — ``{path: content}`` mapping (preferred; uses the API). * ``list[str]`` — local file paths; content is read from disk and pushed. The optional ``branch`` keyword argument targets a specific branch; it defaults to the configured base branch. """ from __future__ import annotations import os import re from typing import Union from github import Github, GithubException from adapters.base.vcs import VCSAdapter class GitHubAdapter(VCSAdapter): """ VCS adapter for GitHub repositories. Authenticates via GITHUB_TOKEN and interacts with the GitHub REST API through PyGithub. Environment variables --------------------- GITHUB_TOKEN : Required. Personal access token or GitHub App installation token. Config keys (from team.yaml) ---------------------------- run.repo : SSH or HTTPS clone URL (e.g. "git@github.com:org/repo.git"). run.base_branch : Default base branch (e.g. "main"). """ def __init__(self, config: dict) -> None: """ Initialise the GitHub adapter. Parameters ---------- config : Loaded team.yaml config dict. Raises ------ ValueError If GITHUB_TOKEN is not set or the repo URL cannot be parsed. """ self._config = config token = os.environ.get("GITHUB_TOKEN") if not token: raise ValueError( "GITHUB_TOKEN environment variable is not set. " "Create a personal access token and export it before running the-agency." ) self._g = Github(token) run_cfg: dict = config.get("run", {}) repo_url: str = run_cfg.get("repo", "") self._base_branch: str = run_cfg.get("base_branch", "main") self._owner, self._repo_name = self._parse_repo_url(repo_url) self._repo = self._g.get_repo(f"{self._owner}/{self._repo_name}") # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _parse_repo_url(self, url: str) -> tuple[str, str]: """Parse *owner* and *repo* name from an SSH or HTTPS GitHub URL.""" # git@github.com:owner/repo.git m = re.match(r"git@github\.com:([^/]+)/([^/]+?)(?:\.git)?$", url) if m: return m.group(1), m.group(2) # https://github.com/owner/repo[.git] m = re.match(r"https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?/?$", url) if m: return m.group(1), m.group(2) raise ValueError( f"Cannot parse GitHub owner/repo from URL: {url!r}. " "Expected SSH (git@github.com:owner/repo.git) or " "HTTPS (https://github.com/owner/repo.git) format." ) # ------------------------------------------------------------------ # VCSAdapter interface # ------------------------------------------------------------------ def create_branch(self, name: str) -> None: """ Create a new branch off ``self._base_branch`` on the remote. Parameters ---------- name : New branch name (e.g. "feat/webhook-ingestion"). """ base_ref = self._repo.get_git_ref(f"heads/{self._base_branch}") self._repo.create_git_ref(f"refs/heads/{name}", base_ref.object.sha) def commit( self, files: Union[dict[str, str], list[str]], message: str, branch: str | None = None, ) -> str: """ Commit files to the repository via the GitHub Contents API. Parameters ---------- files : Either a ``dict[path, content]`` mapping (preferred), or a ``list[path]`` of local file paths whose content is read from disk. message : Commit message. branch : Target branch. Defaults to ``self._base_branch``. Returns ------- SHA of the last created/updated commit, or empty string if no files were committed. """ target_branch = branch or self._base_branch # Normalise to {path: content} if isinstance(files, list): files_dict: dict[str, str] = {} for path in files: with open(path, "r", encoding="utf-8") as fh: files_dict[path] = fh.read() else: files_dict = files last_sha: str = "" for path, content in files_dict.items(): try: existing = self._repo.get_contents(path, ref=target_branch) result = self._repo.update_file( path=path, message=message, content=content, sha=existing.sha, # type: ignore[union-attr] branch=target_branch, ) except GithubException: # File does not exist yet — create it result = self._repo.create_file( path=path, message=message, content=content, branch=target_branch, ) last_sha = result["commit"].sha return last_sha def create_pr(self, title: str, body: str, head: str, base: str) -> str: """ Open a pull request on GitHub. Parameters ---------- title : PR title. body : PR description / body markdown. head : Head branch name (the branch with changes). base : Base branch name (e.g. "main"). Returns ------- HTML URL of the created pull request. """ pr = self._repo.create_pull( title=title, body=body, head=head, base=base, ) return pr.html_url def get_pr_status(self, pr_id: str) -> str: """ Fetch the current status of a pull request. Parameters ---------- pr_id : Pull request number as a string (e.g. "42"). Returns ------- One of: "open" | "merged" | "closed". """ pr = self._repo.get_pull(int(pr_id)) if pr.merged: return "merged" return pr.state # "open" or "closed"