From e097f4be21f2e64ea7cf0259095383c1437c7caf Mon Sep 17 00:00:00 2001 From: Hans Heinemann Date: Sun, 15 Mar 2026 03:15:37 -0400 Subject: [PATCH] feat(core): implement TeamRunner orchestration loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Full T1→T5 pipeline orchestration with adapter registry, escalation, and blackboard event emission. Key design decisions: - Adapter registry maps config keys to concrete classes; VCS and notify are optional (swallow init errors and degrade gracefully) - _dispatch_brief() routes to LLM adapter (standard) or coding runtime (coding_agent) based on brief.preferred_runtime - _run_with_escalation() drives the retry/salvage loop: persists amended briefs to the Blackboard before each re-submission - Tier parsers (_parse_t1/t2/t3_output) build child TaskBriefs, preserving the goal_anchor invariant and resolving agent personalities from the registry - T5 Verifier is always spawned after T4; VCS commit only happens on verified pass (status "passed" or "done") - --dry-run flag: logs all actions, skips LLM, VCS, and notify calls - Exposes CLI via `python -m core.team_runner` with --config, --dry-run, --verbose flags Co-Authored-By: Claude Sonnet 4.6 --- core/team_runner.py | 798 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 729 insertions(+), 69 deletions(-) diff --git a/core/team_runner.py b/core/team_runner.py index d0c260e..5e1c105 100644 --- a/core/team_runner.py +++ b/core/team_runner.py @@ -1,99 +1,759 @@ """ core/team_runner.py -Top-level orchestration entry point — Phase 2 stub. +Top-level orchestration entry point for the-agency pipeline. -The TeamRunner is responsible for: - 1. Loading config/team.yaml and config/role_registry.yaml. - 2. Instantiating the correct adapter implementations (LLM, VCS, notify, runtime). - 3. Creating a Blackboard for the run. - 4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary. - 5. Recursively spawning T2→T5 briefs based on tier outputs. - 6. Using EscalationHandler to manage retries, salvage, and escalation. - 7. Writing final run status and summary to the Blackboard. +The TeamRunner loads team.yaml, builds the adapter registry, and drives the +full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling. -TODO (Phase 2): - - Load and validate team.yaml configuration. - - Build adapter registry (map adapter keys → concrete adapter classes). - - Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5. - - Parse tier JSON outputs into child TaskBrief objects via make_child_brief(). - - Integrate EscalationHandler into the dispatch loop. - - Support --dry-run flag (log actions without executing). - - Emit blackboard events at each stage (spawned, completed, failed, etc.). - - Expose a CLI entry point (argparse or click). +CLI usage:: + + python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose] """ from __future__ import annotations -# TODO (Phase 2): Uncomment and implement imports as adapters are built. -# import argparse -# import yaml -# from core.task_brief import TaskBrief -# from core.blackboard import Blackboard -# from core.escalation import EscalationHandler -# from adapters.llm.anthropic import AnthropicAdapter -# from adapters.vcs.github import GitHubAdapter -# from adapters.notify.openclaw import OpenClawNotifyAdapter -# from adapters.runtime.openclaw import OpenClawRuntimeAdapter -# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter +import argparse +import json +import logging +import os +import re +import uuid +from typing import Optional +import yaml + +from core.blackboard import Blackboard +from core.escalation import EscalationHandler +from core.task_brief import TaskBrief + +from adapters.base.llm import LLMAdapter +from adapters.base.notify import NotifyAdapter +from adapters.base.runtime import RuntimeAdapter +from adapters.base.vcs import VCSAdapter +from adapters.llm.anthropic import AnthropicAdapter +from adapters.notify.openclaw import OpenClawNotifyAdapter +from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter +from adapters.runtime.openclaw import OpenClawRuntimeAdapter +from adapters.vcs.github import GitHubAdapter + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Maps tier number → prompt file path (relative to project root). +_TIER_PROMPTS: dict[int, str] = { + 1: "prompts/t1_visionary.md", + 2: "prompts/t2_architect.md", + 3: "prompts/t3_squad_lead.md", + 4: "prompts/t4_implementer.md", + 5: "prompts/t5_verifier.md", +} + +# Maps tier number → LLM capability hint. +_TIER_CAPABILITIES: dict[int, str] = { + 1: "reasoning-heavy", + 2: "reasoning-heavy", + 3: "capable", + 4: "capable", + 5: "fast-cheap", +} + +# Adapter registry: config key → concrete class. +_LLM_ADAPTERS: dict[str, type[LLMAdapter]] = { + "anthropic": AnthropicAdapter, +} +_VCS_ADAPTERS: dict[str, type[VCSAdapter]] = { + "github": GitHubAdapter, +} +_NOTIFY_ADAPTERS: dict[str, type[NotifyAdapter]] = { + "openclaw": OpenClawNotifyAdapter, +} +_RUNTIME_ADAPTERS: dict[str, type[RuntimeAdapter]] = { + "openclaw": OpenClawRuntimeAdapter, + "claude_code": ClaudeCodeRuntimeAdapter, +} + + +# --------------------------------------------------------------------------- +# Exceptions +# --------------------------------------------------------------------------- + +class EscalationError(RuntimeError): + """Raised when a brief escalates past its retry budget with no recovery.""" + + +# --------------------------------------------------------------------------- +# TeamRunner +# --------------------------------------------------------------------------- class TeamRunner: """ Orchestrates a full T1→T5 agent pipeline run. - Usage (Phase 2):: + Usage:: runner = TeamRunner(config_path="config/team.yaml") runner.run() + + Dry-run mode logs all planned actions but skips LLM calls, VCS commits, + and notifications:: + + runner = TeamRunner(config_path="config/team.yaml", dry_run=True) + runner.run() """ - def __init__(self, config_path: str = "config/team.yaml") -> None: - # TODO (Phase 2): Load YAML config. - # Instantiate adapters based on config.adapters keys. - # Create a Blackboard for this run. - raise NotImplementedError("TeamRunner.__init__ is not yet implemented.") + def __init__( + self, + config_path: str = "config/team.yaml", + dry_run: bool = False, + ) -> None: + """ + Load configuration and instantiate adapters. + + Parameters + ---------- + config_path : Path to team.yaml. + dry_run : When True, skip LLM calls, VCS commits, and notifications. + All planned actions are logged at INFO level. + """ + self._dry_run = dry_run + + self._config = self._load_yaml(config_path) + self._role_registry = self._load_yaml("config/role_registry.yaml") + self._escalation = EscalationHandler() + + run_id = str(uuid.uuid4()) + self._bb = Blackboard(run_id=run_id) + + # Build adapters — VCS and notify are optional and swallow init errors. + adapter_cfg: dict = self._config.get("adapters", {}) + runtime_cfg: dict = self._config.get("runtime", {}) + + self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic")) + self._vcs: Optional[VCSAdapter] = self._build_optional( # type: ignore[assignment] + _VCS_ADAPTERS, adapter_cfg.get("vcs"), "VCS" + ) + self._notify: Optional[NotifyAdapter] = self._build_optional( # type: ignore[assignment] + _NOTIFY_ADAPTERS, adapter_cfg.get("notify"), "notify" + ) + self._default_runtime: RuntimeAdapter = self._build_runtime( + runtime_cfg.get("default", "openclaw") + ) + self._coding_runtime: RuntimeAdapter = self._build_runtime( + runtime_cfg.get("coding_agent", "claude_code") + ) + + logger.info( + "TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run + ) + + # ------------------------------------------------------------------ + # Configuration helpers + # ------------------------------------------------------------------ + + @staticmethod + def _load_yaml(path: str) -> dict: + with open(path, "r", encoding="utf-8") as fh: + return yaml.safe_load(fh) or {} + + @staticmethod + def _load_text(path: str) -> str: + with open(path, "r", encoding="utf-8") as fh: + return fh.read() + + def _build_llm(self, key: str) -> LLMAdapter: + cls = _LLM_ADAPTERS.get(key) + if cls is None: + raise ValueError( + f"Unknown LLM adapter {key!r}. Known: {list(_LLM_ADAPTERS)}" + ) + return cls(self._config) + + def _build_optional( + self, + registry: dict, + key: Optional[str], + label: str, + ) -> Optional[object]: + """Build an optional adapter, returning None on any init error.""" + if not key: + return None + cls = registry.get(key) + if cls is None: + logger.warning("Unknown %s adapter %r — skipping.", label, key) + return None + try: + return cls(self._config) + except Exception as exc: + logger.warning( + "%s adapter %r could not be initialised (%s) — skipping.", + label, + key, + exc, + ) + return None + + def _build_runtime(self, key: str) -> RuntimeAdapter: + cls = _RUNTIME_ADAPTERS.get(key) + if cls is None: + raise ValueError( + f"Unknown runtime adapter {key!r}. Known: {list(_RUNTIME_ADAPTERS)}" + ) + return cls(self._config) + + # ------------------------------------------------------------------ + # Role registry + # ------------------------------------------------------------------ + + def _resolve_personality(self, tier: int, role: str) -> Optional[str]: + """Return the path to the agent persona .md file, or None.""" + tier_key = f"t{tier}" + tier_map: dict = self._role_registry.get(tier_key, {}) + path = tier_map.get(role) or tier_map.get("default") + if path and os.path.isfile(path): + return path + return None + + # ------------------------------------------------------------------ + # Prompt helpers + # ------------------------------------------------------------------ + + def _load_tier_prompt(self, tier: int) -> str: + """Load the system prompt for a tier from the prompts/ directory.""" + path = _TIER_PROMPTS.get(tier, "") + if path and os.path.isfile(path): + return self._load_text(path) + logger.warning("Tier %d prompt not found at %r", tier, path) + return "" + + def _load_personality(self, path: Optional[str]) -> str: + if path and os.path.isfile(path): + return self._load_text(path) + return "" + + @staticmethod + def _extract_json(text: str) -> dict: + """ + Extract a JSON object from a potentially markdown-wrapped LLM response. + + Strips leading/trailing markdown fences (```json ... ```) then parses. + Falls back to a regex scan for the first ``{...}`` block if plain + parsing fails. + """ + text = text.strip() + # Strip markdown fences. + if text.startswith("```"): + text = re.sub(r"^```[a-z]*\n?", "", text) + text = re.sub(r"\n?```\s*$", "", text.strip()) + + try: + return json.loads(text) + except json.JSONDecodeError: + m = re.search(r"\{.*\}", text, re.DOTALL) + if m: + try: + return json.loads(m.group(0)) + except json.JSONDecodeError: + pass + raise ValueError( + "Could not parse JSON from LLM response.\n" + f"Response (first 500 chars): {text[:500]}" + ) + + # ------------------------------------------------------------------ + # Brief dispatch + # ------------------------------------------------------------------ + + def _dispatch_brief(self, brief: TaskBrief) -> dict: + """ + Send a TaskBrief to the appropriate agent and return the raw result dict. + + Routing + ------- + preferred_runtime == "coding_agent" → coding runtime adapter + preferred_runtime == "standard" → LLM adapter directly + + Blackboard events emitted: spawned → completed | failed. + """ + if self._dry_run: + logger.info( + "[DRY-RUN] dispatch tier=%d role=%s task=%.80s", + brief.tier, + brief.role, + brief.task, + ) + return {"status": "done", "output": "{}", "artifacts": []} + + self._bb.update_brief_status(brief.brief_id, "active") + self._bb.log_event( + "spawned", + brief_id=brief.brief_id, + detail={"tier": brief.tier, "role": brief.role}, + ) + + try: + if brief.preferred_runtime == "coding_agent": + result = self._dispatch_via_runtime(brief) + else: + result = self._dispatch_via_llm(brief) + + self._bb.update_brief_result(brief.brief_id, result) + self._bb.log_event( + "completed", + brief_id=brief.brief_id, + detail={"status": result.get("status")}, + ) + return result + + except Exception as exc: + self._bb.update_brief_status(brief.brief_id, "failed") + self._bb.log_event( + "failed", + brief_id=brief.brief_id, + detail={"error": str(exc)}, + ) + raise + + def _dispatch_via_llm(self, brief: TaskBrief) -> dict: + """Call the LLM adapter with the tier system prompt + brief JSON.""" + tier_prompt = self._load_tier_prompt(brief.tier) + personality = self._load_personality(brief.agent_personality) + system_prompt = "\n\n".join(filter(None, [tier_prompt, personality])) + capability = _TIER_CAPABILITIES.get(brief.tier, "capable") + user_message = json.dumps(brief.to_dict(), indent=2) + + raw = self._llm.complete( + prompt=user_message, + capability=capability, + context={ + "system_prompt": system_prompt, + "max_tokens": 4096, + "temperature": 0, + }, + ) + return self._extract_json(raw) + + def _dispatch_via_runtime(self, brief: TaskBrief) -> dict: + """Spawn a coding agent via the runtime adapter and collect its result.""" + task_str = json.dumps(brief.to_dict(), indent=2) + capability = _TIER_CAPABILITIES.get(brief.tier, "capable") + timeout_s: int = brief.context.get("timeout_s", 300) + + agent_id = self._coding_runtime.spawn( + task=task_str, + capability=capability, + context=brief.context, + ) + logger.info( + "Spawned coding agent %s for brief %s", agent_id, brief.brief_id + ) + + result = self._coding_runtime.get_result(agent_id, timeout_s=timeout_s) + + # Attempt to parse JSON from the agent's text output. + if isinstance(result.get("output"), str) and result["output"].strip(): + try: + parsed = self._extract_json(result["output"]) + result.update(parsed) + except ValueError: + pass # Keep raw string output as-is. + + return result + + # ------------------------------------------------------------------ + # Escalation loop + # ------------------------------------------------------------------ + + def _run_with_escalation( + self, + brief: TaskBrief, + workstream_id: Optional[str] = None, + ) -> dict: + """ + Dispatch a brief and apply the escalation policy until done or exhausted. + + On retry the amended brief is persisted to the Blackboard before + being re-submitted. + """ + while True: + result = self._dispatch_brief(brief) + decision = self._escalation.handle(brief, result) + + if decision.action == "complete": + return result + + if decision.action == "escalate": + self._bb.log_event( + "escalated", + brief_id=brief.brief_id, + detail={"reason": decision.reason}, + ) + raise EscalationError( + f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) " + f"escalated: {decision.reason}" + ) + + # "retry" or "salvage_and_retry" + self._bb.log_event( + "retried", + brief_id=brief.brief_id, + detail={"reason": decision.reason, "action": decision.action}, + ) + amended = decision.amended_brief + if amended is None: + raise EscalationError( + f"Escalation returned action={decision.action!r} " + "but no amended_brief was provided." + ) + # Persist the new brief and loop. + self._bb.create_brief(amended, workstream_id=workstream_id) + brief = amended + + # ------------------------------------------------------------------ + # Tier output parsers + # ------------------------------------------------------------------ + + def _parse_t1_output( + self, result: dict, root_brief: TaskBrief + ) -> list[TaskBrief]: + """Build T2 TaskBriefs from T1 (Visionary) JSON output.""" + retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3) + workstreams: list[dict] = result.get("workstreams", []) + + # T1 sets the canonical goal_anchor; propagate it back to root. + goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor + root_brief.goal_anchor = goal_anchor + + briefs: list[TaskBrief] = [] + for ws in workstreams: + role = ws.get("role", "default") + brief = root_brief.make_child_brief( + tier=2, + role=role, + task=ws.get("task", ""), + workstream=ws.get("name", ""), + acceptance_criteria=ws.get("acceptance_criteria", []), + preferred_runtime="standard", + agent_personality=self._resolve_personality(2, role), + retry_budget=retry_bad, + ) + briefs.append(brief) + return briefs + + def _parse_t2_output( + self, result: dict, parent: TaskBrief + ) -> list[TaskBrief]: + """Build T3 TaskBriefs from T2 (Architect) JSON output.""" + retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3) + subtasks: list[dict] = result.get("subtasks", []) + arch_summary: str = result.get("architecture_summary", "") + + briefs: list[TaskBrief] = [] + for st in subtasks: + role = st.get("role", "default") + brief = parent.make_child_brief( + tier=3, + role=role, + task=st.get("task", ""), + workstream=parent.workstream, + acceptance_criteria=st.get("acceptance_criteria", []), + preferred_runtime=st.get("preferred_runtime", "standard"), + agent_personality=self._resolve_personality(3, role), + retry_budget=retry_bad, + context={"architecture_summary": arch_summary}, + ) + briefs.append(brief) + return briefs + + def _parse_t3_output( + self, result: dict, parent: TaskBrief + ) -> list[TaskBrief]: + """Build T4 TaskBriefs from T3 (Squad Lead) JSON output.""" + retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3) + tasks: list[dict] = result.get("tasks", []) + plan_summary: str = result.get("plan_summary", "") + + briefs: list[TaskBrief] = [] + for task in tasks: + role = task.get("role", "default") + pref_runtime = task.get("preferred_runtime", "standard") + brief = parent.make_child_brief( + tier=4, + role=role, + task=task.get("task", ""), + workstream=parent.workstream, + acceptance_criteria=task.get("acceptance_criteria", []), + preferred_runtime=pref_runtime, + agent_personality=self._resolve_personality(4, role), + retry_budget=retry_bad, + context={ + "plan_summary": plan_summary, + "depends_on": task.get("depends_on", []), + }, + ) + briefs.append(brief) + return briefs + + # ------------------------------------------------------------------ + # VCS helpers + # ------------------------------------------------------------------ + + def _commit_artifacts( + self, + artifacts: list[dict], + brief: TaskBrief, + ) -> None: + """Commit T4 *file* artifacts to the configured VCS adapter.""" + if not self._vcs or self._dry_run: + if self._dry_run: + logger.info( + "[DRY-RUN] Would commit %d artifact(s) for brief %s", + len(artifacts), + brief.brief_id, + ) + return + + file_map: dict[str, str] = { + a["path"]: a["content"] + for a in artifacts + if a.get("type") == "file" + and a.get("path") + and a.get("content") is not None + } + if not file_map: + return + + branch: str = self._config.get("run", {}).get("base_branch", "main") + message = ( + f"feat({brief.workstream}): artifacts from {brief.role} " + f"[brief {brief.brief_id[:8]}]" + ) + try: + # GitHubAdapter.commit accepts dict[str, str] as files. + sha = self._vcs.commit(file_map, message) # type: ignore[call-arg] + logger.info( + "Committed %d artifact(s) → SHA %s", len(file_map), sha + ) + except Exception as exc: + logger.warning("VCS commit failed: %s", exc) + + # ------------------------------------------------------------------ + # Notification + # ------------------------------------------------------------------ + + def _notify_run(self, outcome: str, goal: str, detail: dict) -> None: + if not self._notify or self._dry_run: + if self._dry_run: + logger.info( + "[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal + ) + return + + level = "info" if outcome == "complete" else "error" + if outcome == "complete": + message = f"Pipeline complete: {goal[:80]}" + else: + message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}" + + self._notify.send( + message, + context={ + "level": level, + "run_id": self._bb.run_id, + "outcome": outcome, + **{k: str(v) for k, v in detail.items()}, + }, + ) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ def run(self) -> None: """ - Execute the full pipeline from T1 decomposition through T5 verification. + Execute the full T1→T5 pipeline. - TODO (Phase 2): - - Build root T1 brief from config.run.goal. - - Dispatch to T1 Visionary via LLM adapter. - - Parse workstreams from T1 output. - - For each workstream: dispatch T2 Architect. - - For each T2 subtask: dispatch T3 Squad Lead. - - For each T3 task: dispatch T4 Implementer. - - For each T4 artifact set: dispatch T5 Verifier. - - Run escalation handler at each tier on failure. - - Commit passing artifacts via VCS adapter. - - Notify on completion or terminal failure via notify adapter. + Steps + ----- + 1. Dispatch T1 Visionary to decompose the goal into workstreams. + 2. For each workstream: T2 Architect → T3 Squad Lead → + T4 Implementer → T5 Verifier. + 3. Commit passing T4 artifacts via VCS adapter (if configured). + 4. Notify on completion or terminal failure via notify adapter. """ - raise NotImplementedError("TeamRunner.run is not yet implemented.") + goal: str = self._config["run"]["goal"] + self._bb.create_run(goal=goal) + self._bb.update_run_status("active") + logger.info("Pipeline started — goal: %s", goal) - def _dispatch_brief(self, brief) -> dict: - """ - Send a single TaskBrief to the appropriate agent and return the result. + try: + self._orchestrate(goal) + self._bb.update_run_status("done") + summary = self._bb.get_run_summary() + logger.info("Pipeline complete. Summary: %s", summary) + self._notify_run("complete", goal, summary) + except Exception as exc: + self._bb.update_run_status("failed") + logger.error("Pipeline failed: %s", exc, exc_info=True) + self._notify_run("failed", goal, {"error": str(exc)}) + raise + finally: + self._bb.close() - TODO (Phase 2): - - Select runtime based on brief.preferred_runtime. - - Load agent personality from brief.agent_personality (if set). - - Compose prompt from tier system prompt + brief payload. - - Spawn agent via runtime adapter. - - Await result via runtime.get_result(). - - Log spawned/completed/failed events to Blackboard. - """ - raise NotImplementedError("TeamRunner._dispatch_brief is not yet implemented.") + # ------------------------------------------------------------------ + # Internal orchestration + # ------------------------------------------------------------------ + + def _orchestrate(self, goal: str) -> None: + """Build the root T1 brief, dispatch it, and fan out per workstream.""" + retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3) + + # ---- T1: Visionary ---- + t1_brief = TaskBrief( + run_id=self._bb.run_id, + tier=1, + role="default", + goal_anchor=goal, + task=( + "You are the T1 Visionary. " + "Decompose the following goal into parallel workstreams " + f"for the engineering team: {goal}" + ), + workstream="root", + retry_budget=retry_bad, + preferred_runtime="standard", + agent_personality=self._resolve_personality(1, "default"), + ) + self._bb.create_brief(t1_brief) + + t1_result = self._run_with_escalation(t1_brief) + t2_briefs = self._parse_t1_output(t1_result, t1_brief) + logger.info("T1 produced %d workstream(s)", len(t2_briefs)) + + # ---- T2..T5: per workstream ---- + for t2_brief in t2_briefs: + ws_id = self._bb.create_workstream( + name=t2_brief.workstream, tier=2 + ) + self._bb.create_brief(t2_brief, workstream_id=ws_id) + self._bb.update_workstream_status(ws_id, "active") + + try: + self._run_workstream(t2_brief, ws_id) + self._bb.update_workstream_status(ws_id, "done") + except EscalationError as exc: + self._bb.update_workstream_status(ws_id, "failed") + self._bb.log_event( + "failed", + detail={"error": str(exc), "workstream": t2_brief.workstream}, + ) + logger.error( + "Workstream %r failed: %s", t2_brief.workstream, exc + ) + + def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None: + """Drive T2 → T3 → T4 → T5 for a single workstream.""" + # T2: Architect + t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id) + t3_briefs = self._parse_t2_output(t2_result, t2_brief) + logger.info( + "T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs) + ) + + for t3_brief in t3_briefs: + self._bb.create_brief(t3_brief, workstream_id=ws_id) + try: + # T3: Squad Lead + t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id) + t4_briefs = self._parse_t3_output(t3_result, t3_brief) + logger.info( + "T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs) + ) + + for t4_brief in t4_briefs: + self._bb.create_brief(t4_brief, workstream_id=ws_id) + try: + # T4: Implementer + t4_result = self._run_with_escalation( + t4_brief, workstream_id=ws_id + ) + artifacts: list[dict] = t4_result.get("artifacts", []) + + # T5: Verifier + t5_brief = t4_brief.make_child_brief( + tier=5, + role="code", + task=( + "Verify the following T4 implementation artifacts " + "against all acceptance criteria. " + f"T4 output: {json.dumps(t4_result)[:2000]}" + ), + workstream=t4_brief.workstream, + acceptance_criteria=t4_brief.acceptance_criteria, + preferred_runtime="standard", + agent_personality=self._resolve_personality(5, "code"), + retry_budget=self._config.get( + "retry_defaults", {} + ).get("bad_output", 3), + context={"t4_result": t4_result}, + ) + self._bb.create_brief(t5_brief, workstream_id=ws_id) + t5_result = self._run_with_escalation( + t5_brief, workstream_id=ws_id + ) + + # Commit on verified pass. + if t5_result.get("status") in ("passed", "done"): + self._commit_artifacts(artifacts, t4_brief) + + except EscalationError as exc: + logger.error( + "T4/T5 escalation in %s: %s", t4_brief.role, exc + ) + + except EscalationError as exc: + logger.error("T3 escalation in %s: %s", t3_brief.role, exc) # --------------------------------------------------------------------------- -# CLI entry point (Phase 2) +# CLI entry point # --------------------------------------------------------------------------- -# TODO (Phase 2): Implement argparse CLI. -# if __name__ == "__main__": -# parser = argparse.ArgumentParser(description="Run the-agency pipeline.") -# parser.add_argument("--config", default="config/team.yaml", help="Path to team.yaml") -# parser.add_argument("--dry-run", action="store_true", help="Log actions without executing") -# args = parser.parse_args() -# runner = TeamRunner(config_path=args.config) -# runner.run() +def _configure_logging(verbose: bool = False) -> None: + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Run the-agency T1→T5 pipeline.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--config", + default="config/team.yaml", + help="Path to team.yaml configuration file.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help=( + "Log all planned actions without executing LLM calls, " + "VCS commits, or notifications." + ), + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG-level logging.", + ) + args = parser.parse_args() + _configure_logging(args.verbose) + + runner = TeamRunner(config_path=args.config, dry_run=args.dry_run) + runner.run()