#!/usr/bin/env python3
"""gh-monitor: polls GitHub PRs and fires openclaw notifications on new activity."""

import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

import yaml

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

BASE_DIR = Path(__file__).parent
CONFIG_PATH = BASE_DIR / "config" / "watched.yaml"
STATE_PATH = BASE_DIR / "state" / "last_seen.json"
ERRORS_LOG = BASE_DIR / "state" / "errors.log"

# Page size requested from the GitHub REST API (100 is the API maximum);
# larger pages mean fewer HTTP round trips per `gh api --paginate` call.
_PER_PAGE = 100

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("gh-monitor")


# ---------------------------------------------------------------------------
# Custom exception
# ---------------------------------------------------------------------------


class GHAPIError(Exception):
    """A `gh api` call could not be run, exited non-zero, or printed unparseable output."""


# ---------------------------------------------------------------------------
# STEP 3 — config + state loader
# ---------------------------------------------------------------------------


def load_config(path: Path) -> dict:
    """Load the YAML watch configuration from *path*.

    An empty file yields ``{}`` instead of ``None`` (``yaml.safe_load``
    returns ``None`` for an empty document).

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {path}")
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def load_state(path: Path) -> dict:
    """Return the persisted poll state stored at *path*, or ``{}`` if none exists yet."""
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def save_state(state: dict, path: Path) -> None:
    """Write *state* to *path* as indented JSON, swapping the file in one step.

    The JSON is written to a sibling ``.tmp`` file, flushed to disk, and then
    moved over *path* with ``Path.replace``. Unlike ``Path.rename``, that also
    overwrites an existing target on Windows.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)


# ---------------------------------------------------------------------------
# STEP 4 — GitHub API client
# ---------------------------------------------------------------------------


def _decode_json_pages(text: str) -> list | dict:
    """Decode the stdout of ``gh api --paginate``.

    Depending on the gh version, ``--paginate`` may print each page as its own
    JSON document (``[...][...]``), which ``json.loads`` rejects as "Extra
    data". This accepts one or more concatenated documents:

    - a single document is returned unchanged;
    - several list pages are concatenated into one list;
    - any other mix of pages is returned as a list of the pages.

    Raises:
        json.JSONDecodeError: if *text* is empty or not valid JSON.
    """
    decoder = json.JSONDecoder()
    pages = []
    pos, end = 0, len(text)
    while True:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < end and text[pos] in " \t\r\n":
            pos += 1
        if pos == end:
            break
        page, pos = decoder.raw_decode(text, pos)
        pages.append(page)
    if not pages:
        raise json.JSONDecodeError("Expecting value", text, 0)
    if len(pages) == 1:
        return pages[0]
    if all(isinstance(page, list) for page in pages):
        return [item for page in pages for item in page]
    return pages


def gh_api(endpoint: str) -> list | dict:
    """Run ``gh api --paginate <endpoint>`` and return the decoded JSON body.

    Output is decoded as UTF-8 rather than in the locale's encoding, so
    non-ASCII PR titles and comment bodies do not depend on system settings.

    Raises:
        GHAPIError: if ``gh`` cannot be launched, exits non-zero, or prints
            something that is not JSON. All of these surface as ``GHAPIError``
            so the callers' ``except GHAPIError`` handling (error log and
            repeated-failure alert) applies to them.
    """
    try:
        result = subprocess.run(
            ["gh", "api", "--paginate", endpoint],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except OSError as e:  # e.g. `gh` is not installed / not on PATH
        raise GHAPIError(f"could not run gh: {e}") from e
    if result.returncode != 0:
        raise GHAPIError(f"gh api failed ({result.returncode}): {result.stderr.strip()}")
    try:
        return _decode_json_pages(result.stdout)
    except json.JSONDecodeError as e:
        raise GHAPIError(f"gh api returned invalid JSON for {endpoint}: {e}") from e


def get_open_prs(owner: str, repo: str) -> list[dict]:
    """Return ``number``, ``title`` and ``html_url`` for every open PR in *owner*/*repo*."""
    data = gh_api(f"/repos/{owner}/{repo}/pulls?state=open&per_page={_PER_PAGE}")
    return [{"number": pr["number"], "title": pr["title"], "html_url": pr["html_url"]} for pr in data]


# ---------------------------------------------------------------------------
# STEP 5 — event fetchers
# ---------------------------------------------------------------------------


def _normalize(event_type: str, items: list[dict]) -> list[dict]:
    """Normalize raw GitHub API items into a common event dict shape.

    Each result has the keys ``id``, ``event_type``, ``action``,
    ``created_at``, ``actor``, ``body`` and ``url``.

    - The timestamp is ``submitted_at`` when present, else ``created_at``.
    - For ``review_submitted`` the action is the lower-cased review
      ``state``; for every other type it is *event_type* itself.
    - Missing text fields become ``""``; a missing user becomes ``"unknown"``.
    """
    events = []
    for item in items:
        actor = (item.get("user") or item.get("author") or {}).get("login", "unknown")
        if event_type == "review_submitted":
            # `or ""` also covers an explicit null state, which .get(key, "") would not.
            action = (item.get("state") or "").lower()
        else:
            action = event_type
        events.append({
            "id": item.get("id"),
            "event_type": event_type,
            "action": action or event_type,
            "created_at": item.get("submitted_at") or item.get("created_at") or "",
            "actor": actor,
            "body": item.get("body") or "",
            "url": item.get("html_url") or "",
        })
    return events


def get_reviews(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return the submitted reviews of PR *pr_number* as normalized events."""
    items = gh_api(f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews?per_page={_PER_PAGE}")
    return _normalize("review_submitted", items)


def get_review_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return the inline review comments of PR *pr_number* as normalized events."""
    items = gh_api(f"/repos/{owner}/{repo}/pulls/{pr_number}/comments?per_page={_PER_PAGE}")
    return _normalize("review_comment", items)


def get_issue_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return the conversation (issue) comments of PR *pr_number* as normalized events."""
    items = gh_api(f"/repos/{owner}/{repo}/issues/{pr_number}/comments?per_page={_PER_PAGE}")
    return _normalize("issue_comment", items)


# ---------------------------------------------------------------------------
# STEP 6 — event diffing
# ---------------------------------------------------------------------------


def new_events_since(events: list[dict], cursor_ts: str, seen_ids: set) -> list[dict]:
    """Return the events at or after *cursor_ts* whose id is not in *seen_ids*, oldest first.

    Timestamps are compared as plain strings, which orders ISO-8601 UTC
    timestamps correctly as long as both sides use the same format.
    """
    filtered = [
        e for e in events
        if e["created_at"] >= cursor_ts and e.get("id") not in seen_ids
    ]
    return sorted(filtered, key=lambda e: e["created_at"])


# ---------------------------------------------------------------------------
# STEP 7 — notification sender
# ---------------------------------------------------------------------------


def notify(text: str) -> None:
    """Send *text* through ``openclaw system event --mode now``.

    A non-zero exit is logged as a warning rather than raised.
    """
    result = subprocess.run(
        ["openclaw", "system", "event", "--text", text, "--mode", "now"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        log.warning("notify failed (%d): %s", result.returncode, result.stderr.strip())


def format_notification(repo_slug: str, pr: dict, event: dict) -> str:
    """Render one event as notification text; the body preview is cut to 200 characters.

    *repo_slug* is accepted for interface symmetry but is not used in the text.
    """
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    body_preview = (event["body"] or "")[:200]
    url = event["url"]
    return f'[gh-monitor] PR #{number} "{title}" — {actor} {action}:\n"{body_preview}"\n{url}'


# ---------------------------------------------------------------------------
# STEP 8 — error tracking
# ---------------------------------------------------------------------------


def log_error(repo_slug: str, error: Exception, state: dict) -> None:
    """Record a failed poll of *repo_slug*.

    - Appends a timestamped line to ``errors.log``.
    - Increments ``consecutive_errors`` in *state*.
    - Sends a single alert once the count reaches 3; ``error_alerted``
      prevents repeats until ``reset_errors`` clears it.
    """
    ERRORS_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(ERRORS_LOG, "a", encoding="utf-8") as f:
        ts = datetime.now(timezone.utc).isoformat()
        f.write(f"{ts} [{repo_slug}] {error}\n")

    repo_state = state.setdefault(repo_slug, {})
    repo_state["consecutive_errors"] = repo_state.get("consecutive_errors", 0) + 1
    if repo_state["consecutive_errors"] >= 3 and not repo_state.get("error_alerted"):
        notify(f"[gh-monitor] {repo_slug} has failed {repo_state['consecutive_errors']} times in a row — check errors.log")
        repo_state["error_alerted"] = True


def reset_errors(repo_slug: str, state: dict) -> None:
    """Clear the consecutive-error counter and alert flag of *repo_slug* after a successful poll."""
    repo_state = state.setdefault(repo_slug, {})
    repo_state["consecutive_errors"] = 0
    repo_state["error_alerted"] = False


# ---------------------------------------------------------------------------
# STEP 9 — main poll loop
# ---------------------------------------------------------------------------
def _per_pr_fetchers(notify_on: set) -> list:
    """Return the per-PR event fetchers enabled by *notify_on*, in a fixed order."""
    enabled = []
    if "review_submitted" in notify_on:
        enabled.append(get_reviews)
    if "review_comment" in notify_on:
        enabled.append(get_review_comments)
    if "issue_comment" in notify_on:
        enabled.append(get_issue_comments)
    return enabled


def _closed_pr_event(pr_raw: dict) -> tuple[dict, dict]:
    """Build the ``(pr, event)`` pair announcing that raw PR *pr_raw* was closed or merged."""
    pr = {"number": pr_raw["number"], "title": pr_raw["title"], "html_url": pr_raw["html_url"]}
    # NOTE(review): the PR *author* is reported as the actor, not whoever
    # closed/merged it — confirm that is intended.
    actor = (pr_raw.get("user") or {}).get("login", "unknown")
    action = "merged" if pr_raw.get("merged_at") is not None else "closed"
    event = {
        "id": pr_raw.get("id"),
        "event_type": "pr_closed",
        "action": action,
        "created_at": pr_raw.get("closed_at") or "",
        "actor": actor,
        "body": pr_raw.get("body") or "",
        "url": pr_raw["html_url"],
    }
    return pr, event


def poll_repo(repo_cfg: dict, state: dict) -> dict:
    """Poll one repository and send a notification for each new event.

    An event is new when its timestamp is at or after the repo's stored
    ``cursor`` and its id is not in ``seen_ids``. The first poll of a repo
    only records the cursor (no backfill). On a ``GHAPIError`` the error is
    recorded via ``log_error`` and the repo's cursor and seen_ids are left
    untouched, so the next poll retries the same window.

    Args:
        repo_cfg: one entry of the config's ``repos`` list. Reads ``owner``,
            ``repo`` and the optional ``notify_on`` list of event types.
        state: the whole persisted state; mutated in place.

    Returns:
        The same *state* object.
    """
    owner = repo_cfg["owner"]
    repo = repo_cfg["repo"]
    notify_on = set(repo_cfg.get("notify_on") or [])
    repo_slug = f"{owner}/{repo}"
    repo_state = state.setdefault(repo_slug, {})
    # Same shape as GitHub's created_at/closed_at ("...Z"), so the string
    # comparisons used for cursors order these timestamps exactly.
    now_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # First run: cursor = now (no backfill)
    if "cursor" not in repo_state:
        repo_state["cursor"] = now_ts
        log.info("[%s] First run — cursor set to now, no backfill.", repo_slug)
        return state

    cursor = repo_state["cursor"]
    seen_ids: set = set(repo_state.get("seen_ids", []))
    # id -> timestamp of every event GitHub returned this poll (used to prune seen_ids).
    observed: dict = {}
    all_new_events: list[tuple[dict, dict]] = []  # (pr, event)

    fetchers = _per_pr_fetchers(notify_on)
    if fetchers:  # no per-PR event types enabled -> no need to list open PRs
        try:
            open_prs = get_open_prs(owner, repo)
        except GHAPIError as e:
            log.error("[%s] Failed to fetch PRs: %s", repo_slug, e)
            log_error(repo_slug, e, state)
            return state

        for pr in open_prs:
            for fetcher in fetchers:
                try:
                    events = fetcher(owner, repo, pr["number"])
                except GHAPIError as e:
                    log.error("[%s] PR #%d fetch error: %s", repo_slug, pr["number"], e)
                    log_error(repo_slug, e, state)
                    return state
                observed.update(
                    (ev["id"], ev["created_at"]) for ev in events if ev.get("id") is not None
                )
                for event in new_events_since(events, cursor, seen_ids):
                    all_new_events.append((pr, event))

    # Also check pr_closed if configured (PRs that closed since cursor)
    if "pr_closed" in notify_on:
        # NOTE(review): gh_api paginates, so this still walks every closed PR
        # in the repo on each poll; per_page=100 only reduces the request count.
        try:
            closed_data = gh_api(f"/repos/{owner}/{repo}/pulls?state=closed&per_page=100")
        except GHAPIError as e:
            log.error("[%s] Failed to fetch closed PRs: %s", repo_slug, e)
            log_error(repo_slug, e, state)
            return state
        for pr_raw in closed_data:
            closed_at = pr_raw.get("closed_at") or ""
            if pr_raw.get("id") is not None:
                observed[pr_raw["id"]] = closed_at
            if closed_at >= cursor and pr_raw.get("id") not in seen_ids:
                all_new_events.append(_closed_pr_event(pr_raw))

    # Deliver oldest first.
    all_new_events.sort(key=lambda pair: pair[1]["created_at"])
    for pr, event in all_new_events:
        notify(format_notification(repo_slug, pr, event))
        log.info("[%s] Notified: PR #%d %s by %s", repo_slug, pr["number"], event["action"], event["actor"])
        if event.get("id"):
            seen_ids.add(event["id"])

    # Advance the cursor but never move it backwards (e.g. when the local
    # clock lags GitHub's timestamps).
    if all_new_events:
        candidate = max(event["created_at"] for _, event in all_new_events)
    else:
        candidate = now_ts
    new_cursor = max(cursor, candidate)
    repo_state["cursor"] = new_cursor

    # seen_ids only has to suppress events that still pass the `>= cursor`
    # filter, i.e. ones stamped at or after the new cursor. Dropping every
    # other id keeps the state file bounded.
    repo_state["seen_ids"] = [i for i in seen_ids if observed.get(i, "") >= new_cursor]

    reset_errors(repo_slug, state)
    return state


def main() -> None:
    """Poll every configured repo once, persist the state, and exit with status 0.

    State is saved in a ``finally`` block. Progress for repos already polled
    therefore survives a crash in a later repo; otherwise their notifications
    would be sent again on the next run.
    """
    config = load_config(CONFIG_PATH) or {}
    state = load_state(STATE_PATH)
    try:
        for repo_cfg in config.get("repos") or []:
            state = poll_repo(repo_cfg, state)
    finally:
        save_state(state, STATE_PATH)
    sys.exit(0)


if __name__ == "__main__":
    main()