Files
hans-tools/tools/gh-monitor/poll.py
hansheinemann 78003c0e33 fix(gh-monitor): notify main session immediately on new PR activity (#3)
* fix(gh-monitor): notify main session immediately on new PR activity

Previously, dispatch_agent only fired an isolated agent to handle the event.
The main session (Hans) was never directly notified, so new PR comments
only surfaced when Andrew messaged directly.

Now calls notify() immediately before dispatching the agent, so the main
session gets a system event about the new activity in real-time.

* refactor(gh-monitor): route PR activity through main session instead of isolated agents

Replaced dispatch_agent() (which spawned isolated cron jobs) with
dispatch_to_main() (which sends a system event to the main session).

This way Hans handles PR comments directly in the main session —
no sync issues between isolated and main sessions, no lost context.
2026-03-16 11:22:35 -04:00

325 lines
12 KiB
Python

#!/usr/bin/env python3
"""gh-monitor: polls GitHub PRs and fires openclaw notifications on new activity."""
import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
import yaml
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Everything the monitor reads or writes lives next to this script.
BASE_DIR = Path(__file__).parent
# Watched-repository configuration (YAML).
CONFIG_PATH = BASE_DIR.joinpath("config", "watched.yaml")
# Per-repository cursor / seen-id bookkeeping (JSON).
STATE_PATH = BASE_DIR.joinpath("state", "last_seen.json")
# Append-only log of polling failures.
ERRORS_LOG = BASE_DIR.joinpath("state", "errors.log")

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("gh-monitor")
# ---------------------------------------------------------------------------
# Custom exception
# ---------------------------------------------------------------------------
class GHAPIError(Exception):
    """Signals that a GitHub API call made through the `gh` CLI failed."""
# ---------------------------------------------------------------------------
# STEP 3 — config + state loader
# ---------------------------------------------------------------------------
def load_config(path: Path) -> dict:
    """Load the monitor configuration from a YAML file.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The parsed configuration. An empty file (which YAML parses as
        ``None``) yields an empty dict so callers can always use ``.get``.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {path}")
    with open(path) as f:
        # safe_load returns None for an empty document; normalise to a dict.
        return yaml.safe_load(f) or {}
def load_state(path: Path) -> dict:
    """Read the persisted polling state.

    Args:
        path: Location of the JSON state file.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist yet.
    """
    if path.exists():
        with open(path) as handle:
            return json.loads(handle.read())
    return {}
def save_state(state: dict, path: Path) -> None:
    """Persist the polling state as JSON, replacing any previous file.

    The state is written to a sibling ``.tmp`` file first and then moved
    over ``path`` so a reader never sees a half-written file.

    Args:
        state: JSON-serialisable state mapping.
        path: Destination file; missing parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w") as f:
        json.dump(state, f, indent=2)
    # Path.replace overwrites an existing target on every platform, whereas
    # Path.rename raises on Windows when the state file already exists.
    tmp.replace(path)
# ---------------------------------------------------------------------------
# STEP 4 — GitHub API client
# ---------------------------------------------------------------------------
def gh_api(endpoint: str) -> list | dict:
    """Call a GitHub REST endpoint through ``gh api --paginate``.

    Depending on the gh version and the endpoint, ``--paginate`` may write
    one JSON document per page back to back instead of a single document.
    All documents found in the output are decoded; several list pages are
    flattened into one list.

    Args:
        endpoint: API path passed verbatim to ``gh api``.

    Returns:
        The decoded JSON. A single document is returned as-is; multiple
        list documents are concatenated; any other multi-document output is
        returned as a list of the documents.

    Raises:
        GHAPIError: If ``gh`` cannot be started, exits non-zero, or prints
            output that is empty or not valid JSON.
    """
    try:
        result = subprocess.run(
            ["gh", "api", "--paginate", endpoint],
            capture_output=True,
            text=True,
        )
    except OSError as err:
        # e.g. the gh binary is not installed or not executable
        raise GHAPIError(f"gh api could not be started: {err}") from err
    if result.returncode != 0:
        raise GHAPIError(f"gh api failed ({result.returncode}): {result.stderr.strip()}")

    text = result.stdout
    decoder = json.JSONDecoder()
    documents = []
    pos = 0
    try:
        while True:
            # raw_decode does not skip leading whitespace itself.
            while pos < len(text) and text[pos].isspace():
                pos += 1
            if pos >= len(text):
                break
            document, pos = decoder.raw_decode(text, pos)
            documents.append(document)
    except json.JSONDecodeError as err:
        raise GHAPIError(f"gh api returned invalid JSON for {endpoint}: {err}") from err

    if not documents:
        raise GHAPIError(f"gh api returned no output for {endpoint}")
    if len(documents) == 1:
        return documents[0]
    if all(isinstance(document, list) for document in documents):
        return [item for page in documents for item in page]
    return documents
def get_open_prs(owner: str, repo: str) -> list[dict]:
    """List the open pull requests of ``owner/repo``.

    Returns:
        One dict per open PR holding only its ``number``, ``title`` and
        ``html_url``.
    """
    wanted = ("number", "title", "html_url")
    pulls = gh_api(f"/repos/{owner}/{repo}/pulls?state=open")
    return [{key: pull[key] for key in wanted} for pull in pulls]
# ---------------------------------------------------------------------------
# STEP 5 — event fetchers
# ---------------------------------------------------------------------------
def _normalize(event_type: str, items: list[dict]) -> list[dict]:
    """Convert raw GitHub API items into the monitor's common event shape.

    Args:
        event_type: Label stored on every produced event.
        items: Raw API objects (reviews or comments).

    Returns:
        One dict per item with the keys ``id``, ``event_type``, ``action``,
        ``created_at``, ``actor``, ``body`` and ``url``. Missing or null
        text fields become ``""`` and a missing login becomes ``"unknown"``.
    """
    is_review = event_type == "review_submitted"

    def to_event(raw: dict) -> dict:
        who = raw.get("user") or raw.get("author") or {}
        # For reviews the lower-cased review state is the action; every
        # other event type uses its own name as the action.
        verb = raw.get("state", "").lower() if is_review else event_type
        return {
            "id": raw.get("id"),
            "event_type": event_type,
            "action": verb or event_type,
            "created_at": raw.get("submitted_at") or raw.get("created_at") or "",
            "actor": who.get("login", "unknown"),
            "body": raw.get("body") or "",
            "url": raw.get("html_url") or "",
        }

    return [to_event(raw) for raw in items]
def get_reviews(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the reviews of a pull request as ``review_submitted`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    return _normalize("review_submitted", gh_api(endpoint))
def get_review_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the review comments of a pull request as ``review_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/comments"
    return _normalize("review_comment", gh_api(endpoint))
def get_issue_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Fetch the issue-thread comments of a pull request as ``issue_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/issues/{pr_number}/comments"
    return _normalize("issue_comment", gh_api(endpoint))
# ---------------------------------------------------------------------------
# STEP 6 — event diffing
# ---------------------------------------------------------------------------
def new_events_since(events: list[dict], cursor_ts: str, seen_ids: set) -> list[dict]:
    """Select the events that have not been handled yet.

    An event qualifies when its ``created_at`` is at or after ``cursor_ts``
    (timestamps are compared as plain strings) and its ``id`` is not in
    ``seen_ids``.

    Returns:
        The qualifying events ordered by ``created_at`` ascending.
    """
    def is_fresh(event: dict) -> bool:
        return not (event["created_at"] < cursor_ts or event.get("id") in seen_ids)

    fresh = list(filter(is_fresh, events))
    fresh.sort(key=lambda event: event["created_at"])
    return fresh
# ---------------------------------------------------------------------------
# STEP 7 — notification sender
# ---------------------------------------------------------------------------
def notify(text: str) -> None:
    """Send ``text`` as an immediate openclaw system event.

    A non-zero exit status of the ``openclaw`` command is logged as a
    warning; it is not raised.
    """
    command = ["openclaw", "system", "event", "--text", text, "--mode", "now"]
    outcome = subprocess.run(command, capture_output=True, text=True)
    if outcome.returncode == 0:
        return
    log.warning("notify failed (%d): %s", outcome.returncode, outcome.stderr.strip())
def format_notification(repo_slug: str, pr: dict, event: dict) -> str:
    """Build a short, human-readable summary of one PR event.

    Args:
        repo_slug: ``owner/repo`` of the pull request (accepted but not
            included in the message).
        pr: Mapping with at least ``number`` and ``title``.
        event: Normalized event with ``actor``, ``action``, ``body`` and
            ``url``.

    Returns:
        A three-line message: header with PR, actor and action; the first
        200 characters of the body in quotes; the event URL.
    """
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    body_preview = (event["body"] or "")[:200]
    url = event["url"]
    # The separator keeps the actor from running into the quoted title.
    return f'[gh-monitor] PR #{number} "{title}" — {actor} {action}:\n"{body_preview}"\n{url}'
def dispatch_to_main(repo_slug: str, pr: dict, event: dict) -> None:
    """Forward one PR event to the main session as a system event.

    The message carries the repository, PR number and title, who did what,
    up to 500 characters of the body, the event URL and handling
    instructions, so the main session can act on it directly.
    """
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    excerpt = (event["body"] or "")[:500]
    url = event["url"]

    pieces = (
        f"[gh-monitor] New PR activity on {repo_slug}#{number} \"{title}\":\n",
        f"{actor} {action}:\n",
        f'"{excerpt}"\n',
        f"{url}\n\n",
        f"Read the full comment, address it if needed (code change + GitHub reply), ",
        f"and notify Andrew on Signal when done.",
    )
    notify("".join(pieces))
    log.info("Notified main session for PR #%d %s by %s", number, action, actor)
# ---------------------------------------------------------------------------
# STEP 8 — error tracking
# ---------------------------------------------------------------------------
def log_error(repo_slug: str, error: Exception, state: dict) -> None:
    """Record a polling failure and alert once after repeated failures.

    Appends a timestamped line to the errors log, bumps the repository's
    ``consecutive_errors`` counter in ``state`` and, once the counter
    reaches 3 and no alert is flagged yet, sends a notification and sets
    ``error_alerted``.
    """
    ERRORS_LOG.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).isoformat()
    with ERRORS_LOG.open("a") as handle:
        handle.write(f"{stamp} [{repo_slug}] {error}\n")

    entry = state.setdefault(repo_slug, {})
    failures = entry.get("consecutive_errors", 0) + 1
    entry["consecutive_errors"] = failures
    if failures < 3 or entry.get("error_alerted"):
        return
    notify(f"[gh-monitor] {repo_slug} has failed {failures} times in a row — check errors.log")
    entry["error_alerted"] = True
def reset_errors(repo_slug: str, state: dict) -> None:
    """Clear the failure counter and alert flag of ``repo_slug`` in ``state``.

    The repository entry is created if it does not exist yet.
    """
    state.setdefault(repo_slug, {}).update(
        consecutive_errors=0,
        error_alerted=False,
    )
# ---------------------------------------------------------------------------
# STEP 9 — main poll loop
# ---------------------------------------------------------------------------
def poll_repo(repo_cfg: dict, state: dict) -> dict:
    """Poll one repository and forward new PR activity to the main session.

    Args:
        repo_cfg: Repository settings. Requires ``owner`` and ``repo``.
            ``notify_on`` selects the event kinds to watch
            (``review_submitted``, ``review_comment``, ``issue_comment``,
            ``pr_closed``); ``ignore_actor`` is the login whose own events
            are skipped (defaults to ``"hansheinemann"``).
        state: Whole-monitor state, mutated in place. The entry keyed by
            ``owner/repo`` holds ``cursor``, ``seen_ids`` and the error
            counters.

    Returns:
        The same ``state`` object. On the first run for a repository only
        the cursor is initialised. If any API call fails, the error is
        recorded and the cursor is left untouched so the next poll retries.
    """
    owner = repo_cfg["owner"]
    repo = repo_cfg["repo"]
    notify_on = set(repo_cfg.get("notify_on", []))
    repo_slug = f"{owner}/{repo}"
    repo_state = state.setdefault(repo_slug, {})
    now_ts = datetime.now(timezone.utc).isoformat()

    # First run: cursor = now (no backfill)
    if "cursor" not in repo_state:
        repo_state["cursor"] = now_ts
        log.info("[%s] First run — cursor set to now, no backfill.", repo_slug)
        return state

    cursor = repo_state["cursor"]
    seen_ids: set = set(repo_state.get("seen_ids", []))
    # id -> latest timestamp under which that id was fetched during this
    # poll; used at the end to decide which seen ids are still needed.
    # NOTE(review): ids of reviews, comments and PRs share one set, as in
    # the persisted state format — confirm they cannot collide in practice.
    observed_at: dict = {}

    try:
        open_prs = get_open_prs(owner, repo)
    except GHAPIError as e:
        log.error("[%s] Failed to fetch PRs: %s", repo_slug, e)
        log_error(repo_slug, e, state)
        return state

    my_login = repo_cfg.get("ignore_actor", "hansheinemann")
    # The set of fetchers depends only on the config, so build it once.
    fetchers = [
        fetcher
        for event_type, fetcher in (
            ("review_submitted", get_reviews),
            ("review_comment", get_review_comments),
            ("issue_comment", get_issue_comments),
        )
        if event_type in notify_on
    ]

    all_new_events: list[tuple[dict, dict]] = []  # (pr, event)
    for pr in open_prs:
        pr_number = pr["number"]
        for fetcher in fetchers:
            try:
                events = fetcher(owner, repo, pr_number)
            except GHAPIError as e:
                log.error("[%s] PR #%d fetch error: %s", repo_slug, pr_number, e)
                log_error(repo_slug, e, state)
                return state
            for event in events:
                if event.get("id"):
                    observed_at[event["id"]] = max(
                        observed_at.get(event["id"], ""), event["created_at"]
                    )
            for event in new_events_since(events, cursor, seen_ids):
                if event.get("actor") == my_login:
                    log.debug("Skipping own event from %s", my_login)
                    if event.get("id"):
                        seen_ids.add(event["id"])
                    continue
                all_new_events.append((pr, event))

    # Also check pr_closed if configured (PRs that closed since cursor)
    if "pr_closed" in notify_on:
        try:
            closed_data = gh_api(f"/repos/{owner}/{repo}/pulls?state=closed")
        except GHAPIError as e:
            log.error("[%s] Failed to fetch closed PRs: %s", repo_slug, e)
            log_error(repo_slug, e, state)
            return state
        for pr_raw in closed_data:
            closed_at = pr_raw.get("closed_at") or ""
            pr_id = pr_raw.get("id")
            if pr_id:
                observed_at[pr_id] = max(observed_at.get(pr_id, ""), closed_at)
            if closed_at < cursor or pr_id in seen_ids:
                continue
            pr = {
                "number": pr_raw["number"],
                "title": pr_raw["title"],
                "html_url": pr_raw["html_url"],
            }
            actor = (pr_raw.get("user") or {}).get("login", "unknown")
            merged = pr_raw.get("merged_at") is not None
            event = {
                "id": pr_id,
                "event_type": "pr_closed",
                "action": "merged" if merged else "closed",
                "created_at": closed_at,
                "actor": actor,
                "body": pr_raw.get("body") or "",
                "url": pr_raw["html_url"],
            }
            all_new_events.append((pr, event))

    # Deliver in chronological order.
    all_new_events.sort(key=lambda x: x[1]["created_at"])
    for pr, event in all_new_events:
        log.info(
            "[%s] New event: PR #%d %s by %s",
            repo_slug, pr["number"], event["action"], event["actor"],
        )
        dispatch_to_main(repo_slug, pr, event)
        if event.get("id"):
            seen_ids.add(event["id"])

    # Advance the cursor to the newest delivered event, or to the start of
    # this poll when nothing new was found.
    if all_new_events:
        new_cursor = max(e["created_at"] for _, e in all_new_events)
    else:
        new_cursor = now_ts
    repo_state["cursor"] = new_cursor

    # Keep seen_ids bounded: an id only matters while its event can still
    # pass the ">= cursor" filter on the next poll, i.e. while the timestamp
    # it was fetched with is at or after the new cursor. Ids that were not
    # fetched in this poll are dropped.
    repo_state["seen_ids"] = [
        event_id for event_id in seen_ids
        if observed_at.get(event_id, "") >= new_cursor
    ]
    reset_errors(repo_slug, state)
    return state
def main() -> None:
    """Run one polling pass over every configured repository, then exit 0.

    State is written after each repository so that an unexpected failure
    while polling a later repository does not discard the cursors of the
    ones already processed (which would repeat their notifications on the
    next run).
    """
    # An empty config file or a null "repos" key is treated as "no repos".
    config = load_config(CONFIG_PATH) or {}
    state = load_state(STATE_PATH)
    repos = config.get("repos") or []
    for repo_cfg in repos:
        state = poll_repo(repo_cfg, state)
        save_state(state, STATE_PATH)
    if not repos:
        # Still write the state file when nothing is configured.
        save_state(state, STATE_PATH)
    sys.exit(0)
# Run a single polling pass when executed as a script (not on import).
if __name__ == "__main__":
    main()