"""
core/team_runner.py

Top-level orchestration entry point for the-agency pipeline.

TeamRunner reads team.yaml, assembles the adapter registry, and runs the
complete T1 → T2 → T3 → T4 → T5 dispatch loop, including escalation handling.

CLI usage::

    python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import re
import uuid
from typing import Optional

import yaml

from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
from adapters.base.llm import LLMAdapter
from adapters.base.notify import NotifyAdapter
from adapters.base.runtime import RuntimeAdapter
from adapters.base.vcs import VCSAdapter
from adapters.llm.anthropic import AnthropicAdapter
from adapters.notify.openclaw import OpenClawNotifyAdapter
from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
from adapters.runtime.openclaw import OpenClawRuntimeAdapter
from adapters.vcs.github import GitHubAdapter

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Tier number (1-based, in tuple order) → system-prompt file path, relative
# to the project root.
_TIER_PROMPTS: dict[int, str] = dict(
    enumerate(
        (
            "prompts/t1_visionary.md",
            "prompts/t2_architect.md",
            "prompts/t3_squad_lead.md",
            "prompts/t4_implementer.md",
            "prompts/t5_verifier.md",
        ),
        start=1,
    )
)

# Tier number (1-based, in tuple order) → capability hint handed to the LLM
# adapter / runtime when dispatching a brief of that tier.
_TIER_CAPABILITIES: dict[int, str] = dict(
    enumerate(
        (
            "reasoning-heavy",
            "reasoning-heavy",
            "capable",
            "capable",
            "fast-cheap",
        ),
        start=1,
    )
)

# Adapter registry: config key → concrete class.
_LLM_ADAPTERS: dict[str, type[LLMAdapter]] = {
    "anthropic": AnthropicAdapter,
}
_VCS_ADAPTERS: dict[str, type[VCSAdapter]] = {
    "github": GitHubAdapter,
}
_NOTIFY_ADAPTERS: dict[str, type[NotifyAdapter]] = {
    "openclaw": OpenClawNotifyAdapter,
}
_RUNTIME_ADAPTERS: dict[str, type[RuntimeAdapter]] = {
    "openclaw": OpenClawRuntimeAdapter,
    "claude_code": ClaudeCodeRuntimeAdapter,
}


# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------


class EscalationError(RuntimeError):
    """Raised when a brief escalates past its retry budget with no recovery."""


# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------


class TeamRunner:
    """
    Orchestrates a full T1→T5 agent pipeline run.

    Usage::

        runner = TeamRunner(config_path="config/team.yaml")
        runner.run()

    Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
    and notifications::

        runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
        runner.run()
    """

    def __init__(
        self,
        config_path: str = "config/team.yaml",
        dry_run: bool = False,
        role_registry_path: str = "config/role_registry.yaml",
    ) -> None:
        """
        Load configuration and instantiate adapters.

        Parameters
        ----------
        config_path :
            Path to team.yaml.
        dry_run :
            When True, skip LLM calls, VCS commits, and notifications.
            All planned actions are logged at INFO level.
        role_registry_path :
            Path to the role registry YAML (tier → role → persona file).
            Defaults to the previously hard-coded location.

        Raises
        ------
        ValueError
            If the configured LLM or runtime adapter key is unknown.
        """
        self._dry_run = dry_run
        self._config = self._load_yaml(config_path)
        self._role_registry = self._load_yaml(role_registry_path)
        self._escalation = EscalationHandler()

        run_id = str(uuid.uuid4())
        self._bb = Blackboard(run_id=run_id)

        # Build adapters — VCS and notify are optional and swallow init errors.
        # `or {}` tolerates a section that is present but empty (YAML null).
        adapter_cfg: dict = self._config.get("adapters") or {}
        runtime_cfg: dict = self._config.get("runtime") or {}

        self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic"))
        self._vcs: Optional[VCSAdapter] = self._build_optional(  # type: ignore[assignment]
            _VCS_ADAPTERS, adapter_cfg.get("vcs"), "VCS"
        )
        self._notify: Optional[NotifyAdapter] = self._build_optional(  # type: ignore[assignment]
            _NOTIFY_ADAPTERS, adapter_cfg.get("notify"), "notify"
        )
        self._default_runtime: RuntimeAdapter = self._build_runtime(
            runtime_cfg.get("default", "openclaw")
        )
        self._coding_runtime: RuntimeAdapter = self._build_runtime(
            runtime_cfg.get("coding_agent", "claude_code")
        )

        logger.info(
            "TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
        )

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_yaml(path: str) -> dict:
        """Parse a YAML file with ``safe_load``; an empty file yields ``{}``."""
        with open(path, "r", encoding="utf-8") as fh:
            return yaml.safe_load(fh) or {}

    @staticmethod
    def _load_text(path: str) -> str:
        """Return the full UTF-8 text content of *path*."""
        with open(path, "r", encoding="utf-8") as fh:
            return fh.read()

    def _bad_output_retries(self) -> int:
        """Return the configured ``retry_defaults.bad_output`` budget (default 3)."""
        # `or {}` tolerates `retry_defaults:` with no value (YAML null).
        return (self._config.get("retry_defaults") or {}).get("bad_output", 3)

    def _build_llm(self, key: str) -> LLMAdapter:
        """Instantiate the required LLM adapter; raise ValueError for unknown keys."""
        cls = _LLM_ADAPTERS.get(key)
        if cls is None:
            raise ValueError(
                f"Unknown LLM adapter {key!r}. Known: {list(_LLM_ADAPTERS)}"
            )
        return cls(self._config)

    def _build_optional(
        self,
        registry: dict,
        key: Optional[str],
        label: str,
    ) -> Optional[object]:
        """Build an optional adapter, returning None on any init error."""
        if not key:
            return None
        cls = registry.get(key)
        if cls is None:
            logger.warning("Unknown %s adapter %r — skipping.", label, key)
            return None
        try:
            return cls(self._config)
        except Exception as exc:
            logger.warning(
                "%s adapter %r could not be initialised (%s) — skipping.",
                label,
                key,
                exc,
            )
            return None

    def _build_runtime(self, key: str) -> RuntimeAdapter:
        """Instantiate a runtime adapter; raise ValueError for unknown keys."""
        cls = _RUNTIME_ADAPTERS.get(key)
        if cls is None:
            raise ValueError(
                f"Unknown runtime adapter {key!r}. Known: {list(_RUNTIME_ADAPTERS)}"
            )
        return cls(self._config)

    # ------------------------------------------------------------------
    # Role registry
    # ------------------------------------------------------------------

    def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
        """Return the path to the agent persona .md file, or None."""
        tier_key = f"t{tier}"
        # `or {}` tolerates a tier key that is present but has no entries.
        tier_map: dict = self._role_registry.get(tier_key) or {}
        path = tier_map.get(role) or tier_map.get("default")
        if path and os.path.isfile(path):
            return path
        return None

    # ------------------------------------------------------------------
    # Prompt helpers
    # ------------------------------------------------------------------

    def _load_tier_prompt(self, tier: int) -> str:
        """Load the system prompt for a tier from the prompts/ directory."""
        path = _TIER_PROMPTS.get(tier, "")
        if path and os.path.isfile(path):
            return self._load_text(path)
        logger.warning("Tier %d prompt not found at %r", tier, path)
        return ""

    def _load_personality(self, path: Optional[str]) -> str:
        """Return the persona file's text, or "" if *path* is unset/missing."""
        if path and os.path.isfile(path):
            return self._load_text(path)
        return ""

    @staticmethod
    def _extract_json(text: str) -> dict:
        """
        Extract a JSON object from a potentially markdown-wrapped LLM response.

        Strips leading/trailing markdown fences (```json ... ```) then parses.
        If plain parsing fails, falls back to (1) the span from the first
        ``{`` to the last ``}``, then (2) the first complete JSON object that
        starts at the first ``{`` (tolerates trailing prose containing braces).

        Raises
        ------
        ValueError
            If no JSON object can be recovered, or if the whole response is
            valid JSON whose top level is not an object (e.g. a list). Callers
            use ``.get()`` on the result, so non-objects are rejected here.
        """
        text = text.strip()

        # Strip markdown fences (the language tag may be any case, e.g. ```JSON).
        if text.startswith("```"):
            text = re.sub(r"^```[A-Za-z]*\n?", "", text)
            text = re.sub(r"\n?```\s*$", "", text.strip())

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(parsed, dict):
                return parsed
            raise ValueError(
                "LLM response is valid JSON but not an object "
                f"(got {type(parsed).__name__}).\n"
                f"Response (first 500 chars): {text[:500]}"
            )

        m = re.search(r"\{.*\}", text, re.DOTALL)
        if m:
            # Fallback 1: greedy first-"{"-to-last-"}" span.
            try:
                return json.loads(m.group(0))
            except json.JSONDecodeError:
                pass
            # Fallback 2: decode just the first object beginning at the first
            # "{" and ignore whatever follows it.
            try:
                candidate, _end = json.JSONDecoder().raw_decode(text, m.start())
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate

        raise ValueError(
            "Could not parse JSON from LLM response.\n"
            f"Response (first 500 chars): {text[:500]}"
        )

    # ------------------------------------------------------------------
    # Brief dispatch
    # ------------------------------------------------------------------

    def _dispatch_brief(self, brief: TaskBrief) -> dict:
        """
        Send a TaskBrief to the appropriate agent and return the raw result dict.

        Routing
        -------
        preferred_runtime == "coding_agent" → coding runtime adapter
        preferred_runtime == "standard"     → LLM adapter directly

        Blackboard events emitted: spawned → completed | failed.
        Any dispatch exception is recorded on the Blackboard and re-raised.
        """
        if self._dry_run:
            logger.info(
                "[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
                brief.tier,
                brief.role,
                brief.task,
            )
            return {"status": "done", "output": "{}", "artifacts": []}

        self._bb.update_brief_status(brief.brief_id, "active")
        self._bb.log_event(
            "spawned",
            brief_id=brief.brief_id,
            detail={"tier": brief.tier, "role": brief.role},
        )

        try:
            if brief.preferred_runtime == "coding_agent":
                result = self._dispatch_via_runtime(brief)
            else:
                result = self._dispatch_via_llm(brief)

            self._bb.update_brief_result(brief.brief_id, result)
            self._bb.log_event(
                "completed",
                brief_id=brief.brief_id,
                detail={"status": result.get("status")},
            )
            return result

        except Exception as exc:
            self._bb.update_brief_status(brief.brief_id, "failed")
            self._bb.log_event(
                "failed",
                brief_id=brief.brief_id,
                detail={"error": str(exc)},
            )
            raise

    def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
        """Call the LLM adapter with the tier system prompt + brief JSON."""
        tier_prompt = self._load_tier_prompt(brief.tier)
        personality = self._load_personality(brief.agent_personality)
        system_prompt = "\n\n".join(filter(None, [tier_prompt, personality]))
        capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
        user_message = json.dumps(brief.to_dict(), indent=2)

        raw = self._llm.complete(
            prompt=user_message,
            capability=capability,
            context={
                "system_prompt": system_prompt,
                "max_tokens": 4096,
                "temperature": 0,
            },
        )
        return self._extract_json(raw)

    def _dispatch_via_runtime(self, brief: TaskBrief) -> dict:
        """Spawn a coding agent via the runtime adapter and collect its result."""
        task_str = json.dumps(brief.to_dict(), indent=2)
        capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
        # `or {}` guards briefs built without a context.
        timeout_s: int = (brief.context or {}).get("timeout_s", 300)

        agent_id = self._coding_runtime.spawn(
            task=task_str,
            capability=capability,
            context=brief.context,
        )
        logger.info(
            "Spawned coding agent %s for brief %s", agent_id, brief.brief_id
        )

        result = self._coding_runtime.get_result(agent_id, timeout_s=timeout_s)

        # Attempt to parse a JSON object from the agent's text output and
        # merge its keys over the runtime result.
        if isinstance(result.get("output"), str) and result["output"].strip():
            try:
                parsed = self._extract_json(result["output"])
                result.update(parsed)
            except ValueError:
                pass  # Keep raw string output as-is.

        return result

    # ------------------------------------------------------------------
    # Escalation loop
    # ------------------------------------------------------------------

    def _run_with_escalation(
        self,
        brief: TaskBrief,
        workstream_id: Optional[str] = None,
    ) -> dict:
        """
        Dispatch a brief and apply the escalation policy until done or exhausted.

        On retry the amended brief is persisted to the Blackboard before being
        re-submitted.

        Raises
        ------
        EscalationError
            If the policy escalates, or asks for a retry without supplying an
            amended brief.
        """
        while True:
            result = self._dispatch_brief(brief)
            decision = self._escalation.handle(brief, result)

            if decision.action == "complete":
                return result

            if decision.action == "escalate":
                self._bb.log_event(
                    "escalated",
                    brief_id=brief.brief_id,
                    detail={"reason": decision.reason},
                )
                raise EscalationError(
                    f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
                    f"escalated: {decision.reason}"
                )

            # "retry" or "salvage_and_retry"
            self._bb.log_event(
                "retried",
                brief_id=brief.brief_id,
                detail={"reason": decision.reason, "action": decision.action},
            )
            amended = decision.amended_brief
            if amended is None:
                raise EscalationError(
                    f"Escalation returned action={decision.action!r} "
                    "but no amended_brief was provided."
                )
            # Persist the new brief and loop.
            self._bb.create_brief(amended, workstream_id=workstream_id)
            brief = amended

    # ------------------------------------------------------------------
    # Tier output parsers
    # ------------------------------------------------------------------

    def _parse_t1_output(
        self, result: dict, root_brief: TaskBrief
    ) -> list[TaskBrief]:
        """Build T2 TaskBriefs from T1 (Visionary) JSON output."""
        retry_bad = self._bad_output_retries()
        # `or []` tolerates an explicit null in the model output.
        workstreams: list[dict] = result.get("workstreams") or []
        # T1 sets the canonical goal_anchor; propagate it back to root.
        goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
        root_brief.goal_anchor = goal_anchor

        briefs: list[TaskBrief] = []
        for ws in workstreams:
            role = ws.get("role", "default")
            briefs.append(
                root_brief.make_child_brief(
                    tier=2,
                    role=role,
                    task=ws.get("task", ""),
                    workstream=ws.get("name", ""),
                    acceptance_criteria=ws.get("acceptance_criteria", []),
                    preferred_runtime="standard",
                    agent_personality=self._resolve_personality(2, role),
                    retry_budget=retry_bad,
                )
            )
        return briefs

    def _parse_t2_output(
        self, result: dict, parent: TaskBrief
    ) -> list[TaskBrief]:
        """Build T3 TaskBriefs from T2 (Architect) JSON output."""
        retry_bad = self._bad_output_retries()
        subtasks: list[dict] = result.get("subtasks") or []
        arch_summary: str = result.get("architecture_summary", "")

        briefs: list[TaskBrief] = []
        for st in subtasks:
            role = st.get("role", "default")
            briefs.append(
                parent.make_child_brief(
                    tier=3,
                    role=role,
                    task=st.get("task", ""),
                    workstream=parent.workstream,
                    acceptance_criteria=st.get("acceptance_criteria", []),
                    preferred_runtime=st.get("preferred_runtime", "standard"),
                    agent_personality=self._resolve_personality(3, role),
                    retry_budget=retry_bad,
                    context={"architecture_summary": arch_summary},
                )
            )
        return briefs

    def _parse_t3_output(
        self, result: dict, parent: TaskBrief
    ) -> list[TaskBrief]:
        """Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
        retry_bad = self._bad_output_retries()
        tasks: list[dict] = result.get("tasks") or []
        plan_summary: str = result.get("plan_summary", "")

        briefs: list[TaskBrief] = []
        for task in tasks:
            role = task.get("role", "default")
            briefs.append(
                parent.make_child_brief(
                    tier=4,
                    role=role,
                    task=task.get("task", ""),
                    workstream=parent.workstream,
                    acceptance_criteria=task.get("acceptance_criteria", []),
                    preferred_runtime=task.get("preferred_runtime", "standard"),
                    agent_personality=self._resolve_personality(4, role),
                    retry_budget=retry_bad,
                    context={
                        "plan_summary": plan_summary,
                        "depends_on": task.get("depends_on", []),
                    },
                )
            )
        return briefs

    # ------------------------------------------------------------------
    # VCS helpers
    # ------------------------------------------------------------------

    def _commit_artifacts(
        self,
        artifacts: list[dict],
        brief: TaskBrief,
    ) -> None:
        """
        Commit T4 *file* artifacts to the configured VCS adapter.

        Only artifacts with ``type == "file"``, a non-empty ``path`` and a
        non-None ``content`` are committed. VCS failures are logged, not raised.
        """
        if self._dry_run:
            logger.info(
                "[DRY-RUN] Would commit %d artifact(s) for brief %s",
                len(artifacts),
                brief.brief_id,
            )
            return
        if not self._vcs:
            return

        file_map: dict[str, str] = {
            a["path"]: a["content"]
            for a in artifacts
            if isinstance(a, dict)
            and a.get("type") == "file"
            and a.get("path")
            and a.get("content") is not None
        }
        if not file_map:
            return

        # NOTE(review): `run.base_branch` from team.yaml used to be read here
        # but was never passed to the adapter, so the target branch is
        # presumably chosen by the VCS adapter itself. Wire it through once
        # VCSAdapter.commit's signature supports it.
        message = (
            f"feat({brief.workstream}): artifacts from {brief.role} "
            f"[brief {brief.brief_id[:8]}]"
        )
        try:
            # GitHubAdapter.commit accepts dict[str, str] as files.
            sha = self._vcs.commit(file_map, message)  # type: ignore[call-arg]
            logger.info(
                "Committed %d artifact(s) → SHA %s", len(file_map), sha
            )
        except Exception as exc:
            logger.warning("VCS commit failed: %s", exc)

    # ------------------------------------------------------------------
    # Notification
    # ------------------------------------------------------------------

    def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
        """
        Send a best-effort run-outcome notification.

        Adapter errors are logged and swallowed (like VCS commit failures)
        so a notification problem can neither flip a successful run to
        "failed" nor mask the exception that actually failed the run.
        """
        if self._dry_run:
            logger.info(
                "[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
            )
            return
        if not self._notify:
            return

        level = "info" if outcome == "complete" else "error"
        if outcome == "complete":
            message = f"Pipeline complete: {goal[:80]}"
        else:
            message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"

        try:
            self._notify.send(
                message,
                context={
                    "level": level,
                    "run_id": self._bb.run_id,
                    "outcome": outcome,
                    **{k: str(v) for k, v in detail.items()},
                },
            )
        except Exception as exc:
            logger.warning("Notification failed: %s", exc)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def run(self) -> None:
        """
        Execute the full T1→T5 pipeline.

        Steps
        -----
        1. Dispatch T1 Visionary to decompose the goal into workstreams.
        2. For each workstream: T2 Architect → T3 Squad Lead → T4 Implementer
           → T5 Verifier.
        3. Commit passing T4 artifacts via VCS adapter (if configured).
        4. Notify on completion or terminal failure via notify adapter.

        The Blackboard is always closed on exit; pipeline errors are re-raised
        after the run is marked "failed".
        """
        try:
            goal: str = self._config["run"]["goal"]
            self._bb.create_run(goal=goal)
            self._bb.update_run_status("active")
            logger.info("Pipeline started — goal: %s", goal)

            try:
                self._orchestrate(goal)
                self._bb.update_run_status("done")
                summary = self._bb.get_run_summary()
            except Exception as exc:
                self._bb.update_run_status("failed")
                logger.error("Pipeline failed: %s", exc, exc_info=True)
                self._notify_run("failed", goal, {"error": str(exc)})
                raise

            logger.info("Pipeline complete. Summary: %s", summary)
            self._notify_run("complete", goal, summary)
        finally:
            self._bb.close()

    # ------------------------------------------------------------------
    # Internal orchestration
    # ------------------------------------------------------------------

    def _orchestrate(self, goal: str) -> None:
        """
        Build the root T1 brief, dispatch it, and fan out per workstream.

        An EscalationError inside one workstream marks that workstream
        "failed" and continues with the next. Any other exception also marks
        the workstream "failed" and then propagates to abort the run.
        """
        # ---- T1: Visionary ----
        t1_brief = TaskBrief(
            run_id=self._bb.run_id,
            tier=1,
            role="default",
            goal_anchor=goal,
            task=(
                "You are the T1 Visionary. "
                "Decompose the following goal into parallel workstreams "
                f"for the engineering team: {goal}"
            ),
            workstream="root",
            retry_budget=self._bad_output_retries(),
            preferred_runtime="standard",
            agent_personality=self._resolve_personality(1, "default"),
        )
        self._bb.create_brief(t1_brief)
        t1_result = self._run_with_escalation(t1_brief)
        t2_briefs = self._parse_t1_output(t1_result, t1_brief)
        logger.info("T1 produced %d workstream(s)", len(t2_briefs))

        # ---- T2..T5: per workstream ----
        for t2_brief in t2_briefs:
            ws_id = self._bb.create_workstream(
                name=t2_brief.workstream, tier=2
            )
            self._bb.create_brief(t2_brief, workstream_id=ws_id)
            self._bb.update_workstream_status(ws_id, "active")
            try:
                self._run_workstream(t2_brief, ws_id)
            except EscalationError as exc:
                self._bb.update_workstream_status(ws_id, "failed")
                self._bb.log_event(
                    "failed",
                    detail={"error": str(exc), "workstream": t2_brief.workstream},
                )
                logger.error(
                    "Workstream %r failed: %s", t2_brief.workstream, exc
                )
            except Exception:
                # Don't leave the workstream "active" when the run aborts.
                self._bb.update_workstream_status(ws_id, "failed")
                raise
            else:
                self._bb.update_workstream_status(ws_id, "done")

    def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
        """
        Drive T2 → T3 → T4 → T5 for a single workstream.

        A T2 escalation propagates to the caller. Escalations of an
        individual T3 subtask or T4/T5 task are logged and skipped so that
        sibling work continues.
        """
        # T2: Architect
        t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
        t3_briefs = self._parse_t2_output(t2_result, t2_brief)
        logger.info(
            "T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
        )

        for t3_brief in t3_briefs:
            self._bb.create_brief(t3_brief, workstream_id=ws_id)
            try:
                # T3: Squad Lead
                t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
                t4_briefs = self._parse_t3_output(t3_result, t3_brief)
                logger.info(
                    "T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
                )

                for t4_brief in t4_briefs:
                    self._bb.create_brief(t4_brief, workstream_id=ws_id)
                    try:
                        self._implement_and_verify(t4_brief, ws_id)
                    except EscalationError as exc:
                        logger.error(
                            "T4/T5 escalation in %s: %s", t4_brief.role, exc
                        )

            except EscalationError as exc:
                logger.error("T3 escalation in %s: %s", t3_brief.role, exc)

    def _implement_and_verify(self, t4_brief: TaskBrief, ws_id: str) -> None:
        """Run one T4 task, verify it with a T5 brief, and commit on pass."""
        # T4: Implementer
        t4_result = self._run_with_escalation(t4_brief, workstream_id=ws_id)
        # `or []` guards against an explicit "artifacts": null in the output.
        artifacts: list[dict] = t4_result.get("artifacts") or []

        # T5: Verifier
        t5_brief = t4_brief.make_child_brief(
            tier=5,
            role="code",
            task=(
                "Verify the following T4 implementation artifacts "
                "against all acceptance criteria. "
                f"T4 output: {json.dumps(t4_result)[:2000]}"
            ),
            workstream=t4_brief.workstream,
            acceptance_criteria=t4_brief.acceptance_criteria,
            preferred_runtime="standard",
            agent_personality=self._resolve_personality(5, "code"),
            retry_budget=self._bad_output_retries(),
            context={"t4_result": t4_result},
        )
        self._bb.create_brief(t5_brief, workstream_id=ws_id)
        t5_result = self._run_with_escalation(t5_brief, workstream_id=ws_id)

        # Commit on verified pass.
        if t5_result.get("status") in ("passed", "done"):
            self._commit_artifacts(artifacts, t4_brief)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------


def _configure_logging(verbose: bool = False) -> None:
    """Configure root logging at DEBUG (verbose) or INFO level."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run the-agency T1→T5 pipeline.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--config",
        default="config/team.yaml",
        help="Path to team.yaml configuration file.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help=(
            "Log all planned actions without executing LLM calls, "
            "VCS commits, or notifications."
        ),
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable DEBUG-level logging.",
    )
    args = parser.parse_args()

    _configure_logging(args.verbose)
    runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
    runner.run()