Files
the-agency/core/team_runner.py
Hans Heinemann 71316b3090 refactor(team_runner): replace static adapter imports with dynamic importlib loading
Concrete adapter classes (AnthropicAdapter, GitHubAdapter, etc.) are no
longer imported at the top of team_runner.py. Instead, each registry maps
short names to 'module.path:ClassName' strings resolved lazily via
importlib.import_module at instantiation time.

This means:
- Adding a new adapter requires only an entry in the registry string dict
  (or a full dotted path directly in team.yaml) — no changes to TeamRunner.
- Third-party / custom adapters work out of the box: set e.g.
  adapters.llm: mypackage.llm.openai:OpenAIAdapter in team.yaml.
- The runner no longer hard-wires knowledge of which concrete classes exist.

Addresses tandrewng review comment on PR #1.
2026-03-16 00:30:28 -04:00

785 lines
28 KiB
Python

"""
core/team_runner.py
Top-level orchestration entry point for the-agency pipeline.
The TeamRunner loads team.yaml, builds the adapter registry, and drives the
full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling.
CLI usage::
python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import uuid
from typing import Optional
import yaml
from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
import importlib
from adapters.base.llm import LLMAdapter
from adapters.base.notify import NotifyAdapter
from adapters.base.runtime import RuntimeAdapter
from adapters.base.vcs import VCSAdapter
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Maps tier number → system-prompt file path. The paths are relative and are
# opened as written by TeamRunner._load_tier_prompt, so they resolve against
# the process working directory (the runner is expected to be started from
# the project root). A tier without an entry, or whose file is missing, gets
# an empty system prompt and a logged warning.
_TIER_PROMPTS: dict[int, str] = {
1: "prompts/t1_visionary.md",
2: "prompts/t2_architect.md",
3: "prompts/t3_squad_lead.md",
4: "prompts/t4_implementer.md",
5: "prompts/t5_verifier.md",
}
# Maps tier number → LLM capability hint. The hint is forwarded unchanged as
# the ``capability`` argument of LLMAdapter.complete and RuntimeAdapter.spawn;
# tiers without an entry fall back to "capable" at the call sites.
_TIER_CAPABILITIES: dict[int, str] = {
1: "reasoning-heavy",
2: "reasoning-heavy",
3: "capable",
4: "capable",
5: "fast-cheap",
}
# ---------------------------------------------------------------------------
# Adapter registries
#
# Each registry maps a short adapter name to a "module.path:ClassName" string
# that _load_adapter_class resolves lazily via importlib, so adapter modules
# are only imported when an adapter is actually instantiated.
#
# Which registry serves which team.yaml key:
#   adapters.llm                            → _LLM_ADAPTERS
#   adapters.vcs                            → _VCS_ADAPTERS
#   adapters.notify                         → _NOTIFY_ADAPTERS
#   runtime.default / runtime.coding_agent  → _RUNTIME_ADAPTERS
#
# To add a new adapter, append an entry here — no changes to TeamRunner needed.
# team.yaml may also supply a full "module.path:ClassName" value directly,
# enabling third-party adapters without touching this file.
# ---------------------------------------------------------------------------
_LLM_ADAPTERS: dict[str, str] = {
"anthropic": "adapters.llm.anthropic:AnthropicAdapter",
}
_VCS_ADAPTERS: dict[str, str] = {
"github": "adapters.vcs.github:GitHubAdapter",
}
_NOTIFY_ADAPTERS: dict[str, str] = {
"openclaw": "adapters.notify.openclaw:OpenClawNotifyAdapter",
}
_RUNTIME_ADAPTERS: dict[str, str] = {
"openclaw": "adapters.runtime.openclaw:OpenClawRuntimeAdapter",
"claude_code": "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter",
}
def _load_adapter_class(key: str, registry: dict[str, str], label: str) -> type:
"""
Resolve a short name or dotted "module:ClassName" path to an adapter class.
Resolution order:
1. If *key* is in *registry*, use the mapped dotted path.
2. Otherwise, treat *key* itself as a dotted path (custom / third-party).
"""
dotted = registry.get(key, key)
if ":" not in dotted:
raise ValueError(
f"Unknown {label} adapter {key!r}. "
f"Built-in choices: {list(registry)}. "
f"Or supply a full 'module.path:ClassName' value in team.yaml."
)
module_path, class_name = dotted.rsplit(":", 1)
try:
module = importlib.import_module(module_path)
except ModuleNotFoundError as exc:
raise ImportError(
f"Cannot import {label} adapter module {module_path!r}: {exc}"
) from exc
try:
return getattr(module, class_name)
except AttributeError:
raise ImportError(
f"Module {module_path!r} has no class {class_name!r}"
)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class EscalationError(RuntimeError):
    """
    Raised when a brief escalates past its retry budget with no recovery.

    TeamRunner._run_with_escalation raises it when the escalation handler
    returns the "escalate" action, or returns a retry action without an
    amended brief. TeamRunner._orchestrate and TeamRunner._run_workstream
    catch it to log the failure and move on to the next unit of work.
    """
# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------
class TeamRunner:
"""
Orchestrates a full T1→T5 agent pipeline run.
Usage::
runner = TeamRunner(config_path="config/team.yaml")
runner.run()
Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
and notifications::
runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
runner.run()
"""
def __init__(
    self,
    config_path: str = "config/team.yaml",
    dry_run: bool = False,
) -> None:
    """
    Load configuration and instantiate adapters.

    The role registry is always read from ``config/role_registry.yaml``
    (relative to the working directory), independent of *config_path*.

    Parameters
    ----------
    config_path : Path to team.yaml.
    dry_run : When True, skip LLM calls, VCS commits, and notifications.
        All planned actions are logged at INFO level.

    Raises
    ------
    ValueError, ImportError : the LLM or a runtime adapter cannot be
        resolved. VCS and notify adapters are optional; their failures are
        logged and leave the attribute set to None.
    """
    self._dry_run = dry_run
    self._config = self._load_yaml(config_path)
    self._role_registry = self._load_yaml("config/role_registry.yaml")
    self._escalation = EscalationHandler()
    run_id = str(uuid.uuid4())
    self._bb = Blackboard(run_id=run_id)
    try:
        # A key that is present but empty in YAML ("adapters:") loads as
        # None, so ``.get(key, {})`` is not enough — normalise with ``or``.
        adapter_cfg: dict = self._config.get("adapters") or {}
        runtime_cfg: dict = self._config.get("runtime") or {}
        # Same reasoning for the individual names: an explicit null falls
        # back to the built-in default instead of crashing in resolution.
        self._llm: LLMAdapter = self._build_llm(
            adapter_cfg.get("llm") or "anthropic"
        )
        # VCS and notify are optional and swallow init errors.
        self._vcs: Optional[VCSAdapter] = self._build_optional(  # type: ignore[assignment]
            _VCS_ADAPTERS, adapter_cfg.get("vcs"), "VCS"
        )
        self._notify: Optional[NotifyAdapter] = self._build_optional(  # type: ignore[assignment]
            _NOTIFY_ADAPTERS, adapter_cfg.get("notify"), "notify"
        )
        self._default_runtime: RuntimeAdapter = self._build_runtime(
            runtime_cfg.get("default") or "openclaw"
        )
        self._coding_runtime: RuntimeAdapter = self._build_runtime(
            runtime_cfg.get("coding_agent") or "claude_code"
        )
    except Exception:
        # run() is the only other place that closes the blackboard; if
        # construction fails, nobody would ever get the chance to call it.
        self._bb.close()
        raise
    logger.info(
        "TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
    )
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
@staticmethod
def _load_yaml(path: str) -> dict:
    """Parse the UTF-8 YAML file at *path*; a falsy document (e.g. an empty file) yields ``{}``."""
    with open(path, "r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    if not document:
        return {}
    return document
@staticmethod
def _load_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as fh:
return fh.read()
def _build_llm(self, key: str) -> LLMAdapter:
    """Resolve *key* against the LLM registry and instantiate the class with the team config."""
    adapter_cls = _load_adapter_class(key, _LLM_ADAPTERS, "LLM")
    llm = adapter_cls(self._config)
    return llm
def _build_optional(
    self,
    registry: dict[str, str],
    key: Optional[str],
    label: str,
) -> Optional[object]:
    """
    Build an optional adapter, returning None on any init error.

    Parameters
    ----------
    registry : Short-name → "module.path:ClassName" mapping to resolve in.
    key : Adapter name from team.yaml; a falsy value means "not configured".
    label : Human-readable adapter kind, used in log messages.

    Returns
    -------
    The adapter instance, or None when *key* is falsy, the class cannot be
    resolved, or its constructor raises. Failures are logged as warnings.
    """
    if not key:
        return None
    adapter_cls = None
    try:
        adapter_cls = _load_adapter_class(key, registry, label)
        return adapter_cls(self._config)
    except Exception as exc:
        # Only report "unknown adapter" when resolution itself failed. An
        # ImportError/ValueError raised by the adapter's constructor (for
        # example a missing token) is an initialisation problem, not a
        # naming one, and used to be mislabelled.
        unresolved = adapter_cls is None and isinstance(
            exc, (ImportError, ValueError)
        )
        if unresolved:
            logger.warning(
                "Unknown %s adapter %r — skipping. (%s)", label, key, exc
            )
        else:
            logger.warning(
                "%s adapter %r could not be initialised (%s) — skipping.",
                label,
                key,
                exc,
            )
        return None
def _build_runtime(self, key: str) -> RuntimeAdapter:
    """Resolve *key* against the runtime registry and instantiate the class with the team config."""
    adapter_cls = _load_adapter_class(key, _RUNTIME_ADAPTERS, "runtime")
    runtime = adapter_cls(self._config)
    return runtime
# ------------------------------------------------------------------
# Role registry
# ------------------------------------------------------------------
def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
"""Return the path to the agent persona .md file, or None."""
tier_key = f"t{tier}"
tier_map: dict = self._role_registry.get(tier_key, {})
path = tier_map.get(role) or tier_map.get("default")
if path and os.path.isfile(path):
return path
return None
# ------------------------------------------------------------------
# Prompt helpers
# ------------------------------------------------------------------
def _load_tier_prompt(self, tier: int) -> str:
    """
    Return the system prompt text registered for *tier*.

    When the tier has no registered path, or the path is not an existing
    file, a warning is logged and an empty string is returned.
    """
    prompt_path = _TIER_PROMPTS.get(tier, "")
    if not prompt_path or not os.path.isfile(prompt_path):
        logger.warning("Tier %d prompt not found at %r", tier, prompt_path)
        return ""
    return self._load_text(prompt_path)
def _load_personality(self, path: Optional[str]) -> str:
if path and os.path.isfile(path):
return self._load_text(path)
return ""
@staticmethod
def _extract_json(text: str) -> dict:
"""
Extract a JSON object from a potentially markdown-wrapped LLM response.
Strips leading/trailing markdown fences (```json ... ```) then parses.
Falls back to a regex scan for the first ``{...}`` block if plain
parsing fails.
"""
text = text.strip()
# Strip markdown fences.
if text.startswith("```"):
text = re.sub(r"^```[a-z]*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text.strip())
try:
return json.loads(text)
except json.JSONDecodeError:
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group(0))
except json.JSONDecodeError:
pass
raise ValueError(
"Could not parse JSON from LLM response.\n"
f"Response (first 500 chars): {text[:500]}"
)
# ------------------------------------------------------------------
# Brief dispatch
# ------------------------------------------------------------------
def _dispatch_brief(self, brief: TaskBrief) -> dict:
"""
Send a TaskBrief to the appropriate agent and return the raw result dict.
Routing
-------
preferred_runtime == "coding_agent" → coding runtime adapter
preferred_runtime == "standard" → LLM adapter directly
Blackboard events emitted: spawned → completed | failed.
"""
if self._dry_run:
logger.info(
"[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
brief.tier,
brief.role,
brief.task,
)
return {"status": "done", "output": "{}", "artifacts": []}
self._bb.update_brief_status(brief.brief_id, "active")
self._bb.log_event(
"spawned",
brief_id=brief.brief_id,
detail={"tier": brief.tier, "role": brief.role},
)
try:
if brief.preferred_runtime == "coding_agent":
result = self._dispatch_via_runtime(brief)
else:
result = self._dispatch_via_llm(brief)
self._bb.update_brief_result(brief.brief_id, result)
self._bb.log_event(
"completed",
brief_id=brief.brief_id,
detail={"status": result.get("status")},
)
return result
except Exception as exc:
self._bb.update_brief_status(brief.brief_id, "failed")
self._bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": str(exc)},
)
raise
def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
    """
    Run *brief* through the LLM adapter and parse the reply as JSON.

    The system prompt is the tier prompt followed by the persona text,
    separated by a blank line; empty parts are dropped. The user message is
    the brief serialised as indented JSON.
    """
    prompt_parts = [
        self._load_tier_prompt(brief.tier),
        self._load_personality(brief.agent_personality),
    ]
    system_prompt = "\n\n".join(part for part in prompt_parts if part)
    capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
    payload = json.dumps(brief.to_dict(), indent=2)
    reply = self._llm.complete(
        prompt=payload,
        capability=capability,
        context={"system_prompt": system_prompt},
    )
    return self._extract_json(reply)
def _dispatch_via_runtime(self, brief: TaskBrief) -> dict:
    """
    Spawn a coding agent via the runtime adapter and collect its result.

    The brief is serialised to indented JSON and passed as the task. The
    wait timeout is read from ``brief.context["timeout_s"]`` (default 300).
    If the agent's ``output`` is a non-blank string containing a JSON
    object, that object's keys are merged into the result dict; otherwise
    the raw output is left untouched.

    Returns
    -------
    The result dict returned by the runtime adapter, possibly enriched with
    the keys parsed from its text output.
    """
    task_str = json.dumps(brief.to_dict(), indent=2)
    capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
    timeout_s: int = brief.context.get("timeout_s", 300)
    agent_id = self._coding_runtime.spawn(
        task=task_str,
        capability=capability,
        context=brief.context,
    )
    logger.info(
        "Spawned coding agent %s for brief %s", agent_id, brief.brief_id
    )
    result = self._coding_runtime.get_result(agent_id, timeout_s=timeout_s)
    # Attempt to parse JSON from the agent's text output.
    output = result.get("output")
    if isinstance(output, str) and output.strip():
        try:
            parsed = self._extract_json(output)
        except ValueError:
            # Best effort only: keep the raw string output as-is.
            logger.debug(
                "Output of agent %s is not JSON; keeping raw text", agent_id
            )
        else:
            # Merge only real objects. Output such as "42" or "[1, 2]" is
            # valid JSON, but dict.update() would raise on it.
            if isinstance(parsed, dict):
                result.update(parsed)
    return result
# ------------------------------------------------------------------
# Escalation loop
# ------------------------------------------------------------------
def _run_with_escalation(
self,
brief: TaskBrief,
workstream_id: Optional[str] = None,
) -> dict:
"""
Dispatch a brief and apply the escalation policy until done or exhausted.
On retry the amended brief is persisted to the Blackboard before
being re-submitted.
"""
while True:
result = self._dispatch_brief(brief)
decision = self._escalation.handle(brief, result)
if decision.action == "complete":
return result
if decision.action == "escalate":
self._bb.log_event(
"escalated",
brief_id=brief.brief_id,
detail={"reason": decision.reason},
)
raise EscalationError(
f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
f"escalated: {decision.reason}"
)
# "retry" or "salvage_and_retry"
self._bb.log_event(
"retried",
brief_id=brief.brief_id,
detail={"reason": decision.reason, "action": decision.action},
)
amended = decision.amended_brief
if amended is None:
raise EscalationError(
f"Escalation returned action={decision.action!r} "
"but no amended_brief was provided."
)
# Persist the new brief and loop.
self._bb.create_brief(amended, workstream_id=workstream_id)
brief = amended
# ------------------------------------------------------------------
# Tier output parsers
# ------------------------------------------------------------------
def _parse_t1_output(
self, result: dict, root_brief: TaskBrief
) -> list[TaskBrief]:
"""Build T2 TaskBriefs from T1 (Visionary) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
workstreams: list[dict] = result.get("workstreams", [])
# T1 sets the canonical goal_anchor; propagate it back to root.
goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
root_brief.goal_anchor = goal_anchor
briefs: list[TaskBrief] = []
for ws in workstreams:
role = ws.get("role", "default")
brief = root_brief.make_child_brief(
tier=2,
role=role,
task=ws.get("task", ""),
workstream=ws.get("name", ""),
acceptance_criteria=ws.get("acceptance_criteria", []),
preferred_runtime="standard",
agent_personality=self._resolve_personality(2, role),
retry_budget=retry_bad,
)
briefs.append(brief)
return briefs
def _parse_t2_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T3 TaskBriefs from T2 (Architect) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
subtasks: list[dict] = result.get("subtasks", [])
arch_summary: str = result.get("architecture_summary", "")
briefs: list[TaskBrief] = []
for st in subtasks:
role = st.get("role", "default")
brief = parent.make_child_brief(
tier=3,
role=role,
task=st.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=st.get("acceptance_criteria", []),
preferred_runtime=st.get("preferred_runtime", "standard"),
agent_personality=self._resolve_personality(3, role),
retry_budget=retry_bad,
context={"architecture_summary": arch_summary},
)
briefs.append(brief)
return briefs
def _parse_t3_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
tasks: list[dict] = result.get("tasks", [])
plan_summary: str = result.get("plan_summary", "")
briefs: list[TaskBrief] = []
for task in tasks:
role = task.get("role", "default")
pref_runtime = task.get("preferred_runtime", "standard")
brief = parent.make_child_brief(
tier=4,
role=role,
task=task.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=task.get("acceptance_criteria", []),
preferred_runtime=pref_runtime,
agent_personality=self._resolve_personality(4, role),
retry_budget=retry_bad,
context={
"plan_summary": plan_summary,
"depends_on": task.get("depends_on", []),
},
)
briefs.append(brief)
return briefs
# ------------------------------------------------------------------
# VCS helpers
# ------------------------------------------------------------------
def _commit_artifacts(
self,
artifacts: list[dict],
brief: TaskBrief,
) -> None:
"""Commit T4 *file* artifacts to the configured VCS adapter."""
if not self._vcs or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would commit %d artifact(s) for brief %s",
len(artifacts),
brief.brief_id,
)
return
file_map: dict[str, str] = {
a["path"]: a["content"]
for a in artifacts
if a.get("type") == "file"
and a.get("path")
and a.get("content") is not None
}
if not file_map:
return
branch: str = self._config.get("run", {}).get("base_branch", "main")
message = (
f"feat({brief.workstream}): artifacts from {brief.role} "
f"[brief {brief.brief_id[:8]}]"
)
try:
# GitHubAdapter.commit accepts dict[str, str] as files.
sha = self._vcs.commit(file_map, message) # type: ignore[call-arg]
logger.info(
"Committed %d artifact(s) → SHA %s", len(file_map), sha
)
except Exception as exc:
logger.warning("VCS commit failed: %s", exc)
# ------------------------------------------------------------------
# Notification
# ------------------------------------------------------------------
def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
if not self._notify or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
)
return
level = "info" if outcome == "complete" else "error"
if outcome == "complete":
message = f"Pipeline complete: {goal[:80]}"
else:
message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"
self._notify.send(
message,
context={
"level": level,
"run_id": self._bb.run_id,
"outcome": outcome,
**{k: str(v) for k, v in detail.items()},
},
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run(self) -> None:
    """
    Execute the full T1→T5 pipeline.

    Steps
    -----
    1. Dispatch T1 Visionary to decompose the goal into workstreams.
    2. For each workstream: T2 Architect → T3 Squad Lead →
       T4 Implementer → T5 Verifier.
    3. Commit passing T4 artifacts via VCS adapter (if configured).
    4. Notify on completion or terminal failure via notify adapter.

    The blackboard is closed on every exit path, including a missing
    ``run.goal`` in the configuration.

    Raises
    ------
    KeyError : ``run.goal`` is missing from the configuration.
    Exception : whatever aborted orchestration, re-raised after the run
        has been marked "failed" and a failure notification attempted.
    """
    try:
        # Inside the try so that a bad config still closes the blackboard.
        goal: str = self._config["run"]["goal"]
        self._bb.create_run(goal=goal)
        self._bb.update_run_status("active")
        logger.info("Pipeline started — goal: %s", goal)
        try:
            self._orchestrate(goal)
            self._bb.update_run_status("done")
            summary = self._bb.get_run_summary()
        except Exception as exc:
            self._bb.update_run_status("failed")
            logger.error("Pipeline failed: %s", exc, exc_info=True)
            self._notify_run("failed", goal, {"error": str(exc)})
            raise
        # Reporting success happens outside the failure handler: a problem
        # while announcing a finished run must not re-label it as failed.
        logger.info("Pipeline complete. Summary: %s", summary)
        self._notify_run("complete", goal, summary)
    finally:
        self._bb.close()
# ------------------------------------------------------------------
# Internal orchestration
# ------------------------------------------------------------------
def _orchestrate(self, goal: str) -> None:
    """
    Run T1 on *goal*, then drive every resulting workstream in turn.

    A workstream that ends in an EscalationError is marked "failed" on the
    blackboard and logged; the remaining workstreams still run. Any other
    exception propagates to the caller.
    """
    budget: int = self._config.get("retry_defaults", {}).get("bad_output", 3)

    # ---- T1: Visionary ----
    visionary_task = (
        "You are the T1 Visionary. "
        "Decompose the following goal into parallel workstreams "
        f"for the engineering team: {goal}"
    )
    root = TaskBrief(
        run_id=self._bb.run_id,
        tier=1,
        role="default",
        goal_anchor=goal,
        task=visionary_task,
        workstream="root",
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=self._resolve_personality(1, "default"),
    )
    self._bb.create_brief(root)
    root_result = self._run_with_escalation(root)
    architect_briefs = self._parse_t1_output(root_result, root)
    logger.info("T1 produced %d workstream(s)", len(architect_briefs))

    # ---- T2..T5: per workstream ----
    for architect_brief in architect_briefs:
        ws_id = self._bb.create_workstream(
            name=architect_brief.workstream, tier=2
        )
        self._bb.create_brief(architect_brief, workstream_id=ws_id)
        self._bb.update_workstream_status(ws_id, "active")
        try:
            self._run_workstream(architect_brief, ws_id)
            self._bb.update_workstream_status(ws_id, "done")
        except EscalationError as exc:
            self._bb.update_workstream_status(ws_id, "failed")
            failure_detail = {
                "error": str(exc),
                "workstream": architect_brief.workstream,
            }
            self._bb.log_event("failed", detail=failure_detail)
            logger.error(
                "Workstream %r failed: %s", architect_brief.workstream, exc
            )
def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
    """
    Drive T2 → T3 → T4 → T5 for a single workstream.

    An EscalationError from T2 itself propagates to the caller. An
    EscalationError inside one T3 subtask is logged and the next subtask
    still runs.
    """
    # T2: Architect
    t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
    t3_briefs = self._parse_t2_output(t2_result, t2_brief)
    logger.info(
        "T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
    )
    for t3_brief in t3_briefs:
        self._bb.create_brief(t3_brief, workstream_id=ws_id)
        try:
            self._run_t3_subtask(t3_brief, ws_id)
        except EscalationError as exc:
            logger.error("T3 escalation in %s: %s", t3_brief.role, exc)

def _run_t3_subtask(self, t3_brief: TaskBrief, ws_id: str) -> None:
    """Run one T3 Squad Lead brief, then each T4 task it produces; a failed task is logged and skipped."""
    # T3: Squad Lead
    t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
    t4_briefs = self._parse_t3_output(t3_result, t3_brief)
    logger.info(
        "T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
    )
    for t4_brief in t4_briefs:
        self._bb.create_brief(t4_brief, workstream_id=ws_id)
        try:
            self._run_t4_task(t4_brief, ws_id)
        except EscalationError as exc:
            logger.error(
                "T4/T5 escalation in %s: %s", t4_brief.role, exc
            )

def _run_t4_task(self, t4_brief: TaskBrief, ws_id: str) -> None:
    """Run one T4 Implementer brief, verify it with T5, and commit its artifacts on a pass."""
    # T4: Implementer
    t4_result = self._run_with_escalation(t4_brief, workstream_id=ws_id)
    artifacts: list[dict] = t4_result.get("artifacts", [])
    # T5: Verifier
    # default=str: a runtime adapter's result may hold values json cannot
    # encode. This is only a truncated preview for the prompt, so a string
    # rendering is better than a TypeError that aborts the whole pipeline.
    t4_preview = json.dumps(t4_result, default=str)[:2000]
    t5_brief = t4_brief.make_child_brief(
        tier=5,
        role="code",
        task=(
            "Verify the following T4 implementation artifacts "
            "against all acceptance criteria. "
            f"T4 output: {t4_preview}"
        ),
        workstream=t4_brief.workstream,
        acceptance_criteria=t4_brief.acceptance_criteria,
        preferred_runtime="standard",
        agent_personality=self._resolve_personality(5, "code"),
        retry_budget=self._config.get(
            "retry_defaults", {}
        ).get("bad_output", 3),
        context={"t4_result": t4_result},
    )
    self._bb.create_brief(t5_brief, workstream_id=ws_id)
    t5_result = self._run_with_escalation(t5_brief, workstream_id=ws_id)
    # Commit on verified pass.
    if t5_result.get("status") in ("passed", "done"):
        self._commit_artifacts(artifacts, t4_brief)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def _configure_logging(verbose: bool = False) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run the-agency T1→T5 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config",
default="config/team.yaml",
help="Path to team.yaml configuration file.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help=(
"Log all planned actions without executing LLM calls, "
"VCS commits, or notifications."
),
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable DEBUG-level logging.",
)
args = parser.parse_args()
_configure_logging(args.verbose)
runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
runner.run()