feat(gh-monitor): implement poll.py — steps 1-9 (#2)

* feat(gh-monitor): implement poll.py — steps 1-9 complete

* chore: nudge PR head
This commit is contained in:
2026-03-15 18:48:56 -04:00
committed by GitHub
parent 22151c0856
commit 43ab5dcc3a
5 changed files with 298 additions and 0 deletions

282
tools/gh-monitor/poll.py Normal file
View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""gh-monitor: polls GitHub PRs and fires openclaw notifications on new activity."""
import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
import yaml
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
BASE_DIR = Path(__file__).parent
CONFIG_PATH = BASE_DIR / "config" / "watched.yaml"
STATE_PATH = BASE_DIR / "state" / "last_seen.json"
ERRORS_LOG = BASE_DIR / "state" / "errors.log"
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("gh-monitor")
# ---------------------------------------------------------------------------
# Custom exception
# ---------------------------------------------------------------------------
class GHAPIError(Exception):
pass
# ---------------------------------------------------------------------------
# STEP 3 — config + state loader
# ---------------------------------------------------------------------------
def load_config(path: Path) -> dict:
if not path.exists():
raise FileNotFoundError(f"Config not found: {path}")
with open(path) as f:
return yaml.safe_load(f)
def load_state(path: Path) -> dict:
if not path.exists():
return {}
with open(path) as f:
return json.load(f)
def save_state(state: dict, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".tmp")
with open(tmp, "w") as f:
json.dump(state, f, indent=2)
tmp.rename(path)
# ---------------------------------------------------------------------------
# STEP 4 — GitHub API client
# ---------------------------------------------------------------------------
def gh_api(endpoint: str) -> list | dict:
result = subprocess.run(
["gh", "api", "--paginate", endpoint],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise GHAPIError(f"gh api failed ({result.returncode}): {result.stderr.strip()}")
return json.loads(result.stdout)
def get_open_prs(owner: str, repo: str) -> list[dict]:
data = gh_api(f"/repos/{owner}/{repo}/pulls?state=open")
return [{"number": pr["number"], "title": pr["title"], "html_url": pr["html_url"]} for pr in data]
# ---------------------------------------------------------------------------
# STEP 5 — event fetchers
# ---------------------------------------------------------------------------
def _normalize(event_type: str, items: list[dict]) -> list[dict]:
"""Normalize raw GitHub API items into a common event dict shape."""
events = []
for item in items:
actor = (item.get("user") or item.get("author") or {}).get("login", "unknown")
body = item.get("body") or ""
url = item.get("html_url") or ""
created_at = item.get("submitted_at") or item.get("created_at") or ""
# review state maps to a human-readable action
action = item.get("state", "").lower() if event_type == "review_submitted" else event_type
events.append({
"event_type": event_type,
"action": action or event_type,
"created_at": created_at,
"actor": actor,
"body": body,
"url": url,
})
return events
def get_reviews(owner: str, repo: str, pr_number: int) -> list[dict]:
items = gh_api(f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews")
return _normalize("review_submitted", items)
def get_review_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
items = gh_api(f"/repos/{owner}/{repo}/pulls/{pr_number}/comments")
return _normalize("review_comment", items)
def get_issue_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
items = gh_api(f"/repos/{owner}/{repo}/issues/{pr_number}/comments")
return _normalize("issue_comment", items)
# ---------------------------------------------------------------------------
# STEP 6 — event diffing
# ---------------------------------------------------------------------------
def new_events_since(events: list[dict], cursor_ts: str) -> list[dict]:
filtered = [e for e in events if e["created_at"] > cursor_ts]
return sorted(filtered, key=lambda e: e["created_at"])
# ---------------------------------------------------------------------------
# STEP 7 — notification sender
# ---------------------------------------------------------------------------
def notify(text: str) -> None:
result = subprocess.run(
["openclaw", "system", "event", "--text", text, "--mode", "now"],
capture_output=True,
text=True,
)
if result.returncode != 0:
log.warning("notify failed (%d): %s", result.returncode, result.stderr.strip())
def format_notification(repo_slug: str, pr: dict, event: dict) -> str:
number = pr["number"]
title = pr["title"]
actor = event["actor"]
action = event["action"]
body_preview = (event["body"] or "")[:200]
url = event["url"]
return f'[gh-monitor] PR #{number} "{title}"{actor} {action}:\n"{body_preview}"\n{url}'
# ---------------------------------------------------------------------------
# STEP 8 — error tracking
# ---------------------------------------------------------------------------
def log_error(repo_slug: str, error: Exception, state: dict) -> None:
# Append to errors.log
ERRORS_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(ERRORS_LOG, "a") as f:
ts = datetime.now(timezone.utc).isoformat()
f.write(f"{ts} [{repo_slug}] {error}\n")
repo_state = state.setdefault(repo_slug, {})
repo_state["consecutive_errors"] = repo_state.get("consecutive_errors", 0) + 1
if repo_state["consecutive_errors"] >= 3 and not repo_state.get("error_alerted"):
notify(f"[gh-monitor] {repo_slug} has failed {repo_state['consecutive_errors']} times in a row — check errors.log")
repo_state["error_alerted"] = True
def reset_errors(repo_slug: str, state: dict) -> None:
repo_state = state.setdefault(repo_slug, {})
repo_state["consecutive_errors"] = 0
repo_state["error_alerted"] = False
# ---------------------------------------------------------------------------
# STEP 9 — main poll loop
# ---------------------------------------------------------------------------
def poll_repo(repo_cfg: dict, state: dict) -> dict:
owner = repo_cfg["owner"]
repo = repo_cfg["repo"]
notify_on = set(repo_cfg.get("notify_on", []))
repo_slug = f"{owner}/{repo}"
repo_state = state.setdefault(repo_slug, {})
now_ts = datetime.now(timezone.utc).isoformat()
cursor = repo_state.get("cursor", now_ts)
# First run: cursor = now (no backfill)
if "cursor" not in repo_state:
repo_state["cursor"] = now_ts
log.info("[%s] First run — cursor set to now, no backfill.", repo_slug)
return state
try:
open_prs = get_open_prs(owner, repo)
except GHAPIError as e:
log.error("[%s] Failed to fetch PRs: %s", repo_slug, e)
log_error(repo_slug, e, state)
return state
all_new_events: list[tuple[dict, dict]] = [] # (pr, event)
for pr in open_prs:
pr_number = pr["number"]
fetchers = []
if "review_submitted" in notify_on:
fetchers.append(get_reviews)
if "review_comment" in notify_on:
fetchers.append(get_review_comments)
if "issue_comment" in notify_on:
fetchers.append(get_issue_comments)
for fetcher in fetchers:
try:
events = fetcher(owner, repo, pr_number)
new = new_events_since(events, cursor)
for event in new:
all_new_events.append((pr, event))
except GHAPIError as e:
log.error("[%s] PR #%d fetch error: %s", repo_slug, pr_number, e)
log_error(repo_slug, e, state)
return state
# Also check pr_closed if configured (PRs that closed since cursor)
if "pr_closed" in notify_on:
try:
closed_data = gh_api(f"/repos/{owner}/{repo}/pulls?state=closed")
for pr_raw in closed_data:
closed_at = pr_raw.get("closed_at") or ""
if closed_at > cursor:
pr = {"number": pr_raw["number"], "title": pr_raw["title"], "html_url": pr_raw["html_url"]}
actor = (pr_raw.get("user") or {}).get("login", "unknown")
merged = pr_raw.get("merged_at") is not None
action = "merged" if merged else "closed"
event = {
"event_type": "pr_closed",
"action": action,
"created_at": closed_at,
"actor": actor,
"body": pr_raw.get("body") or "",
"url": pr_raw["html_url"],
}
all_new_events.append((pr, event))
except GHAPIError as e:
log.error("[%s] Failed to fetch closed PRs: %s", repo_slug, e)
log_error(repo_slug, e, state)
return state
# Sort all new events by created_at ascending
all_new_events.sort(key=lambda x: x[1]["created_at"])
for pr, event in all_new_events:
text = format_notification(repo_slug, pr, event)
notify(text)
log.info("[%s] Notified: PR #%d %s by %s", repo_slug, pr["number"], event["action"], event["actor"])
# Update cursor
if all_new_events:
repo_state["cursor"] = max(e["created_at"] for _, e in all_new_events)
else:
repo_state["cursor"] = now_ts
reset_errors(repo_slug, state)
return state
def main() -> None:
config = load_config(CONFIG_PATH)
state = load_state(STATE_PATH)
for repo_cfg in config.get("repos", []):
state = poll_repo(repo_cfg, state)
save_state(state, STATE_PATH)
sys.exit(0)
if __name__ == "__main__":
main()