diff --git a/.gitmodules b/.gitmodules index 17b49ae..79fa4eb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "agents"] path = agents - url = https://github.com/coding-with-hans-heinemann/agency-agents.git + url = https://git.tandrewng.com/cw-hans/agency-agents.git diff --git a/cli/__init__.py b/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cli/agency.py b/cli/agency.py new file mode 100644 index 0000000..d5bd7b4 --- /dev/null +++ b/cli/agency.py @@ -0,0 +1,576 @@ +""" +cli/agency.py +Command-line interface for the-agency pipeline. + +Subcommands +----------- +run Start a new run, print run_id. +watch Tail live blackboard events. +inspect [--tier T] [--brief B] Show run tree / artifact detail. +approve [--note "..."] Approve current inspection gate. +reject --reason "..." Reject current gate (re-invoke tier). +pause Force-pause at next tier boundary. +resume Release a manual pause. + +Gate approval UX +---------------- +`agency approve ` writes a gate_approved event directly to the +blackboard. The runner only polls the blackboard — it does not care how +the event got there. This makes approval work on any platform that has +filesystem access to the runs/ directory. 
+""" +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +# --------------------------------------------------------------------------- +# Blackboard import (optional — degrade gracefully if core not on sys.path) +# --------------------------------------------------------------------------- +try: + from core.blackboard import Blackboard + _HAS_BLACKBOARD = True +except ImportError: + _HAS_BLACKBOARD = False + + +# --------------------------------------------------------------------------- +# ANSI colours (degraded to no-op if not a TTY) +# --------------------------------------------------------------------------- +_IS_TTY = sys.stdout.isatty() + + +def _c(code: str, text: str) -> str: + if not _IS_TTY: + return text + return f"\033[{code}m{text}\033[0m" + + +def _bold(t: str) -> str: + return _c("1", t) + + +def _dim(t: str) -> str: + return _c("2", t) + + +def _green(t: str) -> str: + return _c("32", t) + + +def _yellow(t: str) -> str: + return _c("33", t) + + +def _red(t: str) -> str: + return _c("31", t) + + +def _cyan(t: str) -> str: + return _c("36", t) + + +def _magenta(t: str) -> str: + return _c("35", t) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _require_blackboard(run_id: str) -> "Blackboard": + if not _HAS_BLACKBOARD: + _die("Could not import core.blackboard. Make sure you are running from the project root.") + db_path = Path("runs") / run_id / "blackboard.db" + if not db_path.exists(): + _die(f"No blackboard found for run_id={run_id!r}. 
Expected: {db_path}") + return Blackboard(run_id) + + +def _die(msg: str) -> None: + print(_red(f"Error: {msg}"), file=sys.stderr) + sys.exit(1) + + +def _fmt_ts(iso: Optional[str]) -> str: + if not iso: + return "" + try: + dt = datetime.fromisoformat(iso) + return dt.strftime("%H:%M:%S") + except ValueError: + return iso[:19] + + +def _parse_detail(raw: Optional[str]) -> dict: + if not raw: + return {} + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + return {"raw": raw} + + +# --------------------------------------------------------------------------- +# Event rendering +# --------------------------------------------------------------------------- + +_KIND_SYMBOLS: dict[str, str] = { + "spawned": "→", + "completed": "✓", + "failed": "✗", + "escalated": "↑", + "retried": "↺", + "gate_pending": "⏸", + "gate_approved": "✓", + "gate_rejected": "✗", + "gate_paused": "⏸", + "gate_resumed": "▶", + "path_amendment": "~", + "log": " ", +} + +_KIND_COLOUR: dict[str, str] = { + "spawned": "36", # cyan + "completed": "32", # green + "failed": "31", # red + "escalated": "33", # yellow + "retried": "33", # yellow + "gate_pending": "35", # magenta + "gate_approved": "32", # green + "gate_rejected": "31", # red + "gate_paused": "35", # magenta + "gate_resumed": "32", # green + "path_amendment": "33", # yellow + "log": "0", # default +} + + +def _render_event(ev: dict, run_id: str) -> str: + kind = ev.get("kind", "") + ts = _fmt_ts(ev.get("created_at")) + detail = _parse_detail(ev.get("detail")) + sym = _KIND_SYMBOLS.get(kind, "·") + col = _KIND_COLOUR.get(kind, "0") + kind_str = _c(col, f"{sym} {kind:<18}") + + # Build a short message from detail + msg_parts: list[str] = [] + + if kind == "log": + level = detail.get("level", "info") + message = detail.get("message", "") + level_col = "33" if level == "warning" else ("31" if level == "error" else "0") + msg_parts.append(_c(level_col, message)) + elif kind in ("gate_pending", "gate_approved", 
"gate_rejected"): + gate = detail.get("gate", "") + summary = detail.get("summary", "") + reason = detail.get("reason", "") + if gate: + msg_parts.append(_bold(f"[{gate}]")) + if summary: + msg_parts.append(summary) + if reason: + msg_parts.append(_dim(f"({reason})")) + elif kind in ("spawned", "completed", "failed", "escalated", "retried"): + tier = detail.get("tier") + role = detail.get("role", "") + ws = detail.get("workstream", "") + task_id = detail.get("task_id", "") + reason = detail.get("reason", detail.get("error", "")) + if tier: + msg_parts.append(_bold(f"T{tier}")) + if role: + msg_parts.append(role) + if ws: + msg_parts.append(_dim(f"ws={ws}")) + if task_id: + msg_parts.append(_dim(f"task={task_id}")) + if reason: + msg_parts.append(_dim(f"— {reason[:80]}")) + elif kind == "path_amendment": + proposed_by = detail.get("proposed_by", "") + reason = detail.get("reason", "") + msg_parts.append(f"{proposed_by}: {reason}") + else: + for k, v in list(detail.items())[:3]: + msg_parts.append(f"{k}={v!r}") + + msg = " ".join(msg_parts) + run_prefix = _dim(f"[{run_id}]") + ts_str = _dim(ts) + return f"{run_prefix} {ts_str} {kind_str} {msg}" + + +# --------------------------------------------------------------------------- +# Subcommand: run +# --------------------------------------------------------------------------- + +def cmd_run(args: argparse.Namespace) -> None: + """Start a new pipeline run.""" + config_path = args.config + if not os.path.exists(config_path): + _die(f"Config file not found: {config_path}") + + # Import here to keep startup fast for non-run commands + try: + from core.team_runner import TeamRunner + except ImportError as exc: + _die(f"Could not import core.team_runner: {exc}") + + dry = getattr(args, "dry_run", False) + runner = TeamRunner(config_path=config_path, dry_run=dry) + print(f"Starting run {_bold(runner.run_id)} …") + print(_dim(f" Watch: agency watch {runner.run_id}")) + print(_dim(f" Inspect: agency inspect {runner.run_id}")) + + 
try: + runner.run() + print(_green(f"Run {runner.run_id} complete.")) + except KeyboardInterrupt: + print(_yellow(f"\nRun {runner.run_id} interrupted.")) + sys.exit(1) + except Exception as exc: + print(_red(f"Run {runner.run_id} failed: {exc}")) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Subcommand: watch +# --------------------------------------------------------------------------- + +def cmd_watch(args: argparse.Namespace) -> None: + """Tail live blackboard events for a run.""" + bb = _require_blackboard(args.run_id) + run_id = args.run_id + poll = getattr(args, "poll", 2.0) + + print(_bold(f"Watching run {run_id} …"), _dim("(Ctrl-C to stop)")) + + seen_ids: set[str] = set() + try: + while True: + events = bb.get_all_events(limit=1000) + for ev in events: + eid = ev.get("event_id", "") + if eid in seen_ids: + continue + seen_ids.add(eid) + print(_render_event(ev, run_id)) + sys.stdout.flush() + + # Check if run is done + summary = bb.get_run_summary() + run_status = summary.get("status", "") + if run_status in ("done", "review", "failed"): + print() + if run_status == "review": + print(_green(f"Run {run_id} complete — status: {run_status}")) + elif run_status == "failed": + print(_red(f"Run {run_id} failed")) + else: + print(_bold(f"Run {run_id} status: {run_status}")) + break + + time.sleep(poll) + except KeyboardInterrupt: + print(_dim("\nStopped watching.")) + finally: + bb.close() + + +# --------------------------------------------------------------------------- +# Subcommand: inspect +# --------------------------------------------------------------------------- + +def cmd_inspect(args: argparse.Namespace) -> None: + """Show a live tree of run state.""" + bb = _require_blackboard(args.run_id) + run_id = args.run_id + tier_filter: Optional[int] = getattr(args, "tier", None) + brief_filter: Optional[str] = getattr(args, "brief", None) + + try: + summary = bb.get_run_summary() + if "error" in summary: + 
_die(summary["error"]) + + if brief_filter: + _inspect_brief(bb, run_id, brief_filter) + return + + if tier_filter: + _inspect_tier(bb, run_id, tier_filter) + return + + _inspect_run_tree(bb, run_id, summary) + finally: + bb.close() + + +def _inspect_run_tree(bb: "Blackboard", run_id: str, summary: dict) -> None: + status = summary.get("status", "?") + status_str = ( + _green(status) if status in ("done", "review") + else _red(status) if status == "failed" + else _yellow(status) + ) + print(f"\nRun {_bold(run_id)} [{status_str}]") + print(_dim(f" Goal: {summary.get('goal', '')}")) + print() + + workstreams = bb.get_workstreams() + if not workstreams: + print(_dim(" No workstreams yet.")) + else: + for ws in workstreams: + ws_status = ws.get("status", "?") + ws_col = "32" if ws_status == "done" else ("31" if ws_status == "failed" else "33") + ws_line = f" ├── {ws.get('name', ws.get('workstream_id'))} [{_c(ws_col, ws_status)}]" + print(ws_line) + + briefs = bb.get_briefs(workstream_id=ws["workstream_id"]) + for b in briefs: + b_status = b.get("status", "?") + b_col = "32" if b_status == "done" else ("31" if b_status == "failed" else "0") + print( + f" │ ├── T{b.get('tier')} {b.get('role')} " + f"[{_c(b_col, b_status)}] " + f"retries={b.get('retry_count', 0)} " + f"{_dim(b.get('brief_id', '')[:8])}" + ) + + print() + # Summary counts + briefs_summary = summary.get("briefs", {}) + events_summary = summary.get("events", {}) + print( + _dim( + f" Briefs: {briefs_summary} " + f"Events: {events_summary}" + ) + ) + + +def _inspect_tier(bb: "Blackboard", run_id: str, tier: int) -> None: + briefs = bb.get_briefs(tier=tier) + print(f"\nRun {_bold(run_id)} — T{tier} briefs ({len(briefs)})\n") + for b in briefs: + status = b.get("status", "?") + col = "32" if status == "done" else ("31" if status == "failed" else "0") + print( + f" {_dim(b.get('brief_id', '')[:8])} " + f"{b.get('role', ''):<22} [{_c(col, status)}] " + f"retries={b.get('retry_count', 0)}" + ) + + +def 
_inspect_brief(bb: "Blackboard", run_id: str, brief_id: str) -> None: + briefs = bb.get_briefs() + match = next( + (b for b in briefs if b.get("brief_id", "").startswith(brief_id)), + None, + ) + if not match: + _die(f"Brief {brief_id!r} not found in run {run_id}") + + print(f"\nBrief {_bold(match['brief_id'])}") + print(f" Tier: T{match.get('tier')}") + print(f" Role: {match.get('role')}") + print(f" Status: {match.get('status')}") + print(f" Retries: {match.get('retry_count', 0)}") + print() + + payload_raw = match.get("payload") + if payload_raw: + try: + payload = json.loads(payload_raw) + print(_bold("Payload (brief):")) + print(json.dumps(payload, indent=2)) + except (json.JSONDecodeError, TypeError): + print(payload_raw) + print() + + result_raw = match.get("result") + if result_raw: + try: + result = json.loads(result_raw) + print(_bold("Result:")) + print(json.dumps(result, indent=2)) + except (json.JSONDecodeError, TypeError): + print(result_raw) + + +# --------------------------------------------------------------------------- +# Subcommand: approve +# --------------------------------------------------------------------------- + +def cmd_approve(args: argparse.Namespace) -> None: + """Approve the current inspection gate, writing gate_approved to the blackboard.""" + bb = _require_blackboard(args.run_id) + note = getattr(args, "note", None) or "" + try: + bb.log_event( + "gate_approved", + detail={"approved_by": "cli", "note": note, "timestamp": _now_iso()}, + ) + print(_green(f"Gate approved for run {args.run_id}.")) + if note: + print(_dim(f" Note: {note}")) + finally: + bb.close() + + +# --------------------------------------------------------------------------- +# Subcommand: reject +# --------------------------------------------------------------------------- + +def cmd_reject(args: argparse.Namespace) -> None: + """Reject the current inspection gate, writing gate_rejected to the blackboard.""" + bb = _require_blackboard(args.run_id) + reason = 
getattr(args, "reason", None) or "rejected via CLI" + try: + bb.log_event( + "gate_rejected", + detail={"rejected_by": "cli", "reason": reason, "timestamp": _now_iso()}, + ) + print(_yellow(f"Gate rejected for run {args.run_id}.")) + print(_dim(f" Reason: {reason}")) + finally: + bb.close() + + +# --------------------------------------------------------------------------- +# Subcommand: pause +# --------------------------------------------------------------------------- + +def cmd_pause(args: argparse.Namespace) -> None: + """Force-pause the run at the next tier boundary.""" + bb = _require_blackboard(args.run_id) + try: + bb.log_event( + "gate_paused", + detail={"paused_by": "cli", "timestamp": _now_iso()}, + ) + print(_yellow(f"Pause signal written for run {args.run_id}.")) + print(_dim(f" Run will pause at the next tier boundary.")) + print(_dim(f" To resume: agency resume {args.run_id}")) + finally: + bb.close() + + +# --------------------------------------------------------------------------- +# Subcommand: resume +# --------------------------------------------------------------------------- + +def cmd_resume(args: argparse.Namespace) -> None: + """Release a manual pause.""" + bb = _require_blackboard(args.run_id) + try: + bb.log_event( + "gate_resumed", + detail={"resumed_by": "cli", "timestamp": _now_iso()}, + ) + print(_green(f"Resume signal written for run {args.run_id}.")) + finally: + bb.close() + + +# --------------------------------------------------------------------------- +# Argument parser +# --------------------------------------------------------------------------- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="agency", + description="the-agency pipeline CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + agency run config/team.yaml + agency watch abc12345 + agency inspect abc12345 + agency inspect abc12345 --tier 2 + agency inspect abc12345 --brief a1b2c3d4 + 
agency approve abc12345 + agency approve abc12345 --note "looks good" + agency reject abc12345 --reason "T2 missed the caching layer" + agency pause abc12345 + agency resume abc12345 +""", + ) + sub = parser.add_subparsers(dest="command", metavar="") + sub.required = True + + # run + p_run = sub.add_parser("run", help="Start a new pipeline run") + p_run.add_argument("config", nargs="?", default="config/team.yaml", + help="Path to team.yaml (default: config/team.yaml)") + p_run.add_argument("--dry-run", action="store_true", + help="Log actions without spawning agents") + p_run.set_defaults(func=cmd_run) + + # watch + p_watch = sub.add_parser("watch", help="Tail live blackboard events") + p_watch.add_argument("run_id", help="Run ID to watch") + p_watch.add_argument("--poll", type=float, default=2.0, + help="Poll interval in seconds (default: 2)") + p_watch.set_defaults(func=cmd_watch) + + # inspect + p_inspect = sub.add_parser("inspect", help="Show run state tree") + p_inspect.add_argument("run_id", help="Run ID to inspect") + p_inspect.add_argument("--tier", type=int, default=None, + help="Filter to a specific tier (e.g. 
--tier 2)") + p_inspect.add_argument("--brief", default=None, + help="Show full brief+result for brief_id prefix") + p_inspect.set_defaults(func=cmd_inspect) + + # approve + p_approve = sub.add_parser("approve", help="Approve current inspection gate") + p_approve.add_argument("run_id", help="Run ID") + p_approve.add_argument("--note", default="", help="Optional note written to blackboard") + p_approve.set_defaults(func=cmd_approve) + + # reject + p_reject = sub.add_parser("reject", help="Reject current inspection gate") + p_reject.add_argument("run_id", help="Run ID") + p_reject.add_argument("--reason", default="rejected via CLI", + help="Reason for rejection (shown in blackboard + logs)") + p_reject.set_defaults(func=cmd_reject) + + # pause + p_pause = sub.add_parser("pause", help="Force-pause at next tier boundary") + p_pause.add_argument("run_id", help="Run ID") + p_pause.set_defaults(func=cmd_pause) + + # resume + p_resume = sub.add_parser("resume", help="Release a manual pause") + p_resume.add_argument("run_id", help="Run ID") + p_resume.set_defaults(func=cmd_resume) + + return parser + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main(argv: Optional[list[str]] = None) -> None: + parser = build_parser() + args = parser.parse_args(argv) + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/core/blackboard.py b/core/blackboard.py index 00c3762..2a21f8c 100644 --- a/core/blackboard.py +++ b/core/blackboard.py @@ -85,18 +85,37 @@ CREATE TABLE IF NOT EXISTS events ( event_id TEXT PRIMARY KEY, run_id TEXT NOT NULL, brief_id TEXT, -- NULL for run-level events - kind TEXT NOT NULL, -- spawned|completed|failed|escalated|retried + kind TEXT NOT NULL, -- see _EVENT_KINDS detail TEXT, -- JSON created_at TEXT NOT NULL, FOREIGN KEY (run_id) REFERENCES runs(run_id) ); + +CREATE TABLE IF NOT EXISTS t3_task_lists ( + 
entry_id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + workstream_id TEXT NOT NULL, + t3_agent_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'draft', -- draft|committed + tasks TEXT NOT NULL DEFAULT '[]', -- JSON array of T4 task descriptors + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY (run_id) REFERENCES runs(run_id) +); """ # Valid status values per table — used for input validation. _RUN_STATUSES = {"pending", "active", "review", "done", "failed"} _WS_STATUSES = {"pending", "active", "blocked", "done", "failed"} _BRIEF_STATUSES = {"pending", "active", "done", "failed"} -_EVENT_KINDS = {"spawned", "completed", "failed", "escalated", "retried"} +_EVENT_KINDS = { + # Lifecycle + "spawned", "completed", "failed", "escalated", "retried", + # Visibility / gates + "gate_pending", "gate_approved", "gate_rejected", "gate_paused", "gate_resumed", + # Amendments / informational + "path_amendment", "log", +} # --------------------------------------------------------------------------- @@ -360,6 +379,194 @@ class Blackboard: # Cleanup # ------------------------------------------------------------------ + # ------------------------------------------------------------------ + # Event queries + # ------------------------------------------------------------------ + + def get_events( + self, + kinds: Optional[list[str]] = None, + after_iso: Optional[str] = None, + brief_id: Optional[str] = None, + limit: int = 100, + ) -> list[dict[str, Any]]: + """ + Query events for this run. + + Parameters + ---------- + kinds : Filter by event kinds (OR). None = all kinds. + after_iso : Only return events created after this ISO-8601 timestamp. + brief_id : Filter by brief_id. None = all briefs. + limit : Maximum rows to return (most recent first). + """ + conditions = ["run_id = ?"] + params: list[Any] = [self.run_id] + + if kinds: + placeholders = ",".join("?" 
* len(kinds)) + conditions.append(f"kind IN ({placeholders})") + params.extend(kinds) + + if after_iso: + conditions.append("created_at > ?") + params.append(after_iso) + + if brief_id: + conditions.append("brief_id = ?") + params.append(brief_id) + + where = " AND ".join(conditions) + rows = self._execute( + f"SELECT * FROM events WHERE {where} ORDER BY created_at DESC LIMIT ?", + (*params, limit), + ).fetchall() + return [dict(r) for r in rows] + + def get_latest_gate_event( + self, gate_name: str, after_iso: Optional[str] = None + ) -> Optional[dict[str, Any]]: + """ + Return the most recent gate_approved or gate_rejected event for + *gate_name* written after *after_iso*. + + The event detail JSON is expected to contain a ``"gate"`` field + matching *gate_name*. Falls back to returning any gate resolution + event if none carry an explicit gate field (for CLI-written events + that omit it). + """ + events = self.get_events( + kinds=["gate_approved", "gate_rejected"], + after_iso=after_iso, + limit=20, + ) + # Prefer events whose detail.gate matches + for ev in events: + try: + detail = json.loads(ev.get("detail") or "{}") + if detail.get("gate") == gate_name or not detail.get("gate"): + return ev + except (json.JSONDecodeError, TypeError): + return ev + return None + + def get_all_events(self, limit: int = 500) -> list[dict[str, Any]]: + """Return all events for this run, oldest first.""" + rows = self._execute( + "SELECT * FROM events WHERE run_id=? ORDER BY created_at ASC LIMIT ?", + (self.run_id, limit), + ).fetchall() + return [dict(r) for r in rows] + + # ------------------------------------------------------------------ + # T3 task lists + # ------------------------------------------------------------------ + + def create_t3_draft( + self, + *, + workstream_id: str, + t3_agent_id: str, + ) -> str: + """Insert a draft t3_task_list entry. 
Returns entry_id.""" + entry_id = _new_uuid() + now = _now_iso() + self._execute( + "INSERT OR IGNORE INTO t3_task_lists " + "(entry_id, run_id, workstream_id, t3_agent_id, status, tasks, created_at, updated_at) " + "VALUES (?, ?, ?, ?, 'draft', '[]', ?, ?)", + (entry_id, self.run_id, workstream_id, t3_agent_id, now, now), + commit=True, + ) + return entry_id + + def commit_t3_task_list( + self, + *, + workstream_id: str, + t3_agent_id: str, + tasks: list[Any], + ) -> None: + """Update a t3_task_list entry to committed with the final task list.""" + now = _now_iso() + tasks_json = json.dumps(tasks) + self._execute( + "UPDATE t3_task_lists SET status='committed', tasks=?, updated_at=? " + "WHERE run_id=? AND workstream_id=? AND t3_agent_id=?", + (tasks_json, now, self.run_id, workstream_id, t3_agent_id), + commit=True, + ) + + def get_t3_task_lists(self, workstream_id: str) -> list[dict[str, Any]]: + """Return all t3_task_list entries for a workstream.""" + rows = self._execute( + "SELECT * FROM t3_task_lists WHERE run_id=? AND workstream_id=? ORDER BY created_at ASC", + (self.run_id, workstream_id), + ).fetchall() + result = [] + for r in rows: + d = dict(r) + try: + d["tasks"] = json.loads(d.get("tasks") or "[]") + except (json.JSONDecodeError, TypeError): + d["tasks"] = [] + result.append(d) + return result + + def all_t3_committed(self, workstream_id: str) -> bool: + """Return True if all t3_task_list entries for the workstream are committed.""" + rows = self._execute( + "SELECT status FROM t3_task_lists WHERE run_id=? 
AND workstream_id=?", + (self.run_id, workstream_id), + ).fetchall() + if not rows: + return False + return all(r["status"] == "committed" for r in rows) + + # ------------------------------------------------------------------ + # Briefs query + # ------------------------------------------------------------------ + + def get_briefs( + self, + *, + status: Optional[str] = None, + tier: Optional[int] = None, + workstream_id: Optional[str] = None, + ) -> list[dict[str, Any]]: + """Query briefs with optional filters.""" + conditions = ["run_id = ?"] + params: list[Any] = [self.run_id] + + if status: + conditions.append("status = ?") + params.append(status) + if tier is not None: + conditions.append("tier = ?") + params.append(tier) + if workstream_id: + conditions.append("workstream_id = ?") + params.append(workstream_id) + + where = " AND ".join(conditions) + rows = self._execute( + f"SELECT * FROM briefs WHERE {where} ORDER BY created_at ASC", + tuple(params), + ).fetchall() + return [dict(r) for r in rows] + + def get_workstreams(self) -> list[dict[str, Any]]: + """Return all workstreams for this run.""" + rows = self._execute( + "SELECT * FROM workstreams WHERE run_id=? ORDER BY created_at ASC", + (self.run_id,), + ).fetchall() + return [dict(r) for r in rows] + + # ------------------------------------------------------------------ + # Cleanup + # ------------------------------------------------------------------ + def close(self) -> None: """Close the database connection gracefully.""" with self._lock: diff --git a/core/team_runner.py b/core/team_runner.py index d0c260e..4caf043 100644 --- a/core/team_runner.py +++ b/core/team_runner.py @@ -1,99 +1,1625 @@ """ core/team_runner.py -Top-level orchestration entry point — Phase 2 stub. +Full run lifecycle orchestrator for the-agency tiered agent pipeline. -The TeamRunner is responsible for: - 1. Loading config/team.yaml and config/role_registry.yaml. - 2. 
Instantiating the correct adapter implementations (LLM, VCS, notify, runtime). - 3. Creating a Blackboard for the run. - 4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary. - 5. Recursively spawning T2→T5 briefs based on tier outputs. - 6. Using EscalationHandler to manage retries, salvage, and escalation. - 7. Writing final run status and summary to the Blackboard. - -TODO (Phase 2): - - Load and validate team.yaml configuration. - - Build adapter registry (map adapter keys → concrete adapter classes). - - Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5. - - Parse tier JSON outputs into child TaskBrief objects via make_child_brief(). - - Integrate EscalationHandler into the dispatch loop. - - Support --dry-run flag (log actions without executing). - - Emit blackboard events at each stage (spawned, completed, failed, etc.). - - Expose a CLI entry point (argparse or click). +Responsibilities +---------------- +1. Load config/team.yaml and config/role_registry.yaml. +2. Instantiate adapters (LLM, VCS, notify, runtime) from config. +3. Create a Blackboard for the run. +4. Build the root T1 brief and kick off the spawn loop. +5. Spawn loop: detect pending briefs, enforce gates, call runtime_adapter.spawn(). +6. Gate logic: write gate_pending → notify → poll for gate_approved/gate_rejected. +7. Path amendment monitor: detect path_amendment events, relay to higher tier. +8. T3 mesh timeout → T2 escalation. +9. T1 failure + terminal human escalation (runner-owned; tier failures owned by escalation.py). +10. Write final run status and create PR. """ from __future__ import annotations -# TODO (Phase 2): Uncomment and implement imports as adapters are built. 
-# import argparse -# import yaml -# from core.task_brief import TaskBrief -# from core.blackboard import Blackboard -# from core.escalation import EscalationHandler -# from adapters.llm.anthropic import AnthropicAdapter -# from adapters.vcs.github import GitHubAdapter -# from adapters.notify.openclaw import OpenClawNotifyAdapter -# from adapters.runtime.openclaw import OpenClawRuntimeAdapter -# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter +import asyncio +import importlib +import json +import logging +import os +import time +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Optional +import yaml + +from core.blackboard import Blackboard +from core.escalation import EscalationHandler +from core.task_brief import TaskBrief + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Adapter registry — maps config key → dotted class path +# --------------------------------------------------------------------------- + +_LLM_ADAPTERS: dict[str, str] = { + "anthropic": "adapters.llm.anthropic.AnthropicAdapter", +} +_VCS_ADAPTERS: dict[str, str] = { + "github": "adapters.vcs.github.GitHubAdapter", +} +_NOTIFY_ADAPTERS: dict[str, str] = { + "openclaw": "adapters.notify.openclaw.OpenClawNotifyAdapter", +} +_RUNTIME_ADAPTERS: dict[str, str] = { + "openclaw": "adapters.runtime.openclaw.OpenClawRuntimeAdapter", + "claude_code": "adapters.runtime.claude_code.ClaudeCodeRuntimeAdapter", +} + +# Poll intervals (seconds) +_GATE_POLL_INTERVAL_S = 5 +_PATH_AMENDMENT_POLL_INTERVAL_S = 10 + +# Tier → capability level +_TIER_CAPABILITY: dict[int, str] = { + 1: "reasoning-heavy", + 2: "reasoning-heavy", + 3: "capable", + 4: "fast-cheap", + 5: "capable", +} + +# Tier → default agent timeout (seconds) +_TIER_TIMEOUT_S: dict[int, int] = { + 1: 300, + 2: 600, + 3: 300, + 4: 600, + 5: 300, +} + + +# 
--------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + +class GateRejectedError(Exception): + """Raised when a human rejects an inspection gate.""" + + def __init__(self, gate: str, reason: str = "") -> None: + self.gate = gate + self.reason = reason + super().__init__(f"Gate {gate!r} rejected: {reason}") + + +class GateTimeoutError(Exception): + """Raised when an inspection gate auto-rejects due to timeout.""" + + def __init__(self, gate: str, timeout_minutes: int) -> None: + self.gate = gate + self.timeout_minutes = timeout_minutes + super().__init__(f"Gate {gate!r} timed out after {timeout_minutes}m") + + +class TerminalEscalationError(Exception): + """Raised when the run hits an unrecoverable failure that needs human intervention.""" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _load_adapter(registry: dict[str, str], key: str, config: dict) -> Any: + """Import and instantiate an adapter by registry key.""" + if key not in registry: + raise ValueError( + f"Unknown adapter key {key!r}. 
Known: {sorted(registry.keys())}" + ) + module_path, class_name = registry[key].rsplit(".", 1) + module = importlib.import_module(module_path) + cls = getattr(module, class_name) + return cls(config) + + +def _strip_json_fences(text: str) -> str: + """Remove ``` / ```json fences from an LLM response.""" + text = text.strip() + if text.startswith("```"): + lines = text.splitlines() + # Drop first line (``` or ```json) and last line if it's ``` + start = 1 + end = len(lines) - 1 if lines[-1].strip() == "```" else len(lines) + text = "\n".join(lines[start:end]) + return text.strip() + + +def _extract_json(raw: str) -> dict: + """ + Try to parse a JSON dict from raw agent output. + Handles markdown fences and leading/trailing prose. + """ + cleaned = _strip_json_fences(raw) + # Direct parse + try: + result = json.loads(cleaned) + if isinstance(result, dict): + return result + except json.JSONDecodeError: + pass + # Find first { … } block + start = cleaned.find("{") + end = cleaned.rfind("}") + 1 + if start >= 0 and end > start: + try: + result = json.loads(cleaned[start:end]) + if isinstance(result, dict): + return result + except json.JSONDecodeError: + pass + raise ValueError(f"No JSON object found in output: {raw[:300]!r}") + + +# --------------------------------------------------------------------------- +# TeamRunner +# --------------------------------------------------------------------------- class TeamRunner: """ Orchestrates a full T1→T5 agent pipeline run. - Usage (Phase 2):: + Usage:: runner = TeamRunner(config_path="config/team.yaml") runner.run() """ - def __init__(self, config_path: str = "config/team.yaml") -> None: - # TODO (Phase 2): Load YAML config. - # Instantiate adapters based on config.adapters keys. - # Create a Blackboard for this run. 
class TeamRunner:
    """
    Orchestrates a full T1→T5 agent pipeline run.

    Usage::

        runner = TeamRunner(config_path="config/team.yaml")
        runner.run()
    """

    def __init__(
        self,
        config_path: str = "config/team.yaml",
        role_registry_path: str = "config/role_registry.yaml",
        dry_run: bool = False,
        run_id: Optional[str] = None,
    ) -> None:
        """Load configuration, build adapters and open the run's blackboard.

        Args:
            config_path: YAML file with the team/run configuration.
            role_registry_path: YAML file with the role registry.
            dry_run: Stored on the instance as ``self.dry_run``.
            run_id: Explicit run identifier; a random 8-character id is
                generated when omitted.

        Raises:
            KeyError: If the configuration has no ``run.goal`` entry.
            ValueError: If an adapter key is unknown (from ``_load_adapter``).
        """
        self.dry_run = dry_run

        # ------------------------------------------------------------------
        # Load configs
        # ------------------------------------------------------------------
        # Explicit UTF-8 keeps parsing independent of the platform locale.
        # An empty YAML document loads as None, so fall back to {}.
        with open(config_path, encoding="utf-8") as fh:
            self.config: dict = yaml.safe_load(fh) or {}
        with open(role_registry_path, encoding="utf-8") as fh:
            self.role_registry: dict = yaml.safe_load(fh) or {}

        # Fail before any adapter is constructed: the goal is mandatory and
        # was previously only discovered missing after all adapters existed.
        run_cfg = self.config.get("run") or {}
        if "goal" not in run_cfg:
            raise KeyError(f"{config_path}: required setting 'run.goal' is missing")
        goal = run_cfg["goal"]

        # ------------------------------------------------------------------
        # Run identity
        # ------------------------------------------------------------------
        self.run_id: str = run_id or str(uuid.uuid4())[:8]

        # ------------------------------------------------------------------
        # Instantiate adapters
        # ------------------------------------------------------------------
        adapter_cfg = self.config.get("adapters", {})

        self.llm = _load_adapter(
            _LLM_ADAPTERS, adapter_cfg.get("llm", "anthropic"), self.config
        )
        self.vcs = _load_adapter(
            _VCS_ADAPTERS, adapter_cfg.get("vcs", "github"), self.config
        )
        self.notify = _load_adapter(
            _NOTIFY_ADAPTERS, adapter_cfg.get("notify", "openclaw"), self.config
        )

        runtime_cfg = self.config.get("runtime", {})
        default_runtime_key = runtime_cfg.get("default", "openclaw")
        self._default_runtime = _load_adapter(
            _RUNTIME_ADAPTERS, default_runtime_key, self.config
        )

        coding_agent_key = runtime_cfg.get("coding_agent", "claude_code")
        try:
            self._coding_runtime = _load_adapter(
                _RUNTIME_ADAPTERS, coding_agent_key, self.config
            )
        except Exception as exc:
            # Deliberate best-effort: the coding runtime is optional, so any
            # construction failure degrades to the default runtime.
            logger.warning(
                "Coding agent runtime (%s) unavailable — falling back to default: %s",
                coding_agent_key,
                exc,
            )
            self._coding_runtime = self._default_runtime

        self._tier_runtime_map: dict[str, str] = runtime_cfg.get("tier_runtime_map", {})

        # ------------------------------------------------------------------
        # Visibility / gate config
        # ------------------------------------------------------------------
        vis = self.config.get("visibility", {})
        strict = vis.get("strict_mode", False)
        raw_gates = vis.get("inspection_gates", {})

        if strict:
            # Strict mode switches every gate on, ignoring per-gate settings.
            self._gates: dict[str, bool] = {
                g: True
                for g in ("t1_plan", "t2_lead", "t2_synthesis", "t3_plan", "t5_verdict")
            }
        else:
            self._gates = {
                "t1_plan": True,  # always required by design
                "t2_lead": raw_gates.get("t2_lead", False),
                "t2_synthesis": raw_gates.get("t2_synthesis", True),
                "t3_plan": raw_gates.get("t3_plan", False),
                "t5_verdict": raw_gates.get("t5_verdict", False),
            }

        self._gate_timeout_minutes: int = vis.get("gate_timeout_minutes", 60)
        self._t3_mesh_timeout_minutes: int = self.config.get(
            "t3_mesh_timeout_minutes", 10
        )

        # ------------------------------------------------------------------
        # Retry defaults
        # ------------------------------------------------------------------
        retry = self.config.get("retry_defaults", {})
        self._retry_bad_output: int = retry.get("bad_output", 3)
        self._retry_partial: int = retry.get("partial", 2)
        self._retry_blocked: int = retry.get("blocked", 0)

        # ------------------------------------------------------------------
        # Blackboard
        # ------------------------------------------------------------------
        self.bb = Blackboard(self.run_id)
        self.bb.create_run(goal=goal)

        # ------------------------------------------------------------------
        # Escalation handler
        # ------------------------------------------------------------------
        self._escalation = EscalationHandler()

        self._log(
            f"TeamRunner initialized — run_id={self.run_id!r} "
            f"goal={goal!r}"
        )
def run(self) -> None:
    """Blocking entry point: drive the async pipeline to completion."""
    pipeline = self._run_async()
    asyncio.run(pipeline)
async def _run_async(self) -> None:
    """Run the whole pipeline: T1 plan, workstream execution, T1 accept.

    The run status is written to the blackboard at each stage and the
    blackboard is always closed on exit.

    Raises:
        GateRejectedError, GateTimeoutError: Re-raised after the run is
            marked ``review`` (halted awaiting a human).
        TerminalEscalationError: Re-raised after the run is marked ``failed``.
        Exception: Any other failure, re-raised after the run is marked
            ``failed``.
    """
    goal = self.config["run"]["goal"]
    base_branch = self.config["run"].get("base_branch", "main")
    branch = f"integration/{self.run_id}"

    def _record_failure(label: str, exc: BaseException) -> None:
        # Failure bookkeeping is best-effort. If the blackboard or notifier
        # is itself broken, raising from here would replace the pipeline
        # error the caller needs to see.
        try:
            self.bb.update_run_status("failed")
            self.bb.log_event(
                "failed", detail={"error": str(exc), "terminal": True}
            )
        except Exception:
            logger.exception("Could not record failure of run %s", self.run_id)
        try:
            self.notify.send(
                f"[agency] Run {self.run_id} — {label}: {exc}",
                context={"run_id": self.run_id, "goal": goal, "error": str(exc)},
            )
        except Exception:
            logger.exception(
                "Could not send failure notification for run %s", self.run_id
            )

    self.bb.update_run_status("active")
    self._log(f"Run active — goal={goal!r} branch={branch!r}")

    try:
        # Create integration branch (a failure here is only a warning).
        if not self.dry_run:
            try:
                self.vcs.create_branch(branch)
                self._log(f"Branch {branch!r} created")
            except Exception as exc:
                self._log(
                    f"Warning: could not create branch {branch!r}: {exc}",
                    level="warning",
                )

        # T1 Plan phase
        workplan = await self._run_t1_plan(goal)

        # T2→T5 execution per workplan
        ws_results = await self._execute_workplan(workplan)

        # T1 Accept phase
        pr_url = await self._run_t1_accept(workplan, ws_results, branch, base_branch)

        self.bb.update_run_status("review")
        self._log(f"Run complete — PR: {pr_url or '(none)'}")

    except (GateRejectedError, GateTimeoutError) as exc:
        self._log(f"Run paused at gate: {exc}", level="warning")
        self.bb.update_run_status("review")  # Halted awaiting human
        raise

    except TerminalEscalationError as exc:
        self._log(f"Terminal escalation: {exc}", level="error")
        _record_failure("TERMINAL FAILURE", exc)
        raise

    except Exception as exc:
        logger.exception("Unexpected failure in run %s", self.run_id)
        _record_failure("FAILED", exc)
        raise

    finally:
        self.bb.close()
async def _run_t1_plan(self, goal: str) -> dict:
    """
    T1 Phase 1 (Plan).

    Builds the planning brief for the T1 Visionary, dispatches it, parses
    the returned workplan JSON, records it on the blackboard and then
    stops at the mandatory ``t1_plan`` gate before returning the workplan.
    """
    persona = self._resolve_persona("t1", "default")
    system_prompt = self._load_prompt("t1_visionary")

    planning_task = (
        f"{system_prompt}\n\n"
        f"---\n\n"
        f"GOAL: {goal}\n\n"
        f"PHASE 1 — PLAN:\n"
        f"1. Assess complexity (high | medium | low).\n"
        f"2. Identify workstreams and declare tier_path per workstream "
        f"(e.g. [\"t2\",\"t3\",\"t4\",\"t5\"] or [\"t3\",\"t4\",\"t5\"] or [\"t4\",\"t5\"]).\n"
        f"3. Declare parallelism groups (which workstreams can run concurrently).\n"
        f"4. Set retry_budget_multiplier (1 = simple, 2 = complex).\n"
        f"5. Self-critique in one pass — amend the plan.\n\n"
        f"Return ONLY a JSON object matching this schema exactly "
        f"(no markdown fences, no commentary):\n"
        f'{{\n'
        f' "run_id": "{self.run_id}",\n'
        f' "goal_anchor": "",\n'
        f' "complexity": "high|medium|low",\n'
        f' "retry_budget_multiplier": 1,\n'
        f' "workstreams": [\n'
        f' {{\n'
        f' "id": "ws-",\n'
        f' "name": "",\n'
        f' "domain": "",\n'
        f' "tier_path": ["t2","t3","t4","t5"],\n'
        f' "parallel_group": "A",\n'
        f' "t2_specialist": "",\n'
        f' "notes": ""\n'
        f' }}\n'
        f' ],\n'
        f' "parallelism": {{\n'
        f' "groups": {{"A": ["ws-..."]}},\n'
        f' "sequence": ["A"]\n'
        f' }},\n'
        f' "self_critique_summary": ""\n'
        f'}}'
    )

    plan_brief = TaskBrief(
        run_id=self.run_id,
        tier=1,
        role="t1_plan",
        goal_anchor=goal,
        workstream="t1",
        task=planning_task,
        acceptance_criteria=["Valid JSON workplan returned with workstreams array"],
        retry_budget=self._retry_bad_output,
        preferred_runtime="standard",
        agent_personality=persona,
    )
    plan_brief.validate()
    self.bb.create_brief(plan_brief)
    self._log("T1 Plan phase starting")

    raw_result = await self._dispatch_with_retry(plan_brief)

    # Parse the workplan and backfill identity fields the model left out.
    workplan = self._parse_json_result(raw_result, plan_brief)
    workplan.setdefault("run_id", self.run_id)
    workplan.setdefault("goal_anchor", goal)

    def _workstream_count() -> int:
        return len(workplan.get('workstreams', []))

    self.bb.update_brief_result(plan_brief.brief_id, workplan)
    self._log(
        f"T1 Plan done — complexity={workplan.get('complexity')!r} "
        f"workstreams={_workstream_count()}"
    )

    # Gate: t1_plan (always required by design)
    gate_summary = self._format_workplan_summary(workplan)
    next_step = (
        f"T2 Architects will spawn for "
        f"{_workstream_count()} workstream(s)"
    )
    await self._check_gate(
        gate_name="t1_plan",
        brief=plan_brief,
        summary=gate_summary,
        what_happens_next=next_step,
    )

    return workplan
Self-critique in one pass — amend the plan.\n\n" + f"Return ONLY a JSON object matching this schema exactly " + f"(no markdown fences, no commentary):\n" + f'{{\n' + f' "run_id": "{self.run_id}",\n' + f' "goal_anchor": "",\n' + f' "complexity": "high|medium|low",\n' + f' "retry_budget_multiplier": 1,\n' + f' "workstreams": [\n' + f' {{\n' + f' "id": "ws-",\n' + f' "name": "",\n' + f' "domain": "",\n' + f' "tier_path": ["t2","t3","t4","t5"],\n' + f' "parallel_group": "A",\n' + f' "t2_specialist": "",\n' + f' "notes": ""\n' + f' }}\n' + f' ],\n' + f' "parallelism": {{\n' + f' "groups": {{"A": ["ws-..."]}},\n' + f' "sequence": ["A"]\n' + f' }},\n' + f' "self_critique_summary": ""\n' + f'}}' + ), + acceptance_criteria=["Valid JSON workplan returned with workstreams array"], + retry_budget=self._retry_bad_output, + preferred_runtime="standard", + agent_personality=persona, + ) + brief.validate() + self.bb.create_brief(brief) + self._log("T1 Plan phase starting") + + result = await self._dispatch_with_retry(brief) + + # Parse workplan + workplan = self._parse_json_result(result, brief) + workplan.setdefault("run_id", self.run_id) + workplan.setdefault("goal_anchor", goal) + + self.bb.update_brief_result(brief.brief_id, workplan) + self._log( + f"T1 Plan done — complexity={workplan.get('complexity')!r} " + f"workstreams={len(workplan.get('workstreams', []))}" + ) + + # Gate: t1_plan (always required by design) + await self._check_gate( + gate_name="t1_plan", + brief=brief, + summary=self._format_workplan_summary(workplan), + what_happens_next=( + f"T2 Architects will spawn for " + f"{len(workplan.get('workstreams', []))} workstream(s)" + ), + ) + + return workplan + + # ------------------------------------------------------------------ + # Workplan execution + # ------------------------------------------------------------------ + + async def _execute_workplan(self, workplan: dict) -> dict[str, Any]: + """ + Execute all workstreams in parallelism-group order. 
+ Returns a dict mapping workstream_id → final context. + """ + parallelism = workplan.get("parallelism", {}) + groups: dict[str, list[str]] = parallelism.get("groups", {}) + sequence: list[str] = parallelism.get("sequence", []) + ws_map = {ws["id"]: ws for ws in workplan.get("workstreams", [])} + + # Normalise: if no explicit parallelism, put all in one group + if not sequence: + sequence = ["A"] + groups = {"A": list(ws_map.keys())} + + all_results: dict[str, Any] = {} + + for group_name in sequence: + ws_ids = groups.get(group_name, []) + ws_defs = [ws_map[wid] for wid in ws_ids if wid in ws_map] + self._log( + f"Parallelism group {group_name!r}: " + f"{[w['name'] for w in ws_defs]}" + ) + + group_results = await asyncio.gather( + *[self._run_workstream(ws, workplan) for ws in ws_defs], + return_exceptions=True, + ) + + for ws, res in zip(ws_defs, group_results): + if isinstance(res, Exception): + self._log( + f"Workstream {ws['name']!r} failed: {res}", level="error" + ) + all_results[ws["id"]] = {"error": str(res)} + else: + all_results[ws["id"]] = res + + return all_results + + async def _run_workstream(self, ws_def: dict, workplan: dict) -> dict: + """Execute the full tier_path for one workstream.""" + ws_id = ws_def["id"] + ws_name = ws_def["name"] + tier_path: list[str] = ws_def.get("tier_path", ["t2", "t3", "t4", "t5"]) + retry_mul = int(workplan.get("retry_budget_multiplier", 1)) + + ws_bb_id = self.bb.create_workstream( + workstream_id=ws_id, name=ws_name, tier=int(tier_path[0][1:]) if tier_path else 2 + ) + self.bb.update_workstream_status(ws_bb_id, "active") + self._log(f"Workstream {ws_name!r} starting — tier_path={tier_path}") + + ctx: dict = { + "workstream_id": ws_id, + "workstream_name": ws_name, + "domain": ws_def.get("domain", "default"), + "goal_anchor": workplan["goal_anchor"], + "t1_notes": ws_def.get("notes", ""), + } + + try: + for tier_name in tier_path: + if tier_name == "t2": + ctx = await self._run_t2(ws_def, ctx, workplan, retry_mul) + 
async def _run_t2(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """T2 stage: Lead Architect, then domain Specialist, then Lead Synthesis.

    Each step builds a brief, registers it on the blackboard, dispatches it
    and stores the parsed result in ``ctx`` (mutated in place and returned).
    The ``t2_lead`` and ``t2_synthesis`` gates are checked when enabled.
    """
    ws_name = ws_def["name"]
    domain = ws_def.get("domain", "default")
    goal = workplan["goal_anchor"]
    budget = int(self._retry_bad_output * retry_mul)

    async def _submit(brief: TaskBrief, announcement: str) -> Any:
        # Shared tail of every T2 step: validate → register → dispatch →
        # parse → store the parsed result.
        brief.validate()
        self.bb.create_brief(brief, workstream_id=ws_def["id"])
        self._log(announcement)
        raw = await self._dispatch_with_retry(brief)
        parsed = self._try_parse_json(raw)
        self.bb.update_brief_result(brief.brief_id, parsed)
        return parsed

    # --- Lead Architect ---
    lead_persona = self._resolve_persona("t2", "default")
    lead_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_lead",
        goal_anchor=goal,
        workstream=ws_def["id"],
        task=(
            f"{self._load_prompt('t2_architect')}\n\n---\n\n"
            f"T2 LEAD ARCHITECT — {ws_name}\n"
            f"Goal: {goal}\nDomain: {domain}\n"
            f"T1 notes: {ws_def.get('notes', '')}\n\n"
            f"1. Define explicit domain boundaries (owns / excludes).\n"
            f"2. Publish shared assumptions (auth, data formats, API patterns).\n"
            f"3. Outline overall architectural approach.\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "domain_boundaries": {{"description":"","owns":[],"explicitly_excludes":[]}},\n'
            f' "shared_assumptions": {{"auth":"","data_formats":"","api_patterns":"","key_decisions":[]}},\n'
            f' "architecture_outline": ""\n'
            f'}}'
        ),
        acceptance_criteria=["domain_boundaries and shared_assumptions present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=lead_persona,
    )
    lead_json = await _submit(lead_brief, f"[{ws_name}] T2 Lead spawning")

    ctx["t2_lead_output"] = lead_json
    ctx["domain_boundaries"] = lead_json.get("domain_boundaries", {})
    ctx["shared_assumptions"] = lead_json.get("shared_assumptions", {})

    # Gate: t2_lead
    if self._gates.get("t2_lead"):
        await self._check_gate(
            gate_name="t2_lead",
            brief=lead_brief,
            summary=f"T2 Lead published domain boundaries for {ws_name}",
            what_happens_next=f"T2 {domain} specialist will spawn",
        )

    # --- Specialist ---
    spec_persona = self._resolve_persona("t2", domain)
    spec_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_specialist",
        goal_anchor=goal,
        workstream=ws_def["id"],
        task=(
            f"T2 SPECIALIST ARCHITECT — {ws_name} ({domain})\n"
            f"Goal: {goal}\n\n"
            f"Domain boundaries:\n{json.dumps(ctx['domain_boundaries'], indent=2)}\n\n"
            f"Shared assumptions:\n{json.dumps(ctx['shared_assumptions'], indent=2)}\n\n"
            f"Design the {domain} domain architecture:\n"
            f"- Component breakdown\n- Interface contracts\n"
            f"- Data models\n- Key implementation decisions\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "architecture": "",\n'
            f' "components": [{{"name":"","responsibility":"","interfaces":[]}}],\n'
            f' "interface_contracts": {{}},\n'
            f' "t3_brief_context": ""\n'
            f'}}'
        ),
        acceptance_criteria=["architecture and components present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=spec_persona,
    )
    spec_json = await _submit(
        spec_brief, f"[{ws_name}] T2 Specialist ({domain}) spawning"
    )
    ctx["t2_specialist_output"] = spec_json

    # --- Lead Synthesis ---
    synth_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_synthesis",
        goal_anchor=goal,
        workstream=ws_def["id"],
        task=(
            f"T2 LEAD — SYNTHESIS for {ws_name}\n\n"
            f"Specialist output (truncated):\n"
            f"{json.dumps(spec_json, indent=2)[:3000]}\n\n"
            f"Synthesise into canonical architecture.\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "canonical_architecture": "",\n'
            f' "t3_task_context": "",\n'
            f' "interface_contracts": {{}},\n'
            f' "implementation_notes": []\n'
            f'}}'
        ),
        acceptance_criteria=["canonical_architecture present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=lead_persona,
    )
    synth_json = await _submit(synth_brief, f"[{ws_name}] T2 Lead synthesis spawning")
    ctx["canonical_architecture"] = synth_json

    self.bb.log_event(
        "completed",
        detail={"tier": 2, "workstream": ws_name, "phase": "synthesis"},
    )
    self._log(f"[{ws_name}] T2 synthesis done")

    # Gate: t2_synthesis
    if self._gates.get("t2_synthesis"):
        await self._check_gate(
            gate_name="t2_synthesis",
            brief=synth_brief,
            summary=f"T2 canonical architecture ready for {ws_name}",
            what_happens_next="T3 Squad Lead(s) will spawn and negotiate task breakdown",
        )

    return ctx
async def _run_t3(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """T3 Squad Lead stage: obtain and commit the T4 task list.

    Registers a draft, dispatches the squad-lead brief, substitutes a single
    fallback task when the returned ``task_list`` is empty, commits the list,
    waits for the mesh commit check, and stores ``t3_output`` / ``task_list``
    in ``ctx``. The ``t3_plan`` gate is checked when enabled.
    """
    ws_id = ws_def["id"]
    ws_name = ws_def["name"]
    domain = ws_def.get("domain", "default")
    goal = workplan["goal_anchor"]
    budget = int(self._retry_bad_output * retry_mul)

    # Prefer the synthesised architecture; fall back to the specialist's.
    architecture = ctx.get("canonical_architecture")
    if not architecture:
        architecture = ctx.get("t2_specialist_output", {})
    persona = self._resolve_persona("t3", domain)

    squad_brief = TaskBrief(
        run_id=self.run_id,
        tier=3,
        role="t3_squad_lead",
        goal_anchor=goal,
        workstream=ws_id,
        task=(
            f"{self._load_prompt('t3_squad_lead')}\n\n---\n\n"
            f"T3 SQUAD LEAD — {ws_name}\n"
            f"Goal: {goal}\n\n"
            f"Architecture (truncated):\n"
            f"{json.dumps(architecture, indent=2)[:3000]}\n\n"
            f"Break the work into atomic T4 tasks. Declare dependencies.\n"
            f"Pipeline tasks (sequential) set deps; swarm tasks (parallel) set deps=[].\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "task_list": [\n'
            f' {{\n'
            f' "id": "t4-",\n'
            f' "task": "implement ",\n'
            f' "acceptance_criteria": [],\n'
            f' "deps": [],\n'
            f' "t5_type": "code|integration|api",\n'
            f' "preferred_runtime": "coding_agent"\n'
            f' }}\n'
            f' ],\n'
            f' "coordination_notes": ""\n'
            f'}}'
        ),
        acceptance_criteria=["task_list with at least one task"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=persona,
    )
    squad_brief.validate()
    self.bb.create_brief(squad_brief, workstream_id=ws_id)
    self._log(f"[{ws_name}] T3 Squad Lead spawning")

    # Register draft in t3_task_lists
    self.bb.create_t3_draft(workstream_id=ws_id, t3_agent_id=squad_brief.brief_id)

    raw_result = await self._dispatch_with_retry(squad_brief)
    t3_json = self._try_parse_json(raw_result)
    task_list: list[dict] = t3_json.get("task_list", [])

    if not task_list:
        self._log(f"[{ws_name}] T3 returned empty task_list — using fallback task", level="warning")
        fallback_task = {
            "id": "t4-default",
            "task": f"Implement {ws_name}: {goal}",
            "acceptance_criteria": ["Implementation complete"],
            "deps": [],
            "t5_type": "code",
            "preferred_runtime": "coding_agent",
        }
        task_list = [fallback_task]

    # Commit task list (mesh — single T3 per workstream for now)
    self.bb.commit_t3_task_list(
        workstream_id=ws_id,
        t3_agent_id=squad_brief.brief_id,
        tasks=task_list,
    )
    self.bb.update_brief_result(squad_brief.brief_id, t3_json)

    # T3 mesh timeout check (verify committed within deadline)
    await self._await_t3_mesh_commit(ws_id, ws_name, ws_def, ctx, workplan, retry_mul)

    ctx["t3_output"] = t3_json
    ctx["task_list"] = task_list

    task_count = len(task_list)
    self.bb.log_event("completed", detail={"tier": 3, "workstream": ws_name})
    self._log(f"[{ws_name}] T3 done — {task_count} task(s) declared")

    # Gate: t3_plan
    if self._gates.get("t3_plan"):
        await self._check_gate(
            gate_name="t3_plan",
            brief=squad_brief,
            summary=f"T3 task breakdown for {ws_name}: {task_count} task(s)",
            what_happens_next=f"T4 workers will spawn for {task_count} task(s)",
        )

    return ctx
self._gates.get("t3_plan"): + await self._check_gate( + gate_name="t3_plan", + brief=brief, + summary=f"T3 task breakdown for {ws_name}: {len(task_list)} task(s)", + what_happens_next=f"T4 workers will spawn for {len(task_list)} task(s)", + ) + + return ctx + + async def _await_t3_mesh_commit( + self, + ws_id: str, + ws_name: str, + ws_def: dict, + ctx: dict, + workplan: dict, + retry_mul: int, + ) -> None: + """ + Wait until all T3s in the domain have committed their task lists. + If timeout expires, escalate to T2 (domain boundary problem). + """ + deadline = time.monotonic() + self._t3_mesh_timeout_minutes * 60 + poll = 3.0 + + while time.monotonic() < deadline: + if self.bb.all_t3_committed(ws_id): + return + await asyncio.sleep(poll) + + # Timeout — escalate to T2 + self._log( + f"[{ws_name}] T3 mesh timeout after {self._t3_mesh_timeout_minutes}m — " + f"escalating to T2", + level="warning", + ) + self.bb.log_event( + "escalated", + detail={ + "reason": "t3_mesh_timeout", + "workstream": ws_name, + "timeout_minutes": self._t3_mesh_timeout_minutes, + "t3_task_lists": self.bb.get_t3_task_lists(ws_id), + }, + ) + self.notify.send( + f"[agency] Run {self.run_id}: T3 mesh timeout for workstream {ws_name!r}. " + f"T3 squad leads failed to commit task lists within " + f"{self._t3_mesh_timeout_minutes}m. 
Re-running T2 to re-scope.", + context={"run_id": self.run_id, "workstream": ws_name}, + ) + # Re-run T2 to re-scope (consume retry budget) + domain = ws_def.get("domain", "default") + budget = int(self._retry_bad_output * retry_mul) + if budget > 0: + ctx = await self._run_t2(ws_def, ctx, workplan, max(1, retry_mul - 1)) + else: + raise TerminalEscalationError( + f"T3 mesh timeout for {ws_name!r} and T2 retry budget exhausted" + ) + + # ------------------------------------------------------------------ + # T4 — Implementers (swarm + pipeline) + # ------------------------------------------------------------------ + + async def _run_t4( + self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int + ) -> dict: + """Dispatch T4 tasks respecting dep ordering (pipeline) and parallelism (swarm).""" + ws_name = ws_def["name"] + task_list: list[dict] = ctx.get("task_list", []) + + if not task_list: + self._log(f"[{ws_name}] No T4 tasks — skipping", level="warning") + return ctx + + t4_results: dict[str, Any] = {} + completed_ids: set[str] = set() + pending = list(task_list) + max_rounds = len(pending) * 2 + 5 + + for _ in range(max_rounds): + if not pending: + break + + ready = [ + t for t in pending + if all(dep in completed_ids for dep in t.get("deps", [])) + ] + if not ready: + self._log( + f"[{ws_name}] T4 dependency deadlock — " + f"remaining: {[t['id'] for t in pending]}", + level="warning", + ) + break + + self._log(f"[{ws_name}] T4 spawning {len(ready)} task(s) in parallel") + results = await asyncio.gather( + *[ + self._run_t4_task(task, ws_def, ctx, workplan, retry_mul) + for task in ready + ], + return_exceptions=True, + ) + + for task, res in zip(ready, results): + tid = task["id"] + if isinstance(res, Exception): + self._log( + f"[{ws_name}] T4 task {tid!r} failed: {res}", level="error" + ) + t4_results[tid] = {"status": "failed", "error": str(res)} + else: + t4_results[tid] = res + completed_ids.add(tid) + + pending = [t for t in pending if t["id"] not in 
async def _run_t4_task(
    self,
    task: dict,
    ws_def: dict,
    ctx: dict,
    workplan: dict,
    retry_mul: int,
) -> dict:
    """Build, register and dispatch the brief for a single T4 task.

    The dispatch result is stored on the blackboard, a ``completed`` event
    is logged, and the raw result is returned.
    """
    ws_domain = ws_def.get("domain", "default")
    personality = self._resolve_persona("t4", ws_domain)
    retries = int(self._retry_bad_output * retry_mul)
    task_id = task.get("id")

    t4_brief = TaskBrief(
        run_id=self.run_id,
        tier=4,
        role="t4_implementer",
        goal_anchor=workplan["goal_anchor"],
        workstream=ws_def["id"],
        task=(
            f"{self._load_prompt('t4_implementer')}\n\n---\n\n"
            f"{task.get('task', 'Implement task')}"
        ),
        acceptance_criteria=task.get("acceptance_criteria", []),
        context={
            "task_id": task_id,
            "canonical_architecture": self._safe_ctx(
                ctx.get("canonical_architecture", {})
            ),
        },
        retry_budget=retries,
        preferred_runtime=task.get("preferred_runtime", "coding_agent"),
        agent_personality=personality,
    )
    t4_brief.validate()
    self.bb.create_brief(t4_brief, workstream_id=ws_def["id"])

    outcome = await self._dispatch_with_retry(t4_brief)
    self.bb.update_brief_result(t4_brief.brief_id, outcome)

    event_detail = {"tier": 4, "task_id": task_id, "workstream": ws_def["name"]}
    self.bb.log_event("completed", brief_id=t4_brief.brief_id, detail=event_detail)
    return outcome
async def _run_t5(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """Fan-out T5 verifiers, compute joint verdict.

    One verifier brief is created per entry in ``ctx["t4_results"]`` and all
    are dispatched concurrently. A verifier that raises is recorded as a
    ``fail`` result scoped to its task id, so ``failed_scopes`` always names
    tasks rather than internal brief ids. The parsed results and the joint
    verdict are stored in ``ctx["t5_results"]`` and ``ctx["joint_verdict"]``.
    The ``t5_verdict`` gate is checked when enabled.

    Returns:
        ``ctx``, unchanged when there are no T4 results to verify.
    """
    ws_name = ws_def["name"]
    t4_results: dict[str, Any] = ctx.get("t4_results", {})

    if not t4_results:
        self._log(f"[{ws_name}] No T4 results to verify — skipping T5", level="warning")
        return ctx

    task_list: list[dict] = ctx.get("task_list", [])
    task_map = {t["id"]: t for t in task_list}
    budget = int(self._retry_bad_output * retry_mul)

    self._log(f"[{ws_name}] T5 spawning {len(t4_results)} verifier(s)")

    briefs: list[TaskBrief] = []
    scopes: list[str] = []  # task id verified by the brief at the same index
    for task_id, t4_result in t4_results.items():
        task_def = task_map.get(task_id, {})
        t5_type = task_def.get("t5_type", "code")
        persona = self._resolve_persona("t5", t5_type)

        b = TaskBrief(
            run_id=self.run_id,
            tier=5,
            role=f"t5_{t5_type}",
            goal_anchor=workplan["goal_anchor"],
            workstream=ws_def["id"],
            task=(
                f"{self._load_prompt('t5_verifier')}\n\n---\n\n"
                f"T5 VERIFIER — verify: {task_def.get('task', task_id)}\n\n"
                f"T4 output (truncated):\n{json.dumps(t4_result, indent=2)[:2000]}\n\n"
                f"Acceptance criteria: {task_def.get('acceptance_criteria', [])}\n"
                f"Goal: {workplan['goal_anchor']}\n\n"
                f"Return JSON:\n"
                f'{{\n'
                f' "verifier_id": "{task_id}",\n'
                f' "scope": "{task_id}",\n'
                f' "verdict": "pass|fail",\n'
                f' "issues": [],\n'
                f' "notes": ""\n'
                f'}}'
            ),
            acceptance_criteria=["verdict is pass or fail"],
            context={"task_id": task_id, "t4_result": self._safe_ctx(t4_result)},
            retry_budget=budget,
            preferred_runtime=task_def.get("preferred_runtime", "coding_agent"),
            agent_personality=persona,
        )
        b.validate()
        self.bb.create_brief(b, workstream_id=ws_def["id"])
        briefs.append(b)
        scopes.append(task_id)

    t5_results = await asyncio.gather(
        *[self._dispatch_with_retry(b) for b in briefs],
        return_exceptions=True,
    )

    parsed_results: list[dict] = []
    for b, task_id, res in zip(briefs, scopes, t5_results):
        if isinstance(res, Exception):
            # Carry the task id as scope; the verdict aggregator would
            # otherwise fall back to the brief id.
            parsed_results.append(
                {
                    "verifier_id": b.brief_id,
                    "scope": task_id,
                    "verdict": "fail",
                    "issues": [str(res)],
                }
            )
        else:
            self.bb.update_brief_result(b.brief_id, res)
            parsed_results.append(self._try_parse_json(res))

    joint = self._compute_joint_verdict(parsed_results)
    ctx["t5_results"] = parsed_results
    ctx["joint_verdict"] = joint

    self.bb.log_event(
        "completed",
        detail={
            "tier": 5,
            "workstream": ws_name,
            "joint_verdict": joint.get("joint_verdict"),
        },
    )
    self._log(
        f"[{ws_name}] T5 joint verdict: {joint.get('joint_verdict')} "
        f"({joint.get('summary')})"
    )

    # Gate: t5_verdict
    if self._gates.get("t5_verdict") and briefs:
        await self._check_gate(
            gate_name="t5_verdict",
            brief=briefs[0],
            summary=f"T5 verdict for {ws_name}: {joint.get('joint_verdict')} — {joint.get('summary')}",
            what_happens_next="Workstream will be marked done if verdict is 'pass'",
        )

    verdict = joint.get("joint_verdict", "fail")
    if verdict == "partial":
        failed = joint.get("failed_scopes", [])
        self._log(f"[{ws_name}] T5 partial — failed scopes: {failed}", level="warning")
        self.bb.log_event(
            "retried",
            detail={"workstream": ws_name, "failed_scopes": failed},
        )
    elif verdict == "fail":
        self._log(f"[{ws_name}] T5 all fail — escalating", level="warning")
        self.bb.log_event(
            "escalated",
            detail={"workstream": ws_name, "verdict": verdict},
        )

    return ctx
T5 joint verdict: {joint.get('joint_verdict')} " + f"({joint.get('summary')})" + ) + + # Gate: t5_verdict + if self._gates.get("t5_verdict") and briefs: + await self._check_gate( + gate_name="t5_verdict", + brief=briefs[0], + summary=f"T5 verdict for {ws_name}: {joint.get('joint_verdict')} — {joint.get('summary')}", + what_happens_next="Workstream will be marked done if verdict is 'pass'", + ) + + verdict = joint.get("joint_verdict", "fail") + if verdict == "partial": + failed = joint.get("failed_scopes", []) + self._log(f"[{ws_name}] T5 partial — failed scopes: {failed}", level="warning") + self.bb.log_event( + "retried", + detail={"workstream": ws_name, "failed_scopes": failed}, + ) + elif verdict == "fail": + self._log(f"[{ws_name}] T5 all fail — escalating", level="warning") + self.bb.log_event( + "escalated", + detail={"workstream": ws_name, "verdict": verdict}, + ) + + return ctx + + def _compute_joint_verdict(self, t5_results: list[dict]) -> dict: + verdicts: list[str] = [] + failed_scopes: list[str] = [] + + for r in t5_results: + v = r.get("verdict", "fail").lower() + scope = r.get("scope", r.get("verifier_id", "unknown")) + if v in ("pass", "done", "completed"): + verdicts.append("pass") + else: + verdicts.append("fail") + failed_scopes.append(scope) + + if not verdicts: + joint = "fail" + elif all(v == "pass" for v in verdicts): + joint = "pass" + elif any(v == "pass" for v in verdicts): + joint = "partial" + else: + joint = "fail" + + return { + "t5_results": t5_results, + "joint_verdict": joint, + "failed_scopes": failed_scopes, + "summary": f"{verdicts.count('pass')}/{len(verdicts)} verifiers passed", + } + + # ------------------------------------------------------------------ + # T1 Accept phase + # ------------------------------------------------------------------ + + async def _run_t1_accept( + self, + workplan: dict, + ws_results: dict, + branch: str, + base_branch: str, + ) -> str: + """T1 Phase 2 — validate output, create PR, notify.""" + goal = 
async def _run_t1_accept(
    self,
    workplan: dict,
    ws_results: dict,
    branch: str,
    base_branch: str,
) -> str:
    """T1 Phase 2 — validate output, create PR, notify.

    Dispatches an acceptance brief whose context summarises each workstream
    as ``"failed"`` (its result carries an ``"error"`` entry) or ``"done"``,
    logs an escalation event on a ``fail`` verdict, opens a pull request
    unless this is a dry run, and sends a completion notification.

    Args:
        workplan: The T1 workplan; ``goal_anchor`` is required.
        ws_results: Mapping of workstream id → final context or error dict.
        branch: Head branch for the pull request.
        base_branch: Base branch for the pull request.

    Returns:
        The pull-request URL, or ``""`` when none was created.
    """
    goal = workplan["goal_anchor"]
    persona = self._resolve_persona("t1", "default")

    # Report real outcomes: labelling every workstream "done" would hide
    # failures from the acceptance reviewer.
    ws_summary = {
        ws_id: "failed" if isinstance(res, dict) and "error" in res else "done"
        for ws_id, res in ws_results.items()
    }

    brief = TaskBrief(
        run_id=self.run_id,
        tier=1,
        role="t1_accept",
        goal_anchor=goal,
        workstream="t1",
        task=(
            f"PHASE 2 — ACCEPT\n\n"
            f"Original goal: {goal}\n\n"
            f"All workstreams have completed. Validate that the implementation "
            f"aligns with the original goal anchor and that acceptance criteria "
            f"are satisfied.\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "verdict": "pass|fail",\n'
            f' "summary": "...",\n'
            f' "issues": []\n'
            f'}}'
        ),
        acceptance_criteria=["verdict is pass or fail"],
        context={"ws_results_summary": ws_summary},
        retry_budget=1,
        preferred_runtime="standard",
        agent_personality=persona,
    )
    brief.validate()
    self.bb.create_brief(brief)

    result = await self._dispatch_with_retry(brief)
    accept_json = self._try_parse_json(result)
    self.bb.update_brief_result(brief.brief_id, accept_json)

    verdict = accept_json.get("verdict", "pass")
    summary = accept_json.get("summary", "T1 accept complete")

    if verdict == "fail":
        issues = accept_json.get("issues", [])
        self._log(f"T1 Accept verdict: fail — {issues}", level="warning")
        self.bb.log_event(
            "escalated",
            detail={"tier": 1, "phase": "accept", "issues": issues},
        )

    # Create PR (best-effort: a failure is logged, not raised)
    pr_url = ""
    if not self.dry_run:
        try:
            workstreams = workplan.get("workstreams", [])
            ws_lines = "\n".join(
                f"- **{ws['name']}** ({', '.join(ws.get('tier_path', []))}): "
                f"{ws.get('notes', '')}"
                for ws in workstreams
            )
            pr_body = (
                f"## Run `{self.run_id}`\n\n"
                f"**Goal:** {goal}\n\n"
                f"**Complexity:** {workplan.get('complexity', 'unknown')}\n\n"
                f"### Workstreams\n{ws_lines}\n\n"
                f"### T1 Accept Summary\n{summary}\n\n"
                f"---\n*Generated by the-agency — do not auto-merge.*"
            )
            pr_url = self.vcs.create_pr(
                title=f"[agency] {self.run_id}: {goal[:55]}",
                body=pr_body,
                head=branch,
                base=base_branch,
            )
            self._log(f"PR created: {pr_url}")
        except Exception as exc:
            self._log(f"Warning: PR creation failed: {exc}", level="warning")

    self.notify.send(
        f"[agency] Run {self.run_id} complete — "
        f"verdict={verdict!r} PR={pr_url or '(none)'}\n"
        f"Goal: {goal}",
        context={
            "run_id": self.run_id,
            "goal": goal,
            "verdict": verdict,
            "pr_url": pr_url,
        },
    )

    return pr_url
except Exception as exc: + self._log(f"Warning: PR creation failed: {exc}", level="warning") + + self.notify.send( + f"[agency] Run {self.run_id} complete — " + f"verdict={verdict!r} PR={pr_url or '(none)'}\n" + f"Goal: {goal}", + context={ + "run_id": self.run_id, + "goal": goal, + "verdict": verdict, + "pr_url": pr_url, + }, + ) + + return pr_url + + # ------------------------------------------------------------------ + # Core dispatch + # ------------------------------------------------------------------ + + async def _dispatch_brief(self, brief: TaskBrief) -> dict: + """ + Dispatch a single brief to the appropriate runtime adapter. + Writes spawned/failed events; marks brief active. + Returns the raw result dict. + """ + runtime = self._select_runtime(brief) + task_text = self._build_task_prompt(brief) + capability = _TIER_CAPABILITY.get(brief.tier, "capable") + ctx: dict[str, Any] = { + "run_id": self.run_id, + "brief_id": brief.brief_id, + } + + self.bb.update_brief_status(brief.brief_id, "active") + self.bb.log_event( + "spawned", + brief_id=brief.brief_id, + detail={ + "tier": brief.tier, + "role": brief.role, + "runtime": type(runtime).__name__, + "dry_run": self.dry_run, + }, + ) + + if self.dry_run: + self._log( + f"[DRY RUN] T{brief.tier} {brief.role!r}: " + f"{brief.task[:80].splitlines()[0]!r}" + ) + return {"status": "done", "output": f"[dry-run T{brief.tier}/{brief.role}]"} + + try: + agent_id = runtime.spawn(task_text, capability, ctx) + timeout_s = _TIER_TIMEOUT_S.get(brief.tier, 300) + result = runtime.get_result(agent_id, timeout_s) + except TimeoutError as exc: + self.bb.log_event( + "failed", + brief_id=brief.brief_id, + detail={"error": "timeout", "msg": str(exc)}, + ) + raise + except Exception as exc: + self.bb.log_event( + "failed", + brief_id=brief.brief_id, + detail={"error": str(exc)}, + ) + raise + + # Attempt inline JSON parse of string output + if isinstance(result.get("output"), str): + raw = result["output"].strip() + try: + parsed 
= _extract_json(raw) + result = {**result, **parsed} + except ValueError: + result["raw_output"] = raw + result.setdefault("status", "done") + + return result + + async def _dispatch_with_retry(self, brief: TaskBrief) -> dict: + """ + Dispatch a brief and apply EscalationHandler retry/salvage/escalate logic. + Returns the final accepted result dict. + On escalation, returns result with ``escalated=True`` rather than raising. + """ + current = brief + + while True: + result = await self._dispatch_brief(current) + decision = self._escalation.handle(current, result) + + if decision.action == "complete": + return result + + elif decision.action in ("retry", "salvage_and_retry"): + self._log( + f"T{brief.tier} {brief.role!r}: {decision.action} — {decision.reason}" + ) + self.bb.log_event( + "retried", + brief_id=current.brief_id, + detail={"action": decision.action, "reason": decision.reason}, + ) + self.bb.increment_brief_retry(current.brief_id) + current = decision.amended_brief # type: ignore[assignment] + self.bb.create_brief(current) + + else: # escalate + self._log( + f"T{brief.tier} {brief.role!r}: escalating — {decision.reason}", + level="warning", + ) + self.bb.log_event( + "escalated", + brief_id=current.brief_id, + detail={"reason": decision.reason}, + ) + self.bb.update_brief_status(current.brief_id, "failed") + return {**result, "escalated": True, "escalation_reason": decision.reason} + + # ------------------------------------------------------------------ + # Gate logic + # ------------------------------------------------------------------ + + async def _check_gate( + self, + gate_name: str, + brief: Optional[TaskBrief], + summary: str, + what_happens_next: str, + ) -> None: + """ + Halt execution and wait for human gate approval. + + Writes gate_pending, notifies, then polls the blackboard for + gate_approved or gate_rejected. gate_approved → continue; + gate_rejected → raise GateRejectedError; timeout → raise GateTimeoutError. 
+ """ + brief_id = brief.brief_id if brief else None + gate_pending_time = _now_iso() + + self._log(f"Gate {gate_name!r} — {summary}") + self.bb.log_event( + "gate_pending", + brief_id=brief_id, + detail={ + "gate": gate_name, + "summary": summary, + "what_happens_next": what_happens_next, + }, + ) + + self.notify.send( + f"[agency] GATE {gate_name!r} — {summary}\n" + f"Run: {self.run_id}\n" + f"Next: {what_happens_next}\n\n" + f" agency approve {self.run_id} # continue\n" + f" agency reject {self.run_id} --reason '...' # re-invoke tier", + context={ + "run_id": self.run_id, + "gate": gate_name, + "summary": summary, + "what_happens_next": what_happens_next, + }, + ) + + deadline = time.monotonic() + self._gate_timeout_minutes * 60 + + while time.monotonic() < deadline: + ev = self.bb.get_latest_gate_event(gate_name, after_iso=gate_pending_time) + if ev: + kind = ev["kind"] + detail: dict = {} + try: + detail = json.loads(ev.get("detail") or "{}") + except (json.JSONDecodeError, TypeError): + pass + + if kind == "gate_approved": + self._log(f"Gate {gate_name!r} approved") + return + + if kind == "gate_rejected": + reason = detail.get("reason", "no reason given") + self._log(f"Gate {gate_name!r} rejected: {reason}", level="warning") + raise GateRejectedError(gate=gate_name, reason=reason) + + await asyncio.sleep(_GATE_POLL_INTERVAL_S) + + # Timeout → auto-reject + self._log( + f"Gate {gate_name!r} timed out after {self._gate_timeout_minutes}m — auto-rejecting", + level="warning", + ) + self.bb.log_event( + "gate_rejected", + brief_id=brief_id, + detail={ + "gate": gate_name, + "reason": "gate_timeout", + "timeout_minutes": self._gate_timeout_minutes, + }, + ) + self.notify.send( + f"[agency] Run {self.run_id}: gate {gate_name!r} timed out — run paused", + context={"run_id": self.run_id, "gate": gate_name}, + ) + raise GateTimeoutError(gate=gate_name, timeout_minutes=self._gate_timeout_minutes) + + # ------------------------------------------------------------------ 
# Path amendment monitor
# ------------------------------------------------------------------
# NOTE(review): this span was recovered from a whitespace-collapsed diff.
# Every definition below takes ``self`` and is presumably a method of the
# runner class whose header is not visible from here — re-indent under that
# class when applying. TODO confirm.

async def monitor_path_amendments(self) -> None:
    """
    Background task: poll for path_amendment events and relay each new one
    via a log event + notification.

    Loops forever (cancel the task to stop it). Every poll re-reads all
    ``path_amendment`` events from the blackboard and skips those whose
    ``event_id`` has already been relayed by this coroutine.

    Run this as an asyncio background task during the pipeline if needed.
    """
    seen_ids: set[str] = set()
    while True:
        events = self.bb.get_events(kinds=["path_amendment"])
        for ev in events:
            event_id = ev["event_id"]
            if event_id in seen_ids:
                continue
            seen_ids.add(event_id)

            try:
                detail = json.loads(ev.get("detail") or "{}")
            except (json.JSONDecodeError, TypeError):
                detail = {}
            # A detail payload that decodes to a list/str/number has no
            # ``.get``; treat it as empty instead of letting AttributeError
            # kill this background task.
            if not isinstance(detail, dict):
                detail = {}

            proposed_by = detail.get("proposed_by", "unknown")
            reason = detail.get("reason", "")
            amendment = detail.get("amendment", {})
            self._log(
                f"Path amendment from {proposed_by!r}: {reason}",
                level="warning",
            )
            self.notify.send(
                f"[agency] Path amendment — {proposed_by}: {reason}\n"
                f"Amendment: {json.dumps(amendment)}\n"
                f"Run: {self.run_id}",
                context={
                    "run_id": self.run_id,
                    "proposed_by": proposed_by,
                    "reason": reason,
                    "amendment": amendment,
                },
            )
        await asyncio.sleep(_PATH_AMENDMENT_POLL_INTERVAL_S)

# ------------------------------------------------------------------
# Runtime selection
# ------------------------------------------------------------------

def _select_runtime(self, brief: "TaskBrief") -> Any:
    """
    Select the runtime adapter for a brief.

    A per-tier entry in ``self._tier_runtime_map`` (key ``"t<tier>"``) takes
    precedence; ``brief.preferred_runtime`` is consulted only when that tier
    has no (truthy) override. Returns ``self._coding_runtime`` when the
    effective choice is ``"coding_agent"``, otherwise ``self._default_runtime``.
    """
    tier_key = f"t{brief.tier}"
    override = self._tier_runtime_map.get(tier_key)
    if override == "coding_agent" or (
        not override and brief.preferred_runtime == "coding_agent"
    ):
        return self._coding_runtime
    return self._default_runtime

# ------------------------------------------------------------------
# Persona / prompt helpers
# ------------------------------------------------------------------

def _resolve_persona(self, tier_key: str, domain: str) -> Optional[str]:
    """
    Return the agent personality file path, or None if not found.

    Looks up ``domain`` in the ``tier_key`` section of ``self.role_registry``
    and falls back to that section's ``"default"`` entry. The path is only
    returned when it names a regular file.
    """
    tier_entries = self.role_registry.get(tier_key, {})
    path = tier_entries.get(domain) or tier_entries.get("default")
    # isfile (not exists): a directory at this path cannot be read as a
    # persona and would otherwise fail later in _build_task_prompt.
    if path and os.path.isfile(path):
        return path
    if path:
        logger.debug("Persona file not found at %s — using no personality", path)
    return None

def _load_prompt(self, prompt_name: str) -> str:
    """
    Load a fallback tier prompt from prompts/.

    Returns the contents of ``prompts/<prompt_name>.md`` decoded as UTF-8, or
    an empty string when the file does not exist.
    """
    path = f"prompts/{prompt_name}.md"
    try:
        # Explicit encoding so the prompt text does not depend on the
        # platform's default locale encoding.
        return Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        return ""

def _build_task_prompt(self, brief: "TaskBrief") -> str:
    """
    Compose the full task string sent to the runtime.

    Sections, in order and joined with newlines: the personality file
    contents (when ``brief.agent_personality`` is set and readable), the task
    text, the JSON-encoded context (when non-empty and serialisable) and the
    acceptance criteria as a bullet list. An unreadable personality file is
    logged and skipped rather than failing the dispatch.
    """
    parts: list[str] = []

    if brief.agent_personality:
        try:
            personality = Path(brief.agent_personality).read_text(encoding="utf-8")
        except FileNotFoundError:
            logger.warning("Personality file not found: %s", brief.agent_personality)
        except (OSError, UnicodeDecodeError) as exc:
            # Directory, permission problem or undecodable bytes: the persona
            # is optional, so carry on without it.
            logger.warning(
                "Personality file unreadable: %s (%s)", brief.agent_personality, exc
            )
        else:
            parts.append(f"\n{personality}\n\n")

    parts.append(brief.task)

    if brief.context:
        try:
            ctx_json = json.dumps(brief.context, indent=2)
            parts.append(f"\n\nContext:\n{ctx_json}")
        except (TypeError, ValueError):
            # Context that cannot be serialised is left out of the prompt.
            pass

    if brief.acceptance_criteria:
        criteria = "\n".join(f"- {c}" for c in brief.acceptance_criteria)
        parts.append(f"\n\nAcceptance criteria:\n{criteria}")

    return "\n".join(parts)

# ------------------------------------------------------------------
# Result parsing helpers
# ------------------------------------------------------------------

def _parse_json_result(self, result: dict, brief: "TaskBrief") -> dict:
    """
    Extract a JSON dict from an agent result, raising on failure.

    ``result`` is returned unchanged when it already carries one of the
    workplan keys; otherwise ``raw_output`` / ``output`` is handed to
    ``_extract_json``.

    Raises:
        ValueError: when no workplan key is present and the raw text is
            missing, blank, or rejected by ``_extract_json``.
    """
    # Already parsed inline in _dispatch_brief
    for key in ("workstreams", "goal_anchor", "complexity", "run_id", "task_list"):
        if key in result:
            return result

    raw = result.get("raw_output") or result.get("output", "")
    if isinstance(raw, str) and raw.strip():
        try:
            return _extract_json(raw)
        except ValueError:
            # Fall through to the descriptive error below.
            pass

    raise ValueError(
        f"Could not parse JSON workplan from T{brief.tier}/{brief.role} result. "
        f"raw={str(result)[:300]!r}"
    )

def _try_parse_json(self, result: dict) -> dict:
    """
    Best-effort JSON extraction from an agent result.
    Returns result as-is if it already looks like structured output
    (a dict with more than two keys), otherwise attempts to parse
    raw_output/output and falls back to returning result unchanged.
    """
    if isinstance(result, dict) and len(result) > 2:
        return result

    raw = result.get("raw_output") or result.get("output", "")
    if isinstance(raw, str) and raw.strip():
        try:
            return _extract_json(raw)
        except ValueError:
            # Best effort only: keep the unparsed result.
            pass

    return result

# ------------------------------------------------------------------
# Formatting helpers
# ------------------------------------------------------------------

def _format_workplan_summary(self, workplan: dict) -> str:
    """
    Render a workplan as a short multi-line, human-readable summary.

    Emits the complexity, the retry budget multiplier (default 1), one line
    per workstream (parallel group, name or id, tier path) and, when present,
    the self-critique summary.
    """
    workstreams = workplan.get("workstreams", [])
    complexity = workplan.get("complexity", "unknown")
    lines = [
        f"Complexity: {complexity}",
        f"Retry budget multiplier: {workplan.get('retry_budget_multiplier', 1)}x",
        f"Workstreams ({len(workstreams)}):",
    ]
    for ws in workstreams:
        path_str = " → ".join(ws.get("tier_path", []))
        lines.append(f" [{ws.get('parallel_group', '?')}] {ws.get('name', ws.get('id'))} — {path_str}")
    critique = workplan.get("self_critique_summary", "")
    if critique:
        lines.append(f"Self-critique: {critique}")
    return "\n".join(lines)

@staticmethod
def _safe_ctx(ctx: Any, max_len: int = 4000) -> Any:
    """
    Truncate a context value to avoid token overflow in prompts.

    Only dicts are inspected: a dict whose JSON encoding fits in ``max_len``
    characters is returned unchanged, a longer one becomes
    ``{"_truncated": True, "preview": <first max_len chars of the JSON>}``,
    and one that cannot be JSON-encoded becomes its truncated ``str()``.
    Any non-dict value is returned as-is.
    """
    if isinstance(ctx, dict):
        try:
            s = json.dumps(ctx)
            if len(s) <= max_len:
                return ctx
            return {"_truncated": True, "preview": s[:max_len]}
        except (TypeError, ValueError):
            return str(ctx)[:max_len]
    return ctx

# ------------------------------------------------------------------
# Logging
# ------------------------------------------------------------------

def _log(self, message: str, level: str = "info") -> None:
    """
    Write a log event to the blackboard and to the Python logger.

    The blackboard write is best-effort: a failure there never interrupts the
    pipeline, but it is reported on the Python logger at debug level instead
    of being dropped silently. Unknown ``level`` values are logged as info.
    """
    try:
        self.bb.log_event("log", detail={"level": level, "message": message})
    except Exception:
        logger.debug("Blackboard log_event failed; continuing", exc_info=True)
    getattr(logger, level if level in ("debug", "info", "warning", "error") else "info")(
        "[%s] %s", self.run_id, message
    )


# ---------------------------------------------------------------------------
# CLI entry point (thin wrapper — full CLI in cli/agency.py)
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import argparse

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    parser = argparse.ArgumentParser(description="Run the-agency pipeline.")
    parser.add_argument("--config", default="config/team.yaml")
    parser.add_argument("--registry", default="config/role_registry.yaml")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    runner = TeamRunner(
        config_path=args.config,
        role_registry_path=args.registry,
        dry_run=args.dry_run,
    )
    # NOTE(review): assumes TeamRunner.run is a plain synchronous entry point;
    # if it is a coroutine function it needs asyncio.run(...) — confirm.
    runner.run()