feat: implement TeamRunner orchestration loop

Core orchestration engine:
- T1→T5 tier execution loop with dynamic task decomposition
- Config-driven adapter registry (team.yaml adapter_registry section)
- Config-driven runtime registry (top-level runtime section in team.yaml)
- Dynamic adapter loading via importlib (no hardcoded imports)
- Capability-based LLM dispatch (reasoning-heavy / capable / fast-cheap per tier)
- T4 defaults to coding_agent runtime (Claude Code)
- Blackboard integration for inter-tier state sharing
- Escalation handling with retry + tier promotion
- Dry-run mode for testing without LLM/VCS calls
- agent_personality injection from role_registry → system prompts
This commit is contained in:
2026-03-16 11:38:33 -04:00
parent 72bd744664
commit ec1f3db4bf
2 changed files with 821 additions and 71 deletions

View File

@@ -7,10 +7,21 @@ adapters:
llm: anthropic llm: anthropic
vcs: github vcs: github
notify: openclaw notify: openclaw
runtime: openclaw
adapter_registry:
llm:
anthropic: "adapters.llm.anthropic:AnthropicAdapter"
vcs:
github: "adapters.vcs.github:GitHubAdapter"
notify:
openclaw: "adapters.notify.openclaw:OpenClawNotifyAdapter"
runtime:
openclaw: "adapters.runtime.openclaw:OpenClawRuntimeAdapter"
claude_code: "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter"
models: models:
provider: anthropic default_max_tokens: 4096
default_temperature: 0
capability_map: capability_map:
reasoning-heavy: reasoning-heavy:
anthropic: claude-opus-4-6 anthropic: claude-opus-4-6

View File

@@ -1,99 +1,838 @@
""" """
core/team_runner.py core/team_runner.py
Top-level orchestration entry point — Phase 2 stub. Top-level orchestration entry point for the-agency pipeline.
The TeamRunner is responsible for: The TeamRunner loads team.yaml, builds the adapter registry, and drives the
1. Loading config/team.yaml and config/role_registry.yaml. full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling.
2. Instantiating the correct adapter implementations (LLM, VCS, notify, runtime).
3. Creating a Blackboard for the run.
4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary.
5. Recursively spawning T2→T5 briefs based on tier outputs.
6. Using EscalationHandler to manage retries, salvage, and escalation.
7. Writing final run status and summary to the Blackboard.
TODO (Phase 2): Runtime adapters are config-driven: every string-valued key in the top-level
- Load and validate team.yaml configuration. ``runtime:`` section of team.yaml is instantiated as a RuntimeAdapter and
- Build adapter registry (map adapter keys → concrete adapter classes). stored in ``self._runtimes[name]``. Non-string values (e.g. ``native_teams:
- Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5. false``) are silently skipped. Dispatch routing uses
- Parse tier JSON outputs into child TaskBrief objects via make_child_brief(). ``brief.preferred_runtime`` to look up the right adapter at call time.
- Integrate EscalationHandler into the dispatch loop.
- Support --dry-run flag (log actions without executing). CLI usage::
- Emit blackboard events at each stage (spawned, completed, failed, etc.).
- Expose a CLI entry point (argparse or click). python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
""" """
from __future__ import annotations from __future__ import annotations
# TODO (Phase 2): Uncomment and implement imports as adapters are built. import argparse
# import argparse import json
# import yaml import logging
# from core.task_brief import TaskBrief import os
# from core.blackboard import Blackboard import re
# from core.escalation import EscalationHandler import uuid
# from adapters.llm.anthropic import AnthropicAdapter from typing import Optional
# from adapters.vcs.github import GitHubAdapter
# from adapters.notify.openclaw import OpenClawNotifyAdapter
# from adapters.runtime.openclaw import OpenClawRuntimeAdapter
# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
import yaml
from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
import importlib
from adapters.base.llm import LLMAdapter
from adapters.base.notify import NotifyAdapter
from adapters.base.runtime import RuntimeAdapter
from adapters.base.vcs import VCSAdapter
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Maps tier number → prompt file path (relative to project root).
_TIER_PROMPTS: dict[int, str] = {
1: "prompts/t1_visionary.md",
2: "prompts/t2_architect.md",
3: "prompts/t3_squad_lead.md",
4: "prompts/t4_implementer.md",
5: "prompts/t5_verifier.md",
}
# Maps tier number → LLM capability hint.
_TIER_CAPABILITIES: dict[int, str] = {
1: "reasoning-heavy",
2: "reasoning-heavy",
3: "capable",
4: "fast-cheap",
5: "fast-cheap",
}
# ---------------------------------------------------------------------------
# Adapter registries
#
# Values are "module.path:ClassName" strings resolved lazily via importlib.
# To add a new adapter, append an entry here — no changes to TeamRunner needed.
# team.yaml may also supply a full "module.path:ClassName" value directly,
# enabling third-party adapters without touching this file.
# ---------------------------------------------------------------------------
# Adapter registries are loaded from team.yaml at runtime (adapter_registry section).
# Fallback built-ins are used only if team.yaml doesn't define adapter_registry.
_BUILTIN_ADAPTER_REGISTRY: dict[str, dict[str, str]] = {
"llm": {"anthropic": "adapters.llm.anthropic:AnthropicAdapter"},
"vcs": {"github": "adapters.vcs.github:GitHubAdapter"},
"notify": {"openclaw": "adapters.notify.openclaw:OpenClawNotifyAdapter"},
"runtime": {
"openclaw": "adapters.runtime.openclaw:OpenClawRuntimeAdapter",
"claude_code": "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter",
},
}
def _load_adapter_class(key: str, registry: dict[str, str], label: str) -> type:
"""
Resolve a short name or dotted "module:ClassName" path to an adapter class.
Resolution order:
1. If *key* is in *registry*, use the mapped dotted path.
2. Otherwise, treat *key* itself as a dotted path (custom / third-party).
"""
dotted = registry.get(key, key)
if ":" not in dotted:
raise ValueError(
f"Unknown {label} adapter {key!r}. "
f"Built-in choices: {list(registry)}. "
f"Or supply a full 'module.path:ClassName' value in team.yaml."
)
module_path, class_name = dotted.rsplit(":", 1)
try:
module = importlib.import_module(module_path)
except ModuleNotFoundError as exc:
raise ImportError(
f"Cannot import {label} adapter module {module_path!r}: {exc}"
) from exc
try:
return getattr(module, class_name)
except AttributeError:
raise ImportError(
f"Module {module_path!r} has no class {class_name!r}"
)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class EscalationError(RuntimeError):
"""Raised when a brief escalates past its retry budget with no recovery."""
# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------
class TeamRunner: class TeamRunner:
""" """
Orchestrates a full T1→T5 agent pipeline run. Orchestrates a full T1→T5 agent pipeline run.
Usage (Phase 2):: Usage::
runner = TeamRunner(config_path="config/team.yaml") runner = TeamRunner(config_path="config/team.yaml")
runner.run() runner.run()
Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
and notifications::
runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
runner.run()
""" """
def __init__(self, config_path: str = "config/team.yaml") -> None: def __init__(
# TODO (Phase 2): Load YAML config. self,
# Instantiate adapters based on config.adapters keys. config_path: str = "config/team.yaml",
# Create a Blackboard for this run. dry_run: bool = False,
raise NotImplementedError("TeamRunner.__init__ is not yet implemented.") ) -> None:
"""
Load configuration and instantiate adapters.
Parameters
----------
config_path : Path to team.yaml.
dry_run : When True, skip LLM calls, VCS commits, and notifications.
All planned actions are logged at INFO level.
Runtime adapters are built from the top-level ``runtime:`` section of
team.yaml. Each string-valued key becomes an entry in
``self._runtimes``; non-string values (e.g. ``native_teams: false``)
are ignored. Adding a new runtime type requires only a new key in
team.yaml — no changes to TeamRunner are needed.
"""
self._dry_run = dry_run
self._config = self._load_yaml(config_path)
self._role_registry = self._load_yaml("config/role_registry.yaml")
# Merge config-defined adapter_registry over built-in fallbacks.
self._adapter_registry: dict[str, dict[str, str]] = {
k: {**v} for k, v in _BUILTIN_ADAPTER_REGISTRY.items()
}
for kind, entries in self._config.get("adapter_registry", {}).items():
self._adapter_registry.setdefault(kind, {}).update(entries)
self._escalation = EscalationHandler()
run_id = str(uuid.uuid4())
self._bb = Blackboard(run_id=run_id)
# Build adapters — VCS and notify are optional and swallow init errors.
adapter_cfg: dict = self._config.get("adapters", {})
runtime_cfg: dict = self._config.get("runtime", {})
if dry_run:
# In dry-run mode the LLM adapter is never actually called, so we
# tolerate missing dependencies (e.g. 'anthropic' SDK not installed).
try:
self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic"))
except (ImportError, ValueError) as exc:
logger.warning(
"LLM adapter unavailable in dry-run mode (%s) — continuing.", exc
)
self._llm = None # type: ignore[assignment]
else:
self._llm = self._build_llm(adapter_cfg.get("llm", "anthropic"))
self._vcs: Optional[VCSAdapter] = self._build_optional( # type: ignore[assignment]
self._adapter_registry.get("vcs", {}), adapter_cfg.get("vcs"), "VCS"
)
self._notify: Optional[NotifyAdapter] = self._build_optional( # type: ignore[assignment]
self._adapter_registry.get("notify", {}), adapter_cfg.get("notify"), "notify"
)
# Runtime adapters are fully config-driven — one entry per string-valued
# key in the top-level ``runtime:`` section of team.yaml.
self._runtimes: dict[str, RuntimeAdapter] = self._build_runtimes(runtime_cfg)
logger.info(
"TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
)
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
@staticmethod
def _load_yaml(path: str) -> dict:
with open(path, "r", encoding="utf-8") as fh:
return yaml.safe_load(fh) or {}
@staticmethod
def _load_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as fh:
return fh.read()
def _build_llm(self, key: str) -> LLMAdapter:
cls = _load_adapter_class(key, self._adapter_registry.get("llm", {}), "LLM")
return cls(self._config)
def _build_optional(
self,
registry: dict[str, str],
key: Optional[str],
label: str,
) -> Optional[object]:
"""Build an optional adapter, returning None on any init error."""
if not key:
return None
try:
cls = _load_adapter_class(key, registry, label)
return cls(self._config)
except (ImportError, ValueError) as exc:
logger.warning("Unknown %s adapter %r — skipping. (%s)", label, key, exc)
return None
except Exception as exc:
logger.warning(
"%s adapter %r could not be initialised (%s) — skipping.",
label,
key,
exc,
)
return None
def _build_runtime(self, key: str) -> RuntimeAdapter:
cls = _load_adapter_class(key, self._adapter_registry.get("runtime", {}), "runtime")
return cls(self._config)
def _build_runtimes(self, runtime_cfg: dict) -> dict[str, RuntimeAdapter]:
"""
Build a name → RuntimeAdapter mapping from the ``runtime:`` config block.
Every key whose value is a string is treated as a runtime adapter name
and instantiated via ``_build_runtime``. Non-string values (e.g.
``native_teams: false``) are skipped so that boolean/numeric control
flags can coexist in the same config section.
"""
runtimes: dict[str, RuntimeAdapter] = {}
for name, value in runtime_cfg.items():
if not isinstance(value, str):
continue
runtimes[name] = self._build_runtime(value)
return runtimes
# ------------------------------------------------------------------
# Role registry
# ------------------------------------------------------------------
def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
"""Return the path to the agent persona .md file, or None."""
tier_key = f"t{tier}"
tier_map: dict = self._role_registry.get(tier_key, {})
path = tier_map.get(role) or tier_map.get("default")
if path and os.path.isfile(path):
return path
return None
# ------------------------------------------------------------------
# Prompt helpers
# ------------------------------------------------------------------
def _load_tier_prompt(self, tier: int) -> str:
"""Load the system prompt for a tier from the prompts/ directory."""
path = _TIER_PROMPTS.get(tier, "")
if path and os.path.isfile(path):
return self._load_text(path)
logger.warning("Tier %d prompt not found at %r", tier, path)
return ""
def _load_personality(self, path: Optional[str]) -> str:
if path and os.path.isfile(path):
return self._load_text(path)
return ""
@staticmethod
def _extract_json(text: str) -> dict:
"""
Extract a JSON object from a potentially markdown-wrapped LLM response.
Strips leading/trailing markdown fences (```json ... ```) then parses.
Falls back to a regex scan for the first ``{...}`` block if plain
parsing fails.
"""
text = text.strip()
# Strip markdown fences.
if text.startswith("```"):
text = re.sub(r"^```[a-z]*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text.strip())
try:
return json.loads(text)
except json.JSONDecodeError:
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group(0))
except json.JSONDecodeError:
pass
raise ValueError(
"Could not parse JSON from LLM response.\n"
f"Response (first 500 chars): {text[:500]}"
)
# ------------------------------------------------------------------
# Brief dispatch
# ------------------------------------------------------------------
def _dispatch_brief(self, brief: TaskBrief) -> dict:
"""
Send a TaskBrief to the appropriate agent and return the raw result dict.
Routing
-------
preferred_runtime == "standard" or empty/None → LLM adapter directly
Otherwise → look up self._runtimes[preferred_runtime]; falls back to
self._runtimes["default"] and then to LLM if no runtime is found.
Blackboard events emitted: spawned → completed | failed.
"""
if self._dry_run:
logger.info(
"[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
brief.tier,
brief.role,
brief.task,
)
return {"status": "done", "output": "{}", "artifacts": []}
self._bb.update_brief_status(brief.brief_id, "active")
self._bb.log_event(
"spawned",
brief_id=brief.brief_id,
detail={"tier": brief.tier, "role": brief.role},
)
try:
pref = brief.preferred_runtime
if not pref or pref == "standard":
result = self._dispatch_via_llm(brief)
else:
runtime = self._runtimes.get(pref) or self._runtimes.get("default")
if runtime is None:
logger.warning(
"No runtime adapter found for %r (and no 'default') — "
"falling back to LLM for brief %s",
pref,
brief.brief_id,
)
result = self._dispatch_via_llm(brief)
else:
result = self._dispatch_via_runtime(brief, runtime)
self._bb.update_brief_result(brief.brief_id, result)
self._bb.log_event(
"completed",
brief_id=brief.brief_id,
detail={"status": result.get("status")},
)
return result
except Exception as exc:
self._bb.update_brief_status(brief.brief_id, "failed")
self._bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": str(exc)},
)
raise
def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
"""Call the LLM adapter with the tier system prompt + brief JSON."""
tier_prompt = self._load_tier_prompt(brief.tier)
personality = self._load_personality(brief.agent_personality)
system_prompt = "\n\n".join(filter(None, [tier_prompt, personality]))
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
user_message = json.dumps(brief.to_dict(), indent=2)
raw = self._llm.complete(
prompt=user_message,
capability=capability,
context={
"system_prompt": system_prompt,
},
)
return self._extract_json(raw)
def _dispatch_via_runtime(self, brief: TaskBrief, runtime: RuntimeAdapter) -> dict:
"""Spawn an agent via *runtime* and collect its result."""
task_str = json.dumps(brief.to_dict(), indent=2)
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
timeout_s: int = brief.context.get("timeout_s", 300)
agent_id = runtime.spawn(
task=task_str,
capability=capability,
context=brief.context,
)
logger.info(
"Spawned runtime agent %s for brief %s", agent_id, brief.brief_id
)
result = runtime.get_result(agent_id, timeout_s=timeout_s)
# Attempt to parse JSON from the agent's text output.
if isinstance(result.get("output"), str) and result["output"].strip():
try:
parsed = self._extract_json(result["output"])
result.update(parsed)
except ValueError:
pass # Keep raw string output as-is.
return result
# ------------------------------------------------------------------
# Escalation loop
# ------------------------------------------------------------------
def _run_with_escalation(
self,
brief: TaskBrief,
workstream_id: Optional[str] = None,
) -> dict:
"""
Dispatch a brief and apply the escalation policy until done or exhausted.
On retry the amended brief is persisted to the Blackboard before
being re-submitted.
"""
while True:
result = self._dispatch_brief(brief)
decision = self._escalation.handle(brief, result)
if decision.action == "complete":
return result
if decision.action == "escalate":
self._bb.log_event(
"escalated",
brief_id=brief.brief_id,
detail={"reason": decision.reason},
)
raise EscalationError(
f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
f"escalated: {decision.reason}"
)
# "retry" or "salvage_and_retry"
self._bb.log_event(
"retried",
brief_id=brief.brief_id,
detail={"reason": decision.reason, "action": decision.action},
)
amended = decision.amended_brief
if amended is None:
raise EscalationError(
f"Escalation returned action={decision.action!r} "
"but no amended_brief was provided."
)
# Persist the new brief and loop.
self._bb.create_brief(amended, workstream_id=workstream_id)
brief = amended
# ------------------------------------------------------------------
# Tier output parsers
# ------------------------------------------------------------------
def _parse_t1_output(
self, result: dict, root_brief: TaskBrief
) -> list[TaskBrief]:
"""Build T2 TaskBriefs from T1 (Visionary) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
workstreams: list[dict] = result.get("workstreams", [])
# T1 sets the canonical goal_anchor; propagate it back to root.
goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
root_brief.goal_anchor = goal_anchor
briefs: list[TaskBrief] = []
for ws in workstreams:
role = ws.get("role", "default")
brief = root_brief.make_child_brief(
tier=2,
role=role,
task=ws.get("task", ""),
workstream=ws.get("name", ""),
acceptance_criteria=ws.get("acceptance_criteria", []),
preferred_runtime="standard",
agent_personality=self._resolve_personality(2, role),
retry_budget=retry_bad,
)
briefs.append(brief)
return briefs
def _parse_t2_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T3 TaskBriefs from T2 (Architect) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
subtasks: list[dict] = result.get("subtasks", [])
arch_summary: str = result.get("architecture_summary", "")
briefs: list[TaskBrief] = []
for st in subtasks:
role = st.get("role", "default")
brief = parent.make_child_brief(
tier=3,
role=role,
task=st.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=st.get("acceptance_criteria", []),
preferred_runtime=st.get("preferred_runtime", "standard"),
agent_personality=self._resolve_personality(3, role),
retry_budget=retry_bad,
context={"architecture_summary": arch_summary},
)
briefs.append(brief)
return briefs
def _parse_t3_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
tasks: list[dict] = result.get("tasks", [])
plan_summary: str = result.get("plan_summary", "")
briefs: list[TaskBrief] = []
for task in tasks:
role = task.get("role", "default")
# T4 is the coding/implementation tier; default to coding_agent
# so implementers use Claude Code unless T3 explicitly overrides.
pref_runtime = task.get("preferred_runtime", "coding_agent")
brief = parent.make_child_brief(
tier=4,
role=role,
task=task.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=task.get("acceptance_criteria", []),
preferred_runtime=pref_runtime,
agent_personality=self._resolve_personality(4, role),
retry_budget=retry_bad,
context={
"plan_summary": plan_summary,
"depends_on": task.get("depends_on", []),
},
)
briefs.append(brief)
return briefs
# ------------------------------------------------------------------
# VCS helpers
# ------------------------------------------------------------------
def _commit_artifacts(
self,
artifacts: list[dict],
brief: TaskBrief,
) -> None:
"""Commit T4 *file* artifacts to the configured VCS adapter."""
if not self._vcs or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would commit %d artifact(s) for brief %s",
len(artifacts),
brief.brief_id,
)
return
file_map: dict[str, str] = {
a["path"]: a["content"]
for a in artifacts
if a.get("type") == "file"
and a.get("path")
and a.get("content") is not None
}
if not file_map:
return
branch: str = self._config.get("run", {}).get("base_branch", "main")
message = (
f"feat({brief.workstream}): artifacts from {brief.role} "
f"[brief {brief.brief_id[:8]}]"
)
try:
# GitHubAdapter.commit accepts dict[str, str] as files.
sha = self._vcs.commit(file_map, message) # type: ignore[call-arg]
logger.info(
"Committed %d artifact(s) → SHA %s", len(file_map), sha
)
except Exception as exc:
logger.warning("VCS commit failed: %s", exc)
# ------------------------------------------------------------------
# Notification
# ------------------------------------------------------------------
def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
if not self._notify or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
)
return
level = "info" if outcome == "complete" else "error"
if outcome == "complete":
message = f"Pipeline complete: {goal[:80]}"
else:
message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"
self._notify.send(
message,
context={
"level": level,
"run_id": self._bb.run_id,
"outcome": outcome,
**{k: str(v) for k, v in detail.items()},
},
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run(self) -> None: def run(self) -> None:
""" """
Execute the full pipeline from T1 decomposition through T5 verification. Execute the full T1→T5 pipeline.
TODO (Phase 2): Steps
- Build root T1 brief from config.run.goal. -----
- Dispatch to T1 Visionary via LLM adapter. 1. Dispatch T1 Visionary to decompose the goal into workstreams.
- Parse workstreams from T1 output. 2. For each workstream: T2 Architect → T3 Squad Lead →
- For each workstream: dispatch T2 Architect. T4 Implementer → T5 Verifier.
- For each T2 subtask: dispatch T3 Squad Lead. 3. Commit passing T4 artifacts via VCS adapter (if configured).
- For each T3 task: dispatch T4 Implementer. 4. Notify on completion or terminal failure via notify adapter.
- For each T4 artifact set: dispatch T5 Verifier.
- Run escalation handler at each tier on failure.
- Commit passing artifacts via VCS adapter.
- Notify on completion or terminal failure via notify adapter.
""" """
raise NotImplementedError("TeamRunner.run is not yet implemented.") goal: str = self._config["run"]["goal"]
self._bb.create_run(goal=goal)
self._bb.update_run_status("active")
logger.info("Pipeline started — goal: %s", goal)
def _dispatch_brief(self, brief) -> dict: try:
""" self._orchestrate(goal)
Send a single TaskBrief to the appropriate agent and return the result. self._bb.update_run_status("done")
summary = self._bb.get_run_summary()
logger.info("Pipeline complete. Summary: %s", summary)
self._notify_run("complete", goal, summary)
except Exception as exc:
self._bb.update_run_status("failed")
logger.error("Pipeline failed: %s", exc, exc_info=True)
self._notify_run("failed", goal, {"error": str(exc)})
raise
finally:
self._bb.close()
TODO (Phase 2): # ------------------------------------------------------------------
- Select runtime based on brief.preferred_runtime. # Internal orchestration
- Load agent personality from brief.agent_personality (if set). # ------------------------------------------------------------------
- Compose prompt from tier system prompt + brief payload.
- Spawn agent via runtime adapter. def _orchestrate(self, goal: str) -> None:
- Await result via runtime.get_result(). """Build the root T1 brief, dispatch it, and fan out per workstream."""
- Log spawned/completed/failed events to Blackboard. retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
"""
raise NotImplementedError("TeamRunner._dispatch_brief is not yet implemented.") # ---- T1: Visionary ----
t1_brief = TaskBrief(
run_id=self._bb.run_id,
tier=1,
role="default",
goal_anchor=goal,
task=(
"You are the T1 Visionary. "
"Decompose the following goal into parallel workstreams "
f"for the engineering team: {goal}"
),
workstream="root",
retry_budget=retry_bad,
preferred_runtime="standard",
agent_personality=self._resolve_personality(1, "default"),
)
self._bb.create_brief(t1_brief)
t1_result = self._run_with_escalation(t1_brief)
t2_briefs = self._parse_t1_output(t1_result, t1_brief)
logger.info("T1 produced %d workstream(s)", len(t2_briefs))
# ---- T2..T5: per workstream ----
for t2_brief in t2_briefs:
ws_id = self._bb.create_workstream(
name=t2_brief.workstream, tier=2
)
self._bb.create_brief(t2_brief, workstream_id=ws_id)
self._bb.update_workstream_status(ws_id, "active")
try:
self._run_workstream(t2_brief, ws_id)
self._bb.update_workstream_status(ws_id, "done")
except EscalationError as exc:
self._bb.update_workstream_status(ws_id, "failed")
self._bb.log_event(
"failed",
detail={"error": str(exc), "workstream": t2_brief.workstream},
)
logger.error(
"Workstream %r failed: %s", t2_brief.workstream, exc
)
def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
"""Drive T2 → T3 → T4 → T5 for a single workstream."""
# T2: Architect
t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
t3_briefs = self._parse_t2_output(t2_result, t2_brief)
logger.info(
"T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
)
for t3_brief in t3_briefs:
self._bb.create_brief(t3_brief, workstream_id=ws_id)
try:
# T3: Squad Lead
t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
t4_briefs = self._parse_t3_output(t3_result, t3_brief)
logger.info(
"T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
)
for t4_brief in t4_briefs:
self._bb.create_brief(t4_brief, workstream_id=ws_id)
try:
# T4: Implementer
t4_result = self._run_with_escalation(
t4_brief, workstream_id=ws_id
)
artifacts: list[dict] = t4_result.get("artifacts", [])
# T5: Verifier
t5_brief = t4_brief.make_child_brief(
tier=5,
role="code",
task=(
"Verify the following T4 implementation artifacts "
"against all acceptance criteria. "
f"T4 output: {json.dumps(t4_result)[:2000]}"
),
workstream=t4_brief.workstream,
acceptance_criteria=t4_brief.acceptance_criteria,
preferred_runtime="standard",
agent_personality=self._resolve_personality(5, "code"),
retry_budget=self._config.get(
"retry_defaults", {}
).get("bad_output", 3),
context={"t4_result": t4_result},
)
self._bb.create_brief(t5_brief, workstream_id=ws_id)
t5_result = self._run_with_escalation(
t5_brief, workstream_id=ws_id
)
# Commit on verified pass.
if t5_result.get("status") in ("passed", "done"):
self._commit_artifacts(artifacts, t4_brief)
except EscalationError as exc:
logger.error(
"T4/T5 escalation in %s: %s", t4_brief.role, exc
)
except EscalationError as exc:
logger.error("T3 escalation in %s: %s", t3_brief.role, exc)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# CLI entry point (Phase 2) # CLI entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# TODO (Phase 2): Implement argparse CLI. def _configure_logging(verbose: bool = False) -> None:
# if __name__ == "__main__": level = logging.DEBUG if verbose else logging.INFO
# parser = argparse.ArgumentParser(description="Run the-agency pipeline.") logging.basicConfig(
# parser.add_argument("--config", default="config/team.yaml", help="Path to team.yaml") level=level,
# parser.add_argument("--dry-run", action="store_true", help="Log actions without executing") format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
# args = parser.parse_args() datefmt="%Y-%m-%dT%H:%M:%S",
# runner = TeamRunner(config_path=args.config) )
# runner.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run the-agency T1→T5 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config",
default="config/team.yaml",
help="Path to team.yaml configuration file.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help=(
"Log all planned actions without executing LLM calls, "
"VCS commits, or notifications."
),
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable DEBUG-level logging.",
)
args = parser.parse_args()
_configure_logging(args.verbose)
runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
runner.run()