Files
hans-tools/tools/gh-monitor/poll.py

353 lines
12 KiB
Python

#!/usr/bin/env python3
"""gh-monitor: polls GitHub PRs and fires openclaw notifications on new activity."""
import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
import yaml
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Directory containing this script; every other path is derived from it.
BASE_DIR = Path(__file__).parent
# YAML configuration read by main(); its "repos" list drives the polling.
CONFIG_PATH = BASE_DIR / "config" / "watched.yaml"
# JSON file holding per-repository state (cursor, seen IDs, error counters)
# between runs.
STATE_PATH = BASE_DIR / "state" / "last_seen.json"
# Plain-text log that log_error() appends one line to per failure.
ERRORS_LOG = BASE_DIR / "state" / "errors.log"
# Root logging is configured at import time; all messages in this module go
# through the "gh-monitor" logger below.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("gh-monitor")
# ---------------------------------------------------------------------------
# Custom exception
# ---------------------------------------------------------------------------
class GHAPIError(Exception):
    """Raised when a ``gh api`` invocation fails."""
# ---------------------------------------------------------------------------
# STEP 3 — config + state loader
# ---------------------------------------------------------------------------
def load_config(path: Path) -> dict:
    """Load the watched-repositories configuration from a YAML file.

    Args:
        path: Location of the YAML config file.

    Returns:
        The parsed top-level mapping, or an empty dict when the file
        contains no document.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the top level of the document is not a mapping.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {path}")
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    # safe_load returns None for an empty file; callers use dict methods on
    # the result, so hand back an empty mapping instead.
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise ValueError(
            f"Config must be a mapping at the top level, got {type(data).__name__}: {path}"
        )
    return data
def load_state(path: Path) -> dict:
    """Read the persisted poller state from a JSON file.

    Returns an empty dict when ``path`` does not exist; otherwise returns
    whatever the file's JSON document decodes to.
    """
    if path.exists():
        with open(path) as handle:
            return json.load(handle)
    return {}
def save_state(state: dict, path: Path) -> None:
    """Persist ``state`` as JSON at ``path``, replacing any previous file.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the target, so a crash mid-write never leaves a truncated state file.

    Args:
        state: JSON-serializable poller state.
        path: Destination file; missing parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    # Path.replace overwrites an existing target on every platform, whereas
    # Path.rename raises FileExistsError on Windows once the file exists.
    tmp.replace(path)
# ---------------------------------------------------------------------------
# STEP 4 — GitHub API client
# ---------------------------------------------------------------------------
def gh_api(endpoint: str, timeout: float | None = 300) -> list | dict:
    """Call ``gh api --paginate`` for ``endpoint`` and return the decoded JSON.

    Every failure mode is reported as ``GHAPIError`` so callers that catch it
    (and record it through the error tracker) see all of them.

    Args:
        endpoint: API path passed to ``gh api``.
        timeout: Seconds to wait for the ``gh`` process before giving up;
            ``None`` waits indefinitely.

    Returns:
        The JSON document printed by ``gh``.

    Raises:
        GHAPIError: If ``gh`` cannot be started, times out, exits with a
            non-zero status, or prints output that is not valid JSON.
    """
    try:
        result = subprocess.run(
            ["gh", "api", "--paginate", endpoint],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        # OSError covers a missing or non-executable `gh` binary.
        raise GHAPIError(f"gh api could not complete for {endpoint}: {exc}") from exc
    if result.returncode != 0:
        raise GHAPIError(f"gh api failed ({result.returncode}): {result.stderr.strip()}")
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise GHAPIError(f"gh api returned invalid JSON for {endpoint}: {exc}") from exc
def get_open_prs(owner: str, repo: str) -> list[dict]:
    """Return ``number``, ``title`` and ``html_url`` for each open PR of owner/repo."""
    summaries = []
    for raw_pr in gh_api(f"/repos/{owner}/{repo}/pulls?state=open"):
        summaries.append({
            "number": raw_pr["number"],
            "title": raw_pr["title"],
            "html_url": raw_pr["html_url"],
        })
    return summaries
# ---------------------------------------------------------------------------
# STEP 5 — event fetchers
# ---------------------------------------------------------------------------
def _normalize(event_type: str, items: list[dict]) -> list[dict]:
"""Normalize raw GitHub API items into a common event dict shape."""
events = []
for item in items:
actor = (item.get("user") or item.get("author") or {}).get("login", "unknown")
body = item.get("body") or ""
url = item.get("html_url") or ""
created_at = item.get("submitted_at") or item.get("created_at") or ""
# review state maps to a human-readable action
action = item.get("state", "").lower() if event_type == "review_submitted" else event_type
events.append({
"id": item.get("id"),
"event_type": event_type,
"action": action or event_type,
"created_at": created_at,
"actor": actor,
"body": body,
"url": url,
})
return events
def get_reviews(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the reviews of one PR as normalized ``review_submitted`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    return _normalize("review_submitted", gh_api(endpoint))
def get_review_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the review comments of one PR as normalized ``review_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/comments"
    return _normalize("review_comment", gh_api(endpoint))
def get_issue_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the issue comments of one PR as normalized ``issue_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/issues/{pr_number}/comments"
    return _normalize("issue_comment", gh_api(endpoint))
# ---------------------------------------------------------------------------
# STEP 6 — event diffing
# ---------------------------------------------------------------------------
def new_events_since(events: list[dict], cursor_ts: str, seen_ids: set) -> list[dict]:
    """Select the events at or after ``cursor_ts`` whose id is not in ``seen_ids``.

    Timestamps are compared as strings. The result is ordered by
    ``created_at`` ascending; events with equal timestamps keep their input
    order.
    """
    fresh = []
    for event in events:
        if event["created_at"] < cursor_ts:
            continue
        if event.get("id") in seen_ids:
            continue
        fresh.append(event)
    fresh.sort(key=lambda event: event["created_at"])
    return fresh
# ---------------------------------------------------------------------------
# STEP 7 — notification sender
# ---------------------------------------------------------------------------
def notify(text: str) -> None:
    """Send ``text`` as an immediate ``openclaw system event``.

    Delivery is best-effort: a missing binary, a timeout or a non-zero exit
    status is logged as a warning and never raised, because this function is
    also called from the error-tracking path.
    """
    try:
        result = subprocess.run(
            ["openclaw", "system", "event", "--text", text, "--mode", "now"],
            capture_output=True,
            text=True,
            timeout=15,  # same bound dispatch_agent uses for its openclaw call
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        log.warning("notify failed: %s", exc)
        return
    if result.returncode != 0:
        log.warning("notify failed (%d): %s", result.returncode, result.stderr.strip())
def format_notification(repo_slug: str, pr: dict, event: dict) -> str:
    """Build the human-readable notification text for one PR event.

    Args:
        repo_slug: Repository slug; accepted for interface compatibility and
            not included in the text.
        pr: Dict providing ``number`` and ``title``.
        event: Dict providing ``actor``, ``action``, ``body`` and ``url``.

    Returns:
        A three-line message: a headline, the first 200 characters of the
        body in double quotes, and the event URL.
    """
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    body_preview = (event["body"] or "")[:200]
    url = event["url"]
    # The dash separates the quoted title from the actor; without it the two
    # ran together in the headline.
    return f'[gh-monitor] PR #{number} "{title}" — {actor} {action}:\n"{body_preview}"\n{url}'
def dispatch_agent(repo_slug: str, pr: dict, event: dict) -> None:
    """
    Schedule a one-shot isolated agent run via `openclaw cron add` for one event.

    The agent is given the PR/event details plus instructions to read the PR,
    reply to the comment, and push fixes if needed. Scheduling is best-effort:
    any failure is logged as a warning and never raised.
    """
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    # Only the first 500 characters of the comment are forwarded.
    body = (event["body"] or "")[:500]
    url = event["url"]

    # NOTE(review): `body` is free text taken from the event and is embedded
    # verbatim in the agent's instructions — confirm this is acceptable.
    message = "\n".join([
        "gh-monitor detected new PR activity — act on it now.",
        "",
        f"Repo: {repo_slug}",
        f"PR #{number}: {title}",
        f"Event: {actor} {action}",
        f"Comment: {body}",
        f"URL: {url}",
        "",
        "Instructions:",
        "1. Read the full PR and the comment at the URL above.",
        "2. Reply to the comment on GitHub with a thoughtful response.",
        "3. If the comment requests a code change, implement it on the PR branch, commit, and push.",
        "4. Keep replies concise and technical.",
    ])
    job_name = f"gh-monitor-pr{number}-{event.get('id', 'evt')}"
    command = [
        "openclaw", "cron", "add",
        "--name", job_name,
        "--at", "1m",
        "--session", "isolated",
        "--message", message,
        "--delete-after-run",
        "--announce",
        "--timeout-seconds", "300",
    ]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=15)
        if completed.returncode != 0:
            log.warning("Failed to dispatch agent for PR #%d: %s", number, completed.stderr.strip())
        else:
            log.info("Dispatched agent for PR #%d %s", number, action)
    except Exception as exc:
        log.warning("Failed to dispatch agent for PR #%d: %s", number, exc)
# ---------------------------------------------------------------------------
# STEP 8 — error tracking
# ---------------------------------------------------------------------------
def log_error(repo_slug: str, error: Exception, state: dict) -> None:
    """Record a failure for ``repo_slug`` and alert once on repeated failures.

    Appends a timestamped line to the errors log, increments the repo's
    ``consecutive_errors`` counter in ``state``, and sends a single
    notification when the counter reaches 3 or more and no alert has been
    sent yet.
    """
    ERRORS_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(ERRORS_LOG, "a") as errors_file:
        stamp = datetime.now(timezone.utc).isoformat()
        errors_file.write(f"{stamp} [{repo_slug}] {error}\n")

    entry = state.setdefault(repo_slug, {})
    failures = entry.get("consecutive_errors", 0) + 1
    entry["consecutive_errors"] = failures
    # Below the threshold, or already alerted: nothing more to do.
    if failures < 3 or entry.get("error_alerted"):
        return
    notify(f"[gh-monitor] {repo_slug} has failed {failures} times in a row — check errors.log")
    entry["error_alerted"] = True
def reset_errors(repo_slug: str, state: dict) -> None:
    """Clear the failure counter and alert flag for ``repo_slug`` in ``state``.

    The repo's entry is created if it does not exist yet; other keys in the
    entry are left untouched.
    """
    state.setdefault(repo_slug, {}).update(
        consecutive_errors=0,
        error_alerted=False,
    )
# ---------------------------------------------------------------------------
# STEP 9 — main poll loop
# ---------------------------------------------------------------------------
def poll_repo(repo_cfg: dict, state: dict) -> dict:
    """Poll one watched repository and dispatch an agent for each new event.

    On the first run for a repository the cursor is set to the current time
    and nothing is fetched (no backfill). On later runs, events on open PRs
    (and closed PRs when ``pr_closed`` is configured) that are at or after the
    cursor and not yet seen are dispatched in chronological order; afterwards
    the cursor and ``seen_ids`` are advanced and the error counters reset.
    A ``GHAPIError`` at any fetch is recorded through ``log_error`` and ends
    the poll without advancing the cursor.

    Args:
        repo_cfg: Config entry with ``owner`` and ``repo``, plus optional
            ``notify_on`` (event type names) and ``ignore_actor`` (login
            whose events are skipped).
        state: Mapping of repo slug to per-repo state; updated in place.

    Returns:
        The same ``state`` object.
    """
    owner = repo_cfg["owner"]
    repo = repo_cfg["repo"]
    notify_on = set(repo_cfg.get("notify_on", []))
    repo_slug = f"{owner}/{repo}"
    repo_state = state.setdefault(repo_slug, {})
    now_ts = datetime.now(timezone.utc).isoformat()

    # First run: cursor = now (no backfill)
    if "cursor" not in repo_state:
        repo_state["cursor"] = now_ts
        log.info("[%s] First run — cursor set to now, no backfill.", repo_slug)
        return state

    cursor = repo_state["cursor"]
    seen_ids: set = set(repo_state.get("seen_ids", []))
    # Timestamp of every identified item fetched in this poll, used at the end
    # to decide which seen IDs still need to be remembered.
    fetched_ts: dict = {}

    try:
        open_prs = get_open_prs(owner, repo)
    except GHAPIError as e:
        log.error("[%s] Failed to fetch PRs: %s", repo_slug, e)
        log_error(repo_slug, e, state)
        return state

    all_new_events: list[tuple[dict, dict]] = []  # (pr, event)
    my_login = repo_cfg.get("ignore_actor", "hansheinemann")
    # The fetcher selection depends only on the config, so build it once.
    fetchers = [
        fetcher
        for event_type, fetcher in (
            ("review_submitted", get_reviews),
            ("review_comment", get_review_comments),
            ("issue_comment", get_issue_comments),
        )
        if event_type in notify_on
    ]

    for pr in open_prs:
        pr_number = pr["number"]
        for fetcher in fetchers:
            try:
                events = fetcher(owner, repo, pr_number)
            except GHAPIError as e:
                log.error("[%s] PR #%d fetch error: %s", repo_slug, pr_number, e)
                log_error(repo_slug, e, state)
                return state
            for event in events:
                event_id = event.get("id")
                if event_id is not None:
                    # IDs from different endpoints share one set; on a
                    # collision keep the later timestamp so nothing is
                    # forgotten too early.
                    fetched_ts[event_id] = max(fetched_ts.get(event_id, ""), event["created_at"])
            for event in new_events_since(events, cursor, seen_ids):
                if event.get("actor") == my_login:
                    log.debug("Skipping own event from %s", my_login)
                    if event.get("id"):
                        seen_ids.add(event["id"])
                    continue
                all_new_events.append((pr, event))

    # Also check pr_closed if configured (PRs that closed since cursor)
    if "pr_closed" in notify_on:
        try:
            closed_data = gh_api(f"/repos/{owner}/{repo}/pulls?state=closed")
        except GHAPIError as e:
            log.error("[%s] Failed to fetch closed PRs: %s", repo_slug, e)
            log_error(repo_slug, e, state)
            return state
        for pr_raw in closed_data:
            closed_at = pr_raw.get("closed_at") or ""
            pr_id = pr_raw.get("id")
            if pr_id is not None:
                fetched_ts[pr_id] = max(fetched_ts.get(pr_id, ""), closed_at)
            if closed_at < cursor or pr_id in seen_ids:
                continue
            pr = {"number": pr_raw["number"], "title": pr_raw["title"], "html_url": pr_raw["html_url"]}
            merged = pr_raw.get("merged_at") is not None
            event = {
                "id": pr_id,
                "event_type": "pr_closed",
                "action": "merged" if merged else "closed",
                "created_at": closed_at,
                "actor": (pr_raw.get("user") or {}).get("login", "unknown"),
                "body": pr_raw.get("body") or "",
                "url": pr_raw["html_url"],
            }
            all_new_events.append((pr, event))

    # Sort all new events by created_at ascending
    all_new_events.sort(key=lambda x: x[1]["created_at"])
    for pr, event in all_new_events:
        log.info("[%s] New event: PR #%d %s by %s", repo_slug, pr["number"], event["action"], event["actor"])
        dispatch_agent(repo_slug, pr, event)
        if event.get("id"):
            seen_ids.add(event["id"])

    # Update cursor and seen_ids
    if all_new_events:
        new_cursor = max(e["created_at"] for _, e in all_new_events)
    else:
        new_cursor = now_ts
    repo_state["cursor"] = new_cursor
    # Keep seen_ids bounded — only retain IDs from events at or after the
    # cursor. Anything older is already excluded by the timestamp filter on
    # the next poll, so remembering its ID adds nothing.
    repo_state["seen_ids"] = [
        event_id for event_id in seen_ids
        if fetched_ts.get(event_id, "") >= new_cursor
    ]
    reset_errors(repo_slug, state)
    return state
def main() -> None:
    """Poll every configured repository once, persist the state, and exit 0.

    State is saved even when a repository raises unexpectedly, so
    repositories that were already polled in this run are not polled from
    their old cursor (and their events dispatched again) on the next run.
    The exception itself still propagates.
    """
    # An empty config file parses to None; treat it as "no repositories".
    config = load_config(CONFIG_PATH) or {}
    state = load_state(STATE_PATH)
    try:
        # `repos:` with no value also yields None, hence the `or []`.
        for repo_cfg in config.get("repos") or []:
            state = poll_repo(repo_cfg, state)
    finally:
        save_state(state, STATE_PATH)
    sys.exit(0)
if __name__ == "__main__":
main()