358 lines
13 KiB
Python
358 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""gh-monitor: polls GitHub PRs and fires openclaw notifications on new activity."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paths
|
|
# ---------------------------------------------------------------------------
|
|
# All runtime files live next to this script: config is read-only input,
# while state/ holds the poll cursor file and the error log.
BASE_DIR = Path(__file__).parent
CONFIG_PATH = BASE_DIR.joinpath("config", "watched.yaml")
STATE_PATH = BASE_DIR.joinpath("state", "last_seen.json")
ERRORS_LOG = STATE_PATH.with_name("errors.log")

# Root logging is configured at import time; everything logs via "gh-monitor".
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("gh-monitor")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Custom exception
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class GHAPIError(Exception):
    """Raised by gh_api() when a ``gh api`` call fails."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 3 — config + state loader
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_config(path: Path) -> dict:
    """Load the watched-repos YAML config from *path*.

    Returns ``{}`` for an empty file (``yaml.safe_load`` yields ``None`` there)
    so callers can always use dict methods such as ``.get("repos")``.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {path}")
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}
|
|
|
|
|
|
def load_state(path: Path) -> dict:
    """Return the persisted poll state stored as JSON at *path*.

    A missing file means nothing has been persisted yet, so ``{}`` is returned.
    """
    if path.exists():
        with path.open() as fh:
            return json.load(fh)
    return {}
|
|
|
|
|
|
def save_state(state: dict, path: Path) -> None:
    """Atomically write *state* as indented JSON to *path*.

    The data is written to a sibling ``.tmp`` file and flushed to disk first.
    That file is then swapped over *path* with ``Path.replace``, which overwrites
    an existing target on every platform (``Path.rename`` does not on
    Windows). A crash mid-write therefore never leaves a truncated state file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
        # Make sure the bytes are on disk before the rename makes them visible.
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 4 — GitHub API client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def gh_api(endpoint: str) -> list | dict:
|
|
result = subprocess.run(
|
|
["gh", "api", "--paginate", endpoint],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if result.returncode != 0:
|
|
raise GHAPIError(f"gh api failed ({result.returncode}): {result.stderr.strip()}")
|
|
return json.loads(result.stdout)
|
|
|
|
|
|
def get_open_prs(owner: str, repo: str) -> list[dict]:
    """List the repo's open pull requests, keeping only number, title and html_url."""
    wanted = ("number", "title", "html_url")
    raw_prs = gh_api(f"/repos/{owner}/{repo}/pulls?state=open")
    return [{key: raw[key] for key in wanted} for raw in raw_prs]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 5 — event fetchers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _normalize(event_type: str, items: list[dict]) -> list[dict]:
    """Normalize raw GitHub API items into a common event dict shape.

    Each returned dict has the keys ``id``, ``event_type``, ``action``,
    ``created_at``, ``actor``, ``body`` and ``url``.
    - Missing strings become ``""``.
    - A missing actor becomes ``"unknown"``.
    - ``created_at`` prefers ``submitted_at`` (set on reviews) over ``created_at``.
    - For ``review_submitted`` items, ``action`` is the lower-cased review
      ``state`` (e.g. "approved"). It falls back to *event_type* when the state
      is absent or null.
    """
    events = []
    for item in items:
        actor = (item.get("user") or item.get("author") or {}).get("login", "unknown")
        body = item.get("body") or ""
        url = item.get("html_url") or ""
        created_at = item.get("submitted_at") or item.get("created_at") or ""
        if event_type == "review_submitted":
            # "state" can be present but null; guard before calling .lower().
            action = (item.get("state") or "").lower()
        else:
            action = event_type
        events.append({
            "id": item.get("id"),
            "event_type": event_type,
            "action": action or event_type,
            "created_at": created_at,
            "actor": actor,
            "body": body,
            "url": url,
        })
    return events
|
|
|
|
|
|
def get_reviews(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return one PR's submitted reviews as normalized ``review_submitted`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
    return _normalize("review_submitted", gh_api(endpoint))
|
|
|
|
|
|
def get_review_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return one PR's review (diff) comments as normalized ``review_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/pulls/{pr_number}/comments"
    return _normalize("review_comment", gh_api(endpoint))
|
|
|
|
|
|
def get_issue_comments(owner: str, repo: str, pr_number: int) -> list[dict]:
    """Return one PR's conversation (issue) comments as normalized ``issue_comment`` events."""
    endpoint = f"/repos/{owner}/{repo}/issues/{pr_number}/comments"
    return _normalize("issue_comment", gh_api(endpoint))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 6 — event diffing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def new_events_since(events: list[dict], cursor_ts: str, seen_ids: set) -> list[dict]:
    """Return the events not yet handled, oldest first.

    An event qualifies when its ``created_at`` is at or after *cursor_ts* and
    its ``id`` is not in *seen_ids*.

    NOTE: timestamps are compared as plain strings, so the ordering is only
    meaningful when both sides use the same ISO-8601 layout.
    """
    def is_unseen_and_recent(evt: dict) -> bool:
        return evt["created_at"] >= cursor_ts and evt.get("id") not in seen_ids

    chronological = sorted(filter(is_unseen_and_recent, events), key=lambda evt: evt["created_at"])
    return chronological
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 7 — notification sender
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def notify(text: str) -> None:
    """Send *text* as an immediate openclaw system event (best-effort).

    Never raises. A non-zero exit, a missing ``openclaw`` binary, or a hung
    CLI (15 s timeout) is logged at WARNING level. This matters because
    log_error() calls this from an error path, and an exception here would
    otherwise abort the whole poll.
    """
    try:
        result = subprocess.run(
            ["openclaw", "system", "event", "--text", text, "--mode", "now"],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        log.warning("notify failed: %s", exc)
        return
    if result.returncode != 0:
        log.warning("notify failed (%d): %s", result.returncode, result.stderr.strip())
|
|
|
|
|
|
def format_notification(repo_slug: str, pr: dict, event: dict) -> str:
    """Render a short, three-line notification text for one PR event.

    Layout: a headline naming the PR and actor/action, the first 200 characters
    of the event body in double quotes, then the event URL.
    ``repo_slug`` is accepted but does not appear in the text.
    """
    pr_no, pr_title = pr["number"], pr["title"]
    who, what = event["actor"], event["action"]
    snippet = (event["body"] or "")[:200]
    link = event["url"]
    headline = f'[gh-monitor] PR #{pr_no} "{pr_title}" — {who} {what}:'
    return f'{headline}\n"{snippet}"\n{link}'
|
|
|
|
|
|
def _agent_job_name(pr: dict, event: dict) -> str:
    """Return the openclaw cron job name for this PR event."""
    # Normalized events always carry an "id" key, but it may be None, so
    # dict.get(..., default) would never fall back; use `or` instead.
    return f"gh-monitor-pr{pr['number']}-{event.get('id') or 'evt'}"


def _agent_message(repo_slug: str, pr: dict, event: dict) -> str:
    """Build the prompt handed to the agent (event body truncated to 500 chars)."""
    number = pr["number"]
    title = pr["title"]
    actor = event["actor"]
    action = event["action"]
    body = (event["body"] or "")[:500]
    url = event["url"]
    # NOTE(security): title and body are written by arbitrary GitHub users and
    # are pasted verbatim into a prompt for an agent that is told to commit and
    # push. Anyone able to comment on the PR can try to steer it. Consider
    # limiting dispatch to trusted actors.
    return (
        f"gh-monitor detected new PR activity — act on it now.\n\n"
        f"Repo: {repo_slug}\n"
        f"PR #{number}: {title}\n"
        f"Event: {actor} {action}\n"
        f"Comment: {body}\n"
        f"URL: {url}\n\n"
        f"Instructions:\n"
        f"1. Read the full PR and the comment at the URL above.\n"
        f"2. IMMEDIATELY post a reply to the comment on GitHub acknowledging you have "
        f"read it — even if you haven't done the work yet. Something like "
        f"'Got it, looking into this now.' This must happen first, before any code changes.\n"
        f"3. If the comment requests a code change, implement it on the PR branch, "
        f"commit, and push.\n"
        f"4. Post a follow-up GitHub reply with what you did (or why no change was needed).\n"
        f"5. Run this exact command when done to notify Andrew on Signal:\n"
        f" openclaw system event --text \"[gh-monitor] PR #{number}: <one line summary of what you did>\" --mode now\n"
        f"6. Keep replies concise and technical."
    )


def dispatch_agent(repo_slug: str, pr: dict, event: dict) -> None:
    """
    Fire a one-shot isolated agentTurn via `openclaw cron add` to handle the event.

    The agent reads the PR, replies to the comment, and pushes fixes if needed.
    The job is registered with ``--at 1m`` (presumably one minute from now;
    confirm against openclaw docs), ``--delete-after-run`` and a 300 s run
    timeout. Dispatch is best-effort: any failure, including a non-zero exit,
    a missing CLI, or the 15 s registration timeout, is logged as a warning and
    not raised.
    """
    number = pr["number"]
    action = event["action"]
    message = _agent_message(repo_slug, pr, event)
    job_name = _agent_job_name(pr, event)

    try:
        result = subprocess.run(
            [
                "openclaw", "cron", "add",
                "--name", job_name,
                "--at", "1m",
                "--session", "isolated",
                "--message", message,
                "--delete-after-run",
                "--no-deliver",
                "--timeout-seconds", "300",
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
        if result.returncode == 0:
            log.info("Dispatched agent for PR #%d %s", number, action)
        else:
            log.warning("Failed to dispatch agent for PR #%d: %s", number, result.stderr.strip())
    except Exception as exc:  # deliberate best-effort boundary: never crash the poll
        log.warning("Failed to dispatch agent for PR #%d: %s", number, exc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 8 — error tracking
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def log_error(repo_slug: str, error: Exception, state: dict) -> None:
    """Record one poll failure for *repo_slug*; alert once after three in a row.

    Appends a UTC-timestamped line to ERRORS_LOG and increments the repo's
    ``consecutive_errors`` counter in *state*. When the counter reaches 3,
    notify() is called once. ``error_alerted`` is then set so no further
    alerts fire until reset_errors() clears it.
    """
    ERRORS_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(ERRORS_LOG, "a") as log_file:
        stamp = datetime.now(timezone.utc).isoformat()
        log_file.write(f"{stamp} [{repo_slug}] {error}\n")

    entry = state.setdefault(repo_slug, {})
    streak = entry.get("consecutive_errors", 0) + 1
    entry["consecutive_errors"] = streak

    # Guard clause: below the threshold, or already alerted for this streak.
    if streak < 3 or entry.get("error_alerted"):
        return
    notify(f"[gh-monitor] {repo_slug} has failed {streak} times in a row — check errors.log")
    entry["error_alerted"] = True
|
|
|
|
|
|
def reset_errors(repo_slug: str, state: dict) -> None:
    """Clear *repo_slug*'s failure streak and re-arm its error alert in *state*."""
    state.setdefault(repo_slug, {}).update(consecutive_errors=0, error_alerted=False)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# STEP 9 — main poll loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _enabled_fetchers(notify_on: set) -> list:
    """Return the per-PR event fetchers selected by *notify_on*, in a fixed order."""
    candidates = (
        ("review_submitted", get_reviews),
        ("review_comment", get_review_comments),
        ("issue_comment", get_issue_comments),
    )
    return [fetcher for event_type, fetcher in candidates if event_type in notify_on]


def _remember(observed: dict, item_id, ts: str) -> None:
    """Record that *item_id* was fetched with timestamp *ts*, keeping the latest seen."""
    if item_id:
        observed[item_id] = max(observed.get(item_id, ""), ts)


def _closed_pr_event(pr_raw: dict) -> tuple[dict, dict]:
    """Build the ``(pr, event)`` pair for a closed or merged PR from a pulls-list item."""
    closed_at = pr_raw.get("closed_at") or ""
    pr = {"number": pr_raw["number"], "title": pr_raw["title"], "html_url": pr_raw["html_url"]}
    # NOTE(review): on the pulls *list* endpoint ``user`` is believed to be the
    # PR author, not whoever closed/merged it, so the reported actor may
    # mislead. Confirm against the GitHub API.
    actor = (pr_raw.get("user") or {}).get("login", "unknown")
    action = "merged" if pr_raw.get("merged_at") is not None else "closed"
    event = {
        "id": pr_raw.get("id"),
        "event_type": "pr_closed",
        "action": action,
        "created_at": closed_at,
        "actor": actor,
        "body": pr_raw.get("body") or "",
        "url": pr_raw["html_url"],
    }
    return pr, event


def poll_repo(repo_cfg: dict, state: dict) -> dict:
    """Poll one watched repo once and dispatch an agent for each new event.

    Args:
        repo_cfg: one entry of the config's ``repos`` list.
            - ``owner`` and ``repo`` are required.
            - ``notify_on`` (event types to watch) is optional.
            - ``ignore_actor`` (login whose own activity is skipped; default
              "hansheinemann") is optional.
        state: the whole persisted state dict. This repo's entry, keyed
            ``"owner/repo"``, is created or updated in place.

    Returns:
        The same *state* object.

    On the first run for a repo only the cursor is initialised (no backfill).
    On any GitHub API failure the error goes through log_error() and the poll
    is abandoned before anything is dispatched. The cursor and seen_ids are
    left untouched, so the next run retries the same window.
    """
    owner = repo_cfg["owner"]
    repo = repo_cfg["repo"]
    notify_on = set(repo_cfg.get("notify_on", []))
    repo_slug = f"{owner}/{repo}"

    repo_state = state.setdefault(repo_slug, {})
    # Taken before any fetch, so activity that lands while polling is >= it
    # and is picked up by the next run.
    now_ts = datetime.now(timezone.utc).isoformat()

    # First run: cursor = now (no backfill)
    if "cursor" not in repo_state:
        repo_state["cursor"] = now_ts
        log.info("[%s] First run — cursor set to now, no backfill.", repo_slug)
        return state

    cursor = repo_state["cursor"]
    seen_ids: set = set(repo_state.get("seen_ids", []))
    # id -> timestamp of every item fetched this poll; used to prune seen_ids.
    observed: dict = {}

    try:
        open_prs = get_open_prs(owner, repo)
    except GHAPIError as e:
        log.error("[%s] Failed to fetch PRs: %s", repo_slug, e)
        log_error(repo_slug, e, state)
        return state

    all_new_events: list[tuple[dict, dict]] = []  # (pr, event)
    my_login = repo_cfg.get("ignore_actor", "hansheinemann")
    fetchers = _enabled_fetchers(notify_on)

    for pr in open_prs:
        pr_number = pr["number"]
        for fetcher in fetchers:
            try:
                events = fetcher(owner, repo, pr_number)
            except GHAPIError as e:
                log.error("[%s] PR #%d fetch error: %s", repo_slug, pr_number, e)
                log_error(repo_slug, e, state)
                return state
            for event in events:
                _remember(observed, event.get("id"), event.get("created_at") or "")
            for event in new_events_since(events, cursor, seen_ids):
                if event.get("actor") == my_login:
                    log.debug("Skipping own event from %s", my_login)
                    if event.get("id"):
                        seen_ids.add(event["id"])
                    continue
                all_new_events.append((pr, event))

    # Also check pr_closed if configured (PRs that closed since cursor)
    if "pr_closed" in notify_on:
        # NOTE(review): gh_api paginates, so this walks every closed PR in the
        # repo's history on each poll. per_page=100 cuts the request count; an
        # updated-sorted query with an early stop would scale better.
        try:
            closed_data = gh_api(f"/repos/{owner}/{repo}/pulls?state=closed&per_page=100")
        except GHAPIError as e:
            log.error("[%s] Failed to fetch closed PRs: %s", repo_slug, e)
            log_error(repo_slug, e, state)
            return state
        for pr_raw in closed_data:
            closed_at = pr_raw.get("closed_at") or ""
            _remember(observed, pr_raw.get("id"), closed_at)
            if closed_at >= cursor and pr_raw.get("id") not in seen_ids:
                all_new_events.append(_closed_pr_event(pr_raw))

    # Dispatch oldest first.
    all_new_events.sort(key=lambda x: x[1]["created_at"])
    for pr, event in all_new_events:
        log.info("[%s] New event: PR #%d %s by %s", repo_slug, pr["number"], event["action"], event["actor"])
        dispatch_agent(repo_slug, pr, event)
        if event.get("id"):
            seen_ids.add(event["id"])

    if all_new_events:
        new_cursor = max(e["created_at"] for _, e in all_new_events)
    else:
        new_cursor = now_ts
    repo_state["cursor"] = new_cursor

    # Bound seen_ids. The next poll only considers items whose timestamp is
    # >= new_cursor, using the same string comparison as above. Only IDs
    # fetched this poll with such a timestamp can matter again; older or
    # no-longer-returned IDs are dropped so the state file cannot grow forever.
    repo_state["seen_ids"] = [i for i in seen_ids if observed.get(i, "") >= new_cursor]

    reset_errors(repo_slug, state)
    return state
|
|
|
|
|
|
def main() -> None:
    """Poll every configured repo once, persist state, and exit.

    An unexpected exception while polling one repo is logged with its
    traceback, and the remaining repos are still polled. State is always
    saved, even on error or interrupt, so cursors already advanced for
    successfully polled repos (whose events were dispatched) are not replayed
    on the next run. Exits 0 when every repo polled without an unexpected
    exception, 1 otherwise, so a scheduler still sees failures.
    """
    config = load_config(CONFIG_PATH) or {}
    state = load_state(STATE_PATH)
    had_failure = False

    try:
        for repo_cfg in config.get("repos") or []:
            try:
                state = poll_repo(repo_cfg, state)
            except Exception:  # top-level boundary: isolate one bad repo
                had_failure = True
                log.exception("Unexpected error while polling %r", repo_cfg)
    finally:
        save_state(state, STATE_PATH)

    sys.exit(1 if had_failure else 0)


if __name__ == "__main__":
    main()
|