"""
cli/agency.py

Command-line interface for the-agency pipeline.

Subcommands
-----------
  run [config]                              Start a new run, print run_id.
  watch <run_id>                            Tail live blackboard events.
  inspect <run_id> [--tier T] [--brief B]   Show run tree / artifact detail.
  approve <run_id> [--note "..."]           Approve current inspection gate.
  reject <run_id> --reason "..."            Reject current gate (re-invoke tier).
  pause <run_id>                            Force-pause at next tier boundary.
  resume <run_id>                           Release a manual pause.

Gate approval UX
----------------
`agency approve <run_id>` writes a gate_approved event directly to the
blackboard. The runner only polls the blackboard — it does not care how the
event got there. This makes approval work on any platform that has filesystem
access to the runs/ directory.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import NoReturn, Optional, Union

# ---------------------------------------------------------------------------
# Blackboard import (optional — degrade gracefully if core not on sys.path)
# ---------------------------------------------------------------------------

try:
    from core.blackboard import Blackboard
    _HAS_BLACKBOARD = True
except ImportError:
    _HAS_BLACKBOARD = False


# ---------------------------------------------------------------------------
# ANSI colours (degraded to no-op if not a TTY)
# ---------------------------------------------------------------------------

_IS_TTY = sys.stdout.isatty()


def _c(code: str, text: str) -> str:
    """Wrap *text* in the ANSI SGR sequence *code*; no-op when stdout is not a TTY."""
    if not _IS_TTY:
        return text
    return f"\033[{code}m{text}\033[0m"


def _bold(t: str) -> str:
    """Return *t* in bold."""
    return _c("1", t)


def _dim(t: str) -> str:
    """Return *t* dimmed."""
    return _c("2", t)


def _green(t: str) -> str:
    """Return *t* in green."""
    return _c("32", t)


def _yellow(t: str) -> str:
    """Return *t* in yellow."""
    return _c("33", t)


def _red(t: str) -> str:
    """Return *t* in red."""
    return _c("31", t)


def _cyan(t: str) -> str:
    """Return *t* in cyan."""
    return _c("36", t)


def _magenta(t: str) -> str:
    """Return *t* in magenta."""
    return _c("35", t)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Lower bound for the watch poll interval: time.sleep() raises ValueError on a
# negative value, and a zero interval would spin on the blackboard.
_MIN_POLL_SECONDS = 0.1


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _require_blackboard(run_id: str) -> "Blackboard":
    """Open the blackboard for *run_id*, exiting with an error if unavailable.

    Exits (status 1) when ``core.blackboard`` could not be imported or when
    ``runs/<run_id>/blackboard.db`` does not exist relative to the current
    working directory.
    """
    if not _HAS_BLACKBOARD:
        _die("Could not import core.blackboard. Make sure you are running from the project root.")
    db_path = Path("runs") / run_id / "blackboard.db"
    if not db_path.exists():
        _die(f"No blackboard found for run_id={run_id!r}. Expected: {db_path}")
    return Blackboard(run_id)


def _die(msg: str) -> NoReturn:
    """Print *msg* as an error on stderr and exit with status 1."""
    print(_red(f"Error: {msg}"), file=sys.stderr)
    sys.exit(1)


def _fmt_ts(iso: Optional[str]) -> str:
    """Format an ISO-8601 timestamp as ``HH:MM:SS``.

    Returns ``""`` for a missing timestamp. A value that cannot be parsed
    (including a non-string value) is returned stringified and truncated to
    19 characters instead of raising.
    """
    if not iso:
        return ""
    try:
        return datetime.fromisoformat(iso).strftime("%H:%M:%S")
    except (ValueError, TypeError):
        # TypeError: fromisoformat() rejects non-str input outright.
        return str(iso)[:19]


def _parse_detail(raw: Union[str, dict, None]) -> dict:
    """Normalise an event's ``detail`` field to a dict.

    * falsy input            -> ``{}``
    * an already-decoded dict -> returned as is
    * a JSON object string    -> the decoded dict
    * anything else           -> ``{"raw": <value>}`` so callers can always
      use ``.get`` on the result.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        return raw
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {"raw": raw}
    # Valid JSON that is not an object (list, number, null, ...) is wrapped.
    return parsed if isinstance(parsed, dict) else {"raw": parsed}


def _short_id(value: object) -> str:
    """Return the first 8 characters of an identifier; ``""`` when missing."""
    return str(value or "")[:8]


# ---------------------------------------------------------------------------
# Event rendering
# ---------------------------------------------------------------------------

_KIND_SYMBOLS: dict[str, str] = {
    "spawned": "→",
    "completed": "✓",
    "failed": "✗",
    "escalated": "↑",
    "retried": "↺",
    "gate_pending": "⏸",
    "gate_approved": "✓",
    "gate_rejected": "✗",
    "gate_paused": "⏸",
    "gate_resumed": "▶",
    "path_amendment": "~",
    "log": " ",
}

_KIND_COLOUR: dict[str, str] = {
    "spawned": "36",         # cyan
    "completed": "32",       # green
    "failed": "31",          # red
    "escalated": "33",       # yellow
    "retried": "33",         # yellow
    "gate_pending": "35",    # magenta
    "gate_approved": "32",   # green
    "gate_rejected": "31",   # red
    "gate_paused": "35",     # magenta
    "gate_resumed": "32",    # green
    "path_amendment": "33",  # yellow
    "log": "0",              # default
}


def _render_event(ev: dict, run_id: str) -> str:
    """Render one blackboard event as a single display line.

    The line is ``[run_id] HH:MM:SS <symbol> <kind> <message>`` where the
    message is assembled from the event's ``detail`` according to its kind.
    """
    kind = ev.get("kind", "")
    ts = _fmt_ts(ev.get("created_at"))
    detail = _parse_detail(ev.get("detail"))

    sym = _KIND_SYMBOLS.get(kind, "·")
    col = _KIND_COLOUR.get(kind, "0")
    kind_str = _c(col, f"{sym} {kind:<18}")

    # Build a short message from detail
    msg_parts: list = []
    if kind == "log":
        level = detail.get("level", "info")
        message = detail.get("message", "")
        level_col = "33" if level == "warning" else ("31" if level == "error" else "0")
        msg_parts.append(_c(level_col, message))
    elif kind in ("gate_pending", "gate_approved", "gate_rejected"):
        gate = detail.get("gate", "")
        summary = detail.get("summary", "")
        reason = detail.get("reason", "")
        if gate:
            msg_parts.append(_bold(f"[{gate}]"))
        if summary:
            msg_parts.append(summary)
        if reason:
            msg_parts.append(_dim(f"({reason})"))
    elif kind in ("spawned", "completed", "failed", "escalated", "retried"):
        tier = detail.get("tier")
        role = detail.get("role", "")
        ws = detail.get("workstream", "")
        task_id = detail.get("task_id", "")
        reason = detail.get("reason", detail.get("error", ""))
        if tier:
            msg_parts.append(_bold(f"T{tier}"))
        if role:
            msg_parts.append(role)
        if ws:
            msg_parts.append(_dim(f"ws={ws}"))
        if task_id:
            msg_parts.append(_dim(f"task={task_id}"))
        if reason:
            # str(): the reason may be a structured value, which cannot be sliced.
            msg_parts.append(_dim(f"— {str(reason)[:80]}"))
    elif kind == "path_amendment":
        proposed_by = detail.get("proposed_by", "")
        reason = detail.get("reason", "")
        msg_parts.append(f"{proposed_by}: {reason}")
    else:
        for k, v in list(detail.items())[:3]:
            msg_parts.append(f"{k}={v!r}")

    # Coerce parts: _c() passes values through untouched when not on a TTY,
    # so a non-string detail value would otherwise break join().
    msg = " ".join(str(part) for part in msg_parts)
    run_prefix = _dim(f"[{run_id}]")
    ts_str = _dim(ts)
    return f"{run_prefix} {ts_str} {kind_str} {msg}"


# ---------------------------------------------------------------------------
# Subcommand: run
# ---------------------------------------------------------------------------

def cmd_run(args: argparse.Namespace) -> None:
    """Start a new pipeline run.

    Exits with status 1 when the config file is missing, ``core.team_runner``
    cannot be imported, the run is interrupted, or the run raises.
    """
    config_path = args.config
    if not os.path.exists(config_path):
        _die(f"Config file not found: {config_path}")

    # Import here to keep startup fast for non-run commands
    try:
        from core.team_runner import TeamRunner
    except ImportError as exc:
        _die(f"Could not import core.team_runner: {exc}")

    dry = getattr(args, "dry_run", False)
    runner = TeamRunner(config_path=config_path, dry_run=dry)

    print(f"Starting run {_bold(runner.run_id)} …")
    print(_dim(f" Watch: agency watch {runner.run_id}"))
    print(_dim(f" Inspect: agency inspect {runner.run_id}"))

    try:
        runner.run()
        print(_green(f"Run {runner.run_id} complete."))
    except KeyboardInterrupt:
        print(_yellow(f"\nRun {runner.run_id} interrupted."))
        sys.exit(1)
    except Exception as exc:
        # Top-level boundary: report on stderr (like _die) and exit non-zero.
        print(_red(f"Run {runner.run_id} failed: {exc}"), file=sys.stderr)
        sys.exit(1)


# ---------------------------------------------------------------------------
# Subcommand: watch
# ---------------------------------------------------------------------------

def cmd_watch(args: argparse.Namespace) -> None:
    """Tail live blackboard events for a run until it reaches a final status.

    Polls the blackboard every ``args.poll`` seconds (clamped to at least
    ``_MIN_POLL_SECONDS``), printing each event once. Stops when the run
    status is ``done``, ``review`` or ``failed``, or on Ctrl-C.
    """
    bb = _require_blackboard(args.run_id)
    run_id = args.run_id
    poll = max(getattr(args, "poll", 2.0), _MIN_POLL_SECONDS)

    print(_bold(f"Watching run {run_id} …"), _dim("(Ctrl-C to stop)"))

    seen_ids: set[str] = set()
    try:
        while True:
            # Read the status BEFORE fetching events: anything written up to
            # the moment the run finished is then guaranteed to be in the
            # fetch below, so the final events are not lost on exit.
            run_status = bb.get_run_summary().get("status", "")

            for ev in bb.get_all_events(limit=1000):
                eid = ev.get("event_id", "")
                if eid in seen_ids:
                    continue
                seen_ids.add(eid)
                print(_render_event(ev, run_id))
            sys.stdout.flush()

            # Check if run is done
            if run_status in ("done", "review", "failed"):
                print()
                if run_status == "review":
                    print(_green(f"Run {run_id} complete — status: {run_status}"))
                elif run_status == "failed":
                    print(_red(f"Run {run_id} failed"))
                else:
                    print(_bold(f"Run {run_id} status: {run_status}"))
                break

            time.sleep(poll)
    except KeyboardInterrupt:
        print(_dim("\nStopped watching."))
    finally:
        bb.close()


# ---------------------------------------------------------------------------
# Subcommand: inspect
# ---------------------------------------------------------------------------

def cmd_inspect(args: argparse.Namespace) -> None:
    """Show run state: the full tree, one tier (--tier) or one brief (--brief)."""
    bb = _require_blackboard(args.run_id)
    run_id = args.run_id
    tier_filter: Optional[int] = getattr(args, "tier", None)
    brief_filter: Optional[str] = getattr(args, "brief", None)

    try:
        summary = bb.get_run_summary()
        if "error" in summary:
            _die(summary["error"])

        if brief_filter:
            _inspect_brief(bb, run_id, brief_filter)
            return

        # "is not None" so that an explicit --tier 0 is honoured.
        if tier_filter is not None:
            _inspect_tier(bb, run_id, tier_filter)
            return

        _inspect_run_tree(bb, run_id, summary)
    finally:
        bb.close()


def _inspect_run_tree(bb: "Blackboard", run_id: str, summary: dict) -> None:
    """Print the run header, each workstream with its briefs, and summary counts."""
    status = summary.get("status", "?")
    status_str = (
        _green(status) if status in ("done", "review")
        else _red(status) if status == "failed"
        else _yellow(status)
    )
    print(f"\nRun {_bold(run_id)} [{status_str}]")
    print(_dim(f" Goal: {summary.get('goal', '')}"))
    print()

    workstreams = bb.get_workstreams()
    if not workstreams:
        print(_dim(" No workstreams yet."))
    else:
        for ws in workstreams:
            ws_status = ws.get("status", "?")
            ws_col = "32" if ws_status == "done" else ("31" if ws_status == "failed" else "33")
            ws_line = f" ├── {ws.get('name', ws.get('workstream_id'))} [{_c(ws_col, ws_status)}]"
            print(ws_line)

            briefs = bb.get_briefs(workstream_id=ws["workstream_id"])
            for b in briefs:
                b_status = b.get("status", "?")
                b_col = "32" if b_status == "done" else ("31" if b_status == "failed" else "0")
                print(
                    f" │ ├── T{b.get('tier')} {b.get('role')} "
                    f"[{_c(b_col, b_status)}] "
                    f"retries={b.get('retry_count', 0)} "
                    f"{_dim(_short_id(b.get('brief_id')))}"
                )

    print()
    # Summary counts
    briefs_summary = summary.get("briefs", {})
    events_summary = summary.get("events", {})
    print(
        _dim(
            f" Briefs: {briefs_summary} "
            f"Events: {events_summary}"
        )
    )


def _inspect_tier(bb: "Blackboard", run_id: str, tier: int) -> None:
    """Print one line per brief belonging to *tier*."""
    briefs = bb.get_briefs(tier=tier)
    print(f"\nRun {_bold(run_id)} — T{tier} briefs ({len(briefs)})\n")
    for b in briefs:
        status = b.get("status", "?")
        col = "32" if status == "done" else ("31" if status == "failed" else "0")
        # str(... or ''): a None role does not support the "<22" format spec.
        role = str(b.get("role") or "")
        print(
            f" {_dim(_short_id(b.get('brief_id')))} "
            f"{role:<22} [{_c(col, status)}] "
            f"retries={b.get('retry_count', 0)}"
        )


def _print_json_section(title: str, raw: object) -> None:
    """Print *raw* as indented JSON under *title*; print it verbatim if not JSON."""
    try:
        decoded = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        print(raw)
        return
    print(_bold(title))
    print(json.dumps(decoded, indent=2))


def _inspect_brief(bb: "Blackboard", run_id: str, brief_id: str) -> None:
    """Print full detail for the first brief whose id starts with *brief_id*.

    Exits with status 1 when no brief matches.
    """
    briefs = bb.get_briefs()
    match = next(
        (b for b in briefs if str(b.get("brief_id") or "").startswith(brief_id)),
        None,
    )
    if not match:
        _die(f"Brief {brief_id!r} not found in run {run_id}")

    print(f"\nBrief {_bold(match['brief_id'])}")
    print(f" Tier: T{match.get('tier')}")
    print(f" Role: {match.get('role')}")
    print(f" Status: {match.get('status')}")
    print(f" Retries: {match.get('retry_count', 0)}")
    print()

    payload_raw = match.get("payload")
    if payload_raw:
        _print_json_section("Payload (brief):", payload_raw)
        print()

    result_raw = match.get("result")
    if result_raw:
        _print_json_section("Result:", result_raw)


# ---------------------------------------------------------------------------
# Subcommand: approve
# ---------------------------------------------------------------------------

def cmd_approve(args: argparse.Namespace) -> None:
    """Approve the current inspection gate, writing gate_approved to the blackboard."""
    bb = _require_blackboard(args.run_id)
    note = getattr(args, "note", None) or ""
    try:
        bb.log_event(
            "gate_approved",
            detail={"approved_by": "cli", "note": note, "timestamp": _now_iso()},
        )
        print(_green(f"Gate approved for run {args.run_id}."))
        if note:
            print(_dim(f" Note: {note}"))
    finally:
        bb.close()


# ---------------------------------------------------------------------------
# Subcommand: reject
# ---------------------------------------------------------------------------

def cmd_reject(args: argparse.Namespace) -> None:
    """Reject the current inspection gate, writing gate_rejected to the blackboard."""
    bb = _require_blackboard(args.run_id)
    reason = getattr(args, "reason", None) or "rejected via CLI"
    try:
        bb.log_event(
            "gate_rejected",
            detail={"rejected_by": "cli", "reason": reason, "timestamp": _now_iso()},
        )
        print(_yellow(f"Gate rejected for run {args.run_id}."))
        print(_dim(f" Reason: {reason}"))
    finally:
        bb.close()


# ---------------------------------------------------------------------------
# Subcommand: pause
# ---------------------------------------------------------------------------

def cmd_pause(args: argparse.Namespace) -> None:
    """Force-pause the run at the next tier boundary."""
    bb = _require_blackboard(args.run_id)
    try:
        bb.log_event(
            "gate_paused",
            detail={"paused_by": "cli", "timestamp": _now_iso()},
        )
        print(_yellow(f"Pause signal written for run {args.run_id}."))
        print(_dim(" Run will pause at the next tier boundary."))
        print(_dim(f" To resume: agency resume {args.run_id}"))
    finally:
        bb.close()


# ---------------------------------------------------------------------------
# Subcommand: resume
# ---------------------------------------------------------------------------

def cmd_resume(args: argparse.Namespace) -> None:
    """Release a manual pause."""
    bb = _require_blackboard(args.run_id)
    try:
        bb.log_event(
            "gate_resumed",
            detail={"resumed_by": "cli", "timestamp": _now_iso()},
        )
        print(_green(f"Resume signal written for run {args.run_id}."))
    finally:
        bb.close()


# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    """Build the ``agency`` argument parser with one sub-parser per subcommand.

    Each sub-parser stores its handler in ``func`` so ``main`` can dispatch
    with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(
        prog="agency",
        description="the-agency pipeline CLI",
        # Raw formatter: the epilog below is laid out by hand, one example per line.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  agency run config/team.yaml
  agency watch abc12345
  agency inspect abc12345
  agency inspect abc12345 --tier 2
  agency inspect abc12345 --brief a1b2c3d4
  agency approve abc12345
  agency approve abc12345 --note "looks good"
  agency reject abc12345 --reason "T2 missed the caching layer"
  agency pause abc12345
  agency resume abc12345
""",
    )

    sub = parser.add_subparsers(dest="command", metavar="")
    sub.required = True

    # run
    p_run = sub.add_parser("run", help="Start a new pipeline run")
    p_run.add_argument("config", nargs="?", default="config/team.yaml",
                       help="Path to team.yaml (default: config/team.yaml)")
    p_run.add_argument("--dry-run", action="store_true",
                       help="Log actions without spawning agents")
    p_run.set_defaults(func=cmd_run)

    # watch
    p_watch = sub.add_parser("watch", help="Tail live blackboard events")
    p_watch.add_argument("run_id", help="Run ID to watch")
    p_watch.add_argument("--poll", type=float, default=2.0,
                         help="Poll interval in seconds (default: 2)")
    p_watch.set_defaults(func=cmd_watch)

    # inspect
    p_inspect = sub.add_parser("inspect", help="Show run state tree")
    p_inspect.add_argument("run_id", help="Run ID to inspect")
    p_inspect.add_argument("--tier", type=int, default=None,
                           help="Filter to a specific tier (e.g. --tier 2)")
    p_inspect.add_argument("--brief", default=None,
                           help="Show full brief+result for brief_id prefix")
    p_inspect.set_defaults(func=cmd_inspect)

    # approve
    p_approve = sub.add_parser("approve", help="Approve current inspection gate")
    p_approve.add_argument("run_id", help="Run ID")
    p_approve.add_argument("--note", default="",
                           help="Optional note written to blackboard")
    p_approve.set_defaults(func=cmd_approve)

    # reject
    p_reject = sub.add_parser("reject", help="Reject current inspection gate")
    p_reject.add_argument("run_id", help="Run ID")
    p_reject.add_argument("--reason", default="rejected via CLI",
                          help="Reason for rejection (shown in blackboard + logs)")
    p_reject.set_defaults(func=cmd_reject)

    # pause
    p_pause = sub.add_parser("pause", help="Force-pause at next tier boundary")
    p_pause.add_argument("run_id", help="Run ID")
    p_pause.set_defaults(func=cmd_pause)

    # resume
    p_resume = sub.add_parser("resume", help="Release a manual pause")
    p_resume.add_argument("run_id", help="Run ID")
    p_resume.set_defaults(func=cmd_resume)

    return parser


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main(argv: Optional[list[str]] = None) -> None:
    """Parse *argv* (defaults to ``sys.argv[1:]``) and run the chosen subcommand."""
    parser = build_parser()
    args = parser.parse_args(argv)
    args.func(args)


if __name__ == "__main__":
    main()