feat(core): implement TeamRunner orchestration loop
Full T1→T5 pipeline orchestration with adapter registry, escalation, and blackboard event emission. Key design decisions: - Adapter registry maps config keys to concrete classes; VCS and notify are optional (swallow init errors and degrade gracefully) - _dispatch_brief() routes to LLM adapter (standard) or coding runtime (coding_agent) based on brief.preferred_runtime - _run_with_escalation() drives the retry/salvage loop: persists amended briefs to the Blackboard before each re-submission - Tier parsers (_parse_t1/t2/t3_output) build child TaskBriefs, preserving the goal_anchor invariant and resolving agent personalities from the registry - T5 Verifier is always spawned after T4; VCS commit only happens on verified pass (status "passed" or "done") - --dry-run flag: logs all actions, skips LLM, VCS, and notify calls - Exposes CLI via `python -m core.team_runner` with --config, --dry-run, --verbose flags Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,99 +1,759 @@
|
||||
"""
|
||||
core/team_runner.py
|
||||
Top-level orchestration entry point — Phase 2 stub.
|
||||
Top-level orchestration entry point for the-agency pipeline.
|
||||
|
||||
The TeamRunner is responsible for:
|
||||
1. Loading config/team.yaml and config/role_registry.yaml.
|
||||
2. Instantiating the correct adapter implementations (LLM, VCS, notify, runtime).
|
||||
3. Creating a Blackboard for the run.
|
||||
4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary.
|
||||
5. Recursively spawning T2→T5 briefs based on tier outputs.
|
||||
6. Using EscalationHandler to manage retries, salvage, and escalation.
|
||||
7. Writing final run status and summary to the Blackboard.
|
||||
The TeamRunner loads team.yaml, builds the adapter registry, and drives the
|
||||
full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling.
|
||||
|
||||
TODO (Phase 2):
|
||||
- Load and validate team.yaml configuration.
|
||||
- Build adapter registry (map adapter keys → concrete adapter classes).
|
||||
- Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5.
|
||||
- Parse tier JSON outputs into child TaskBrief objects via make_child_brief().
|
||||
- Integrate EscalationHandler into the dispatch loop.
|
||||
- Support --dry-run flag (log actions without executing).
|
||||
- Emit blackboard events at each stage (spawned, completed, failed, etc.).
|
||||
- Expose a CLI entry point (argparse or click).
|
||||
CLI usage::
|
||||
|
||||
python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
# TODO (Phase 2): Uncomment and implement imports as adapters are built.
|
||||
# import argparse
|
||||
# import yaml
|
||||
# from core.task_brief import TaskBrief
|
||||
# from core.blackboard import Blackboard
|
||||
# from core.escalation import EscalationHandler
|
||||
# from adapters.llm.anthropic import AnthropicAdapter
|
||||
# from adapters.vcs.github import GitHubAdapter
|
||||
# from adapters.notify.openclaw import OpenClawNotifyAdapter
|
||||
# from adapters.runtime.openclaw import OpenClawRuntimeAdapter
|
||||
# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
import yaml
|
||||
|
||||
from core.blackboard import Blackboard
|
||||
from core.escalation import EscalationHandler
|
||||
from core.task_brief import TaskBrief
|
||||
|
||||
from adapters.base.llm import LLMAdapter
|
||||
from adapters.base.notify import NotifyAdapter
|
||||
from adapters.base.runtime import RuntimeAdapter
|
||||
from adapters.base.vcs import VCSAdapter
|
||||
from adapters.llm.anthropic import AnthropicAdapter
|
||||
from adapters.notify.openclaw import OpenClawNotifyAdapter
|
||||
from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
|
||||
from adapters.runtime.openclaw import OpenClawRuntimeAdapter
|
||||
from adapters.vcs.github import GitHubAdapter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Maps tier number → prompt file path (relative to project root).
|
||||
_TIER_PROMPTS: dict[int, str] = {
|
||||
1: "prompts/t1_visionary.md",
|
||||
2: "prompts/t2_architect.md",
|
||||
3: "prompts/t3_squad_lead.md",
|
||||
4: "prompts/t4_implementer.md",
|
||||
5: "prompts/t5_verifier.md",
|
||||
}
|
||||
|
||||
# Maps tier number → LLM capability hint.
|
||||
_TIER_CAPABILITIES: dict[int, str] = {
|
||||
1: "reasoning-heavy",
|
||||
2: "reasoning-heavy",
|
||||
3: "capable",
|
||||
4: "capable",
|
||||
5: "fast-cheap",
|
||||
}
|
||||
|
||||
# Adapter registry: config key → concrete class.
|
||||
_LLM_ADAPTERS: dict[str, type[LLMAdapter]] = {
|
||||
"anthropic": AnthropicAdapter,
|
||||
}
|
||||
_VCS_ADAPTERS: dict[str, type[VCSAdapter]] = {
|
||||
"github": GitHubAdapter,
|
||||
}
|
||||
_NOTIFY_ADAPTERS: dict[str, type[NotifyAdapter]] = {
|
||||
"openclaw": OpenClawNotifyAdapter,
|
||||
}
|
||||
_RUNTIME_ADAPTERS: dict[str, type[RuntimeAdapter]] = {
|
||||
"openclaw": OpenClawRuntimeAdapter,
|
||||
"claude_code": ClaudeCodeRuntimeAdapter,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Exceptions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class EscalationError(RuntimeError):
|
||||
"""Raised when a brief escalates past its retry budget with no recovery."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TeamRunner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TeamRunner:
|
||||
"""
|
||||
Orchestrates a full T1→T5 agent pipeline run.
|
||||
|
||||
Usage (Phase 2)::
|
||||
Usage::
|
||||
|
||||
runner = TeamRunner(config_path="config/team.yaml")
|
||||
runner.run()
|
||||
|
||||
Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
|
||||
and notifications::
|
||||
|
||||
runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
|
||||
runner.run()
|
||||
"""
|
||||
|
||||
def __init__(self, config_path: str = "config/team.yaml") -> None:
|
||||
# TODO (Phase 2): Load YAML config.
|
||||
# Instantiate adapters based on config.adapters keys.
|
||||
# Create a Blackboard for this run.
|
||||
raise NotImplementedError("TeamRunner.__init__ is not yet implemented.")
|
||||
def __init__(
|
||||
self,
|
||||
config_path: str = "config/team.yaml",
|
||||
dry_run: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Load configuration and instantiate adapters.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
config_path : Path to team.yaml.
|
||||
dry_run : When True, skip LLM calls, VCS commits, and notifications.
|
||||
All planned actions are logged at INFO level.
|
||||
"""
|
||||
self._dry_run = dry_run
|
||||
|
||||
self._config = self._load_yaml(config_path)
|
||||
self._role_registry = self._load_yaml("config/role_registry.yaml")
|
||||
self._escalation = EscalationHandler()
|
||||
|
||||
run_id = str(uuid.uuid4())
|
||||
self._bb = Blackboard(run_id=run_id)
|
||||
|
||||
# Build adapters — VCS and notify are optional and swallow init errors.
|
||||
adapter_cfg: dict = self._config.get("adapters", {})
|
||||
runtime_cfg: dict = self._config.get("runtime", {})
|
||||
|
||||
self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic"))
|
||||
self._vcs: Optional[VCSAdapter] = self._build_optional( # type: ignore[assignment]
|
||||
_VCS_ADAPTERS, adapter_cfg.get("vcs"), "VCS"
|
||||
)
|
||||
self._notify: Optional[NotifyAdapter] = self._build_optional( # type: ignore[assignment]
|
||||
_NOTIFY_ADAPTERS, adapter_cfg.get("notify"), "notify"
|
||||
)
|
||||
self._default_runtime: RuntimeAdapter = self._build_runtime(
|
||||
runtime_cfg.get("default", "openclaw")
|
||||
)
|
||||
self._coding_runtime: RuntimeAdapter = self._build_runtime(
|
||||
runtime_cfg.get("coding_agent", "claude_code")
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Configuration helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _load_yaml(path: str) -> dict:
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
return yaml.safe_load(fh) or {}
|
||||
|
||||
@staticmethod
|
||||
def _load_text(path: str) -> str:
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
return fh.read()
|
||||
|
||||
def _build_llm(self, key: str) -> LLMAdapter:
|
||||
cls = _LLM_ADAPTERS.get(key)
|
||||
if cls is None:
|
||||
raise ValueError(
|
||||
f"Unknown LLM adapter {key!r}. Known: {list(_LLM_ADAPTERS)}"
|
||||
)
|
||||
return cls(self._config)
|
||||
|
||||
def _build_optional(
|
||||
self,
|
||||
registry: dict,
|
||||
key: Optional[str],
|
||||
label: str,
|
||||
) -> Optional[object]:
|
||||
"""Build an optional adapter, returning None on any init error."""
|
||||
if not key:
|
||||
return None
|
||||
cls = registry.get(key)
|
||||
if cls is None:
|
||||
logger.warning("Unknown %s adapter %r — skipping.", label, key)
|
||||
return None
|
||||
try:
|
||||
return cls(self._config)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"%s adapter %r could not be initialised (%s) — skipping.",
|
||||
label,
|
||||
key,
|
||||
exc,
|
||||
)
|
||||
return None
|
||||
|
||||
def _build_runtime(self, key: str) -> RuntimeAdapter:
|
||||
cls = _RUNTIME_ADAPTERS.get(key)
|
||||
if cls is None:
|
||||
raise ValueError(
|
||||
f"Unknown runtime adapter {key!r}. Known: {list(_RUNTIME_ADAPTERS)}"
|
||||
)
|
||||
return cls(self._config)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Role registry
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
|
||||
"""Return the path to the agent persona .md file, or None."""
|
||||
tier_key = f"t{tier}"
|
||||
tier_map: dict = self._role_registry.get(tier_key, {})
|
||||
path = tier_map.get(role) or tier_map.get("default")
|
||||
if path and os.path.isfile(path):
|
||||
return path
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Prompt helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _load_tier_prompt(self, tier: int) -> str:
|
||||
"""Load the system prompt for a tier from the prompts/ directory."""
|
||||
path = _TIER_PROMPTS.get(tier, "")
|
||||
if path and os.path.isfile(path):
|
||||
return self._load_text(path)
|
||||
logger.warning("Tier %d prompt not found at %r", tier, path)
|
||||
return ""
|
||||
|
||||
def _load_personality(self, path: Optional[str]) -> str:
|
||||
if path and os.path.isfile(path):
|
||||
return self._load_text(path)
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def _extract_json(text: str) -> dict:
|
||||
"""
|
||||
Extract a JSON object from a potentially markdown-wrapped LLM response.
|
||||
|
||||
Strips leading/trailing markdown fences (```json ... ```) then parses.
|
||||
Falls back to a regex scan for the first ``{...}`` block if plain
|
||||
parsing fails.
|
||||
"""
|
||||
text = text.strip()
|
||||
# Strip markdown fences.
|
||||
if text.startswith("```"):
|
||||
text = re.sub(r"^```[a-z]*\n?", "", text)
|
||||
text = re.sub(r"\n?```\s*$", "", text.strip())
|
||||
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
m = re.search(r"\{.*\}", text, re.DOTALL)
|
||||
if m:
|
||||
try:
|
||||
return json.loads(m.group(0))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
raise ValueError(
|
||||
"Could not parse JSON from LLM response.\n"
|
||||
f"Response (first 500 chars): {text[:500]}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Brief dispatch
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _dispatch_brief(self, brief: TaskBrief) -> dict:
|
||||
"""
|
||||
Send a TaskBrief to the appropriate agent and return the raw result dict.
|
||||
|
||||
Routing
|
||||
-------
|
||||
preferred_runtime == "coding_agent" → coding runtime adapter
|
||||
preferred_runtime == "standard" → LLM adapter directly
|
||||
|
||||
Blackboard events emitted: spawned → completed | failed.
|
||||
"""
|
||||
if self._dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
|
||||
brief.tier,
|
||||
brief.role,
|
||||
brief.task,
|
||||
)
|
||||
return {"status": "done", "output": "{}", "artifacts": []}
|
||||
|
||||
self._bb.update_brief_status(brief.brief_id, "active")
|
||||
self._bb.log_event(
|
||||
"spawned",
|
||||
brief_id=brief.brief_id,
|
||||
detail={"tier": brief.tier, "role": brief.role},
|
||||
)
|
||||
|
||||
try:
|
||||
if brief.preferred_runtime == "coding_agent":
|
||||
result = self._dispatch_via_runtime(brief)
|
||||
else:
|
||||
result = self._dispatch_via_llm(brief)
|
||||
|
||||
self._bb.update_brief_result(brief.brief_id, result)
|
||||
self._bb.log_event(
|
||||
"completed",
|
||||
brief_id=brief.brief_id,
|
||||
detail={"status": result.get("status")},
|
||||
)
|
||||
return result
|
||||
|
||||
except Exception as exc:
|
||||
self._bb.update_brief_status(brief.brief_id, "failed")
|
||||
self._bb.log_event(
|
||||
"failed",
|
||||
brief_id=brief.brief_id,
|
||||
detail={"error": str(exc)},
|
||||
)
|
||||
raise
|
||||
|
||||
def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
|
||||
"""Call the LLM adapter with the tier system prompt + brief JSON."""
|
||||
tier_prompt = self._load_tier_prompt(brief.tier)
|
||||
personality = self._load_personality(brief.agent_personality)
|
||||
system_prompt = "\n\n".join(filter(None, [tier_prompt, personality]))
|
||||
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
|
||||
user_message = json.dumps(brief.to_dict(), indent=2)
|
||||
|
||||
raw = self._llm.complete(
|
||||
prompt=user_message,
|
||||
capability=capability,
|
||||
context={
|
||||
"system_prompt": system_prompt,
|
||||
"max_tokens": 4096,
|
||||
"temperature": 0,
|
||||
},
|
||||
)
|
||||
return self._extract_json(raw)
|
||||
|
||||
def _dispatch_via_runtime(self, brief: TaskBrief) -> dict:
|
||||
"""Spawn a coding agent via the runtime adapter and collect its result."""
|
||||
task_str = json.dumps(brief.to_dict(), indent=2)
|
||||
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
|
||||
timeout_s: int = brief.context.get("timeout_s", 300)
|
||||
|
||||
agent_id = self._coding_runtime.spawn(
|
||||
task=task_str,
|
||||
capability=capability,
|
||||
context=brief.context,
|
||||
)
|
||||
logger.info(
|
||||
"Spawned coding agent %s for brief %s", agent_id, brief.brief_id
|
||||
)
|
||||
|
||||
result = self._coding_runtime.get_result(agent_id, timeout_s=timeout_s)
|
||||
|
||||
# Attempt to parse JSON from the agent's text output.
|
||||
if isinstance(result.get("output"), str) and result["output"].strip():
|
||||
try:
|
||||
parsed = self._extract_json(result["output"])
|
||||
result.update(parsed)
|
||||
except ValueError:
|
||||
pass # Keep raw string output as-is.
|
||||
|
||||
return result
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Escalation loop
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _run_with_escalation(
|
||||
self,
|
||||
brief: TaskBrief,
|
||||
workstream_id: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Dispatch a brief and apply the escalation policy until done or exhausted.
|
||||
|
||||
On retry the amended brief is persisted to the Blackboard before
|
||||
being re-submitted.
|
||||
"""
|
||||
while True:
|
||||
result = self._dispatch_brief(brief)
|
||||
decision = self._escalation.handle(brief, result)
|
||||
|
||||
if decision.action == "complete":
|
||||
return result
|
||||
|
||||
if decision.action == "escalate":
|
||||
self._bb.log_event(
|
||||
"escalated",
|
||||
brief_id=brief.brief_id,
|
||||
detail={"reason": decision.reason},
|
||||
)
|
||||
raise EscalationError(
|
||||
f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
|
||||
f"escalated: {decision.reason}"
|
||||
)
|
||||
|
||||
# "retry" or "salvage_and_retry"
|
||||
self._bb.log_event(
|
||||
"retried",
|
||||
brief_id=brief.brief_id,
|
||||
detail={"reason": decision.reason, "action": decision.action},
|
||||
)
|
||||
amended = decision.amended_brief
|
||||
if amended is None:
|
||||
raise EscalationError(
|
||||
f"Escalation returned action={decision.action!r} "
|
||||
"but no amended_brief was provided."
|
||||
)
|
||||
# Persist the new brief and loop.
|
||||
self._bb.create_brief(amended, workstream_id=workstream_id)
|
||||
brief = amended
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Tier output parsers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _parse_t1_output(
|
||||
self, result: dict, root_brief: TaskBrief
|
||||
) -> list[TaskBrief]:
|
||||
"""Build T2 TaskBriefs from T1 (Visionary) JSON output."""
|
||||
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
|
||||
workstreams: list[dict] = result.get("workstreams", [])
|
||||
|
||||
# T1 sets the canonical goal_anchor; propagate it back to root.
|
||||
goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
|
||||
root_brief.goal_anchor = goal_anchor
|
||||
|
||||
briefs: list[TaskBrief] = []
|
||||
for ws in workstreams:
|
||||
role = ws.get("role", "default")
|
||||
brief = root_brief.make_child_brief(
|
||||
tier=2,
|
||||
role=role,
|
||||
task=ws.get("task", ""),
|
||||
workstream=ws.get("name", ""),
|
||||
acceptance_criteria=ws.get("acceptance_criteria", []),
|
||||
preferred_runtime="standard",
|
||||
agent_personality=self._resolve_personality(2, role),
|
||||
retry_budget=retry_bad,
|
||||
)
|
||||
briefs.append(brief)
|
||||
return briefs
|
||||
|
||||
def _parse_t2_output(
|
||||
self, result: dict, parent: TaskBrief
|
||||
) -> list[TaskBrief]:
|
||||
"""Build T3 TaskBriefs from T2 (Architect) JSON output."""
|
||||
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
|
||||
subtasks: list[dict] = result.get("subtasks", [])
|
||||
arch_summary: str = result.get("architecture_summary", "")
|
||||
|
||||
briefs: list[TaskBrief] = []
|
||||
for st in subtasks:
|
||||
role = st.get("role", "default")
|
||||
brief = parent.make_child_brief(
|
||||
tier=3,
|
||||
role=role,
|
||||
task=st.get("task", ""),
|
||||
workstream=parent.workstream,
|
||||
acceptance_criteria=st.get("acceptance_criteria", []),
|
||||
preferred_runtime=st.get("preferred_runtime", "standard"),
|
||||
agent_personality=self._resolve_personality(3, role),
|
||||
retry_budget=retry_bad,
|
||||
context={"architecture_summary": arch_summary},
|
||||
)
|
||||
briefs.append(brief)
|
||||
return briefs
|
||||
|
||||
def _parse_t3_output(
|
||||
self, result: dict, parent: TaskBrief
|
||||
) -> list[TaskBrief]:
|
||||
"""Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
|
||||
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
|
||||
tasks: list[dict] = result.get("tasks", [])
|
||||
plan_summary: str = result.get("plan_summary", "")
|
||||
|
||||
briefs: list[TaskBrief] = []
|
||||
for task in tasks:
|
||||
role = task.get("role", "default")
|
||||
pref_runtime = task.get("preferred_runtime", "standard")
|
||||
brief = parent.make_child_brief(
|
||||
tier=4,
|
||||
role=role,
|
||||
task=task.get("task", ""),
|
||||
workstream=parent.workstream,
|
||||
acceptance_criteria=task.get("acceptance_criteria", []),
|
||||
preferred_runtime=pref_runtime,
|
||||
agent_personality=self._resolve_personality(4, role),
|
||||
retry_budget=retry_bad,
|
||||
context={
|
||||
"plan_summary": plan_summary,
|
||||
"depends_on": task.get("depends_on", []),
|
||||
},
|
||||
)
|
||||
briefs.append(brief)
|
||||
return briefs
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# VCS helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _commit_artifacts(
|
||||
self,
|
||||
artifacts: list[dict],
|
||||
brief: TaskBrief,
|
||||
) -> None:
|
||||
"""Commit T4 *file* artifacts to the configured VCS adapter."""
|
||||
if not self._vcs or self._dry_run:
|
||||
if self._dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] Would commit %d artifact(s) for brief %s",
|
||||
len(artifacts),
|
||||
brief.brief_id,
|
||||
)
|
||||
return
|
||||
|
||||
file_map: dict[str, str] = {
|
||||
a["path"]: a["content"]
|
||||
for a in artifacts
|
||||
if a.get("type") == "file"
|
||||
and a.get("path")
|
||||
and a.get("content") is not None
|
||||
}
|
||||
if not file_map:
|
||||
return
|
||||
|
||||
branch: str = self._config.get("run", {}).get("base_branch", "main")
|
||||
message = (
|
||||
f"feat({brief.workstream}): artifacts from {brief.role} "
|
||||
f"[brief {brief.brief_id[:8]}]"
|
||||
)
|
||||
try:
|
||||
# GitHubAdapter.commit accepts dict[str, str] as files.
|
||||
sha = self._vcs.commit(file_map, message) # type: ignore[call-arg]
|
||||
logger.info(
|
||||
"Committed %d artifact(s) → SHA %s", len(file_map), sha
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("VCS commit failed: %s", exc)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Notification
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
|
||||
if not self._notify or self._dry_run:
|
||||
if self._dry_run:
|
||||
logger.info(
|
||||
"[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
|
||||
)
|
||||
return
|
||||
|
||||
level = "info" if outcome == "complete" else "error"
|
||||
if outcome == "complete":
|
||||
message = f"Pipeline complete: {goal[:80]}"
|
||||
else:
|
||||
message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"
|
||||
|
||||
self._notify.send(
|
||||
message,
|
||||
context={
|
||||
"level": level,
|
||||
"run_id": self._bb.run_id,
|
||||
"outcome": outcome,
|
||||
**{k: str(v) for k, v in detail.items()},
|
||||
},
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Execute the full pipeline from T1 decomposition through T5 verification.
|
||||
Execute the full T1→T5 pipeline.
|
||||
|
||||
TODO (Phase 2):
|
||||
- Build root T1 brief from config.run.goal.
|
||||
- Dispatch to T1 Visionary via LLM adapter.
|
||||
- Parse workstreams from T1 output.
|
||||
- For each workstream: dispatch T2 Architect.
|
||||
- For each T2 subtask: dispatch T3 Squad Lead.
|
||||
- For each T3 task: dispatch T4 Implementer.
|
||||
- For each T4 artifact set: dispatch T5 Verifier.
|
||||
- Run escalation handler at each tier on failure.
|
||||
- Commit passing artifacts via VCS adapter.
|
||||
- Notify on completion or terminal failure via notify adapter.
|
||||
Steps
|
||||
-----
|
||||
1. Dispatch T1 Visionary to decompose the goal into workstreams.
|
||||
2. For each workstream: T2 Architect → T3 Squad Lead →
|
||||
T4 Implementer → T5 Verifier.
|
||||
3. Commit passing T4 artifacts via VCS adapter (if configured).
|
||||
4. Notify on completion or terminal failure via notify adapter.
|
||||
"""
|
||||
raise NotImplementedError("TeamRunner.run is not yet implemented.")
|
||||
goal: str = self._config["run"]["goal"]
|
||||
self._bb.create_run(goal=goal)
|
||||
self._bb.update_run_status("active")
|
||||
logger.info("Pipeline started — goal: %s", goal)
|
||||
|
||||
def _dispatch_brief(self, brief) -> dict:
|
||||
"""
|
||||
Send a single TaskBrief to the appropriate agent and return the result.
|
||||
try:
|
||||
self._orchestrate(goal)
|
||||
self._bb.update_run_status("done")
|
||||
summary = self._bb.get_run_summary()
|
||||
logger.info("Pipeline complete. Summary: %s", summary)
|
||||
self._notify_run("complete", goal, summary)
|
||||
except Exception as exc:
|
||||
self._bb.update_run_status("failed")
|
||||
logger.error("Pipeline failed: %s", exc, exc_info=True)
|
||||
self._notify_run("failed", goal, {"error": str(exc)})
|
||||
raise
|
||||
finally:
|
||||
self._bb.close()
|
||||
|
||||
TODO (Phase 2):
|
||||
- Select runtime based on brief.preferred_runtime.
|
||||
- Load agent personality from brief.agent_personality (if set).
|
||||
- Compose prompt from tier system prompt + brief payload.
|
||||
- Spawn agent via runtime adapter.
|
||||
- Await result via runtime.get_result().
|
||||
- Log spawned/completed/failed events to Blackboard.
|
||||
"""
|
||||
raise NotImplementedError("TeamRunner._dispatch_brief is not yet implemented.")
|
||||
# ------------------------------------------------------------------
|
||||
# Internal orchestration
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _orchestrate(self, goal: str) -> None:
|
||||
"""Build the root T1 brief, dispatch it, and fan out per workstream."""
|
||||
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
|
||||
|
||||
# ---- T1: Visionary ----
|
||||
t1_brief = TaskBrief(
|
||||
run_id=self._bb.run_id,
|
||||
tier=1,
|
||||
role="default",
|
||||
goal_anchor=goal,
|
||||
task=(
|
||||
"You are the T1 Visionary. "
|
||||
"Decompose the following goal into parallel workstreams "
|
||||
f"for the engineering team: {goal}"
|
||||
),
|
||||
workstream="root",
|
||||
retry_budget=retry_bad,
|
||||
preferred_runtime="standard",
|
||||
agent_personality=self._resolve_personality(1, "default"),
|
||||
)
|
||||
self._bb.create_brief(t1_brief)
|
||||
|
||||
t1_result = self._run_with_escalation(t1_brief)
|
||||
t2_briefs = self._parse_t1_output(t1_result, t1_brief)
|
||||
logger.info("T1 produced %d workstream(s)", len(t2_briefs))
|
||||
|
||||
# ---- T2..T5: per workstream ----
|
||||
for t2_brief in t2_briefs:
|
||||
ws_id = self._bb.create_workstream(
|
||||
name=t2_brief.workstream, tier=2
|
||||
)
|
||||
self._bb.create_brief(t2_brief, workstream_id=ws_id)
|
||||
self._bb.update_workstream_status(ws_id, "active")
|
||||
|
||||
try:
|
||||
self._run_workstream(t2_brief, ws_id)
|
||||
self._bb.update_workstream_status(ws_id, "done")
|
||||
except EscalationError as exc:
|
||||
self._bb.update_workstream_status(ws_id, "failed")
|
||||
self._bb.log_event(
|
||||
"failed",
|
||||
detail={"error": str(exc), "workstream": t2_brief.workstream},
|
||||
)
|
||||
logger.error(
|
||||
"Workstream %r failed: %s", t2_brief.workstream, exc
|
||||
)
|
||||
|
||||
def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
|
||||
"""Drive T2 → T3 → T4 → T5 for a single workstream."""
|
||||
# T2: Architect
|
||||
t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
|
||||
t3_briefs = self._parse_t2_output(t2_result, t2_brief)
|
||||
logger.info(
|
||||
"T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
|
||||
)
|
||||
|
||||
for t3_brief in t3_briefs:
|
||||
self._bb.create_brief(t3_brief, workstream_id=ws_id)
|
||||
try:
|
||||
# T3: Squad Lead
|
||||
t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
|
||||
t4_briefs = self._parse_t3_output(t3_result, t3_brief)
|
||||
logger.info(
|
||||
"T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
|
||||
)
|
||||
|
||||
for t4_brief in t4_briefs:
|
||||
self._bb.create_brief(t4_brief, workstream_id=ws_id)
|
||||
try:
|
||||
# T4: Implementer
|
||||
t4_result = self._run_with_escalation(
|
||||
t4_brief, workstream_id=ws_id
|
||||
)
|
||||
artifacts: list[dict] = t4_result.get("artifacts", [])
|
||||
|
||||
# T5: Verifier
|
||||
t5_brief = t4_brief.make_child_brief(
|
||||
tier=5,
|
||||
role="code",
|
||||
task=(
|
||||
"Verify the following T4 implementation artifacts "
|
||||
"against all acceptance criteria. "
|
||||
f"T4 output: {json.dumps(t4_result)[:2000]}"
|
||||
),
|
||||
workstream=t4_brief.workstream,
|
||||
acceptance_criteria=t4_brief.acceptance_criteria,
|
||||
preferred_runtime="standard",
|
||||
agent_personality=self._resolve_personality(5, "code"),
|
||||
retry_budget=self._config.get(
|
||||
"retry_defaults", {}
|
||||
).get("bad_output", 3),
|
||||
context={"t4_result": t4_result},
|
||||
)
|
||||
self._bb.create_brief(t5_brief, workstream_id=ws_id)
|
||||
t5_result = self._run_with_escalation(
|
||||
t5_brief, workstream_id=ws_id
|
||||
)
|
||||
|
||||
# Commit on verified pass.
|
||||
if t5_result.get("status") in ("passed", "done"):
|
||||
self._commit_artifacts(artifacts, t4_brief)
|
||||
|
||||
except EscalationError as exc:
|
||||
logger.error(
|
||||
"T4/T5 escalation in %s: %s", t4_brief.role, exc
|
||||
)
|
||||
|
||||
except EscalationError as exc:
|
||||
logger.error("T3 escalation in %s: %s", t3_brief.role, exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI entry point (Phase 2)
|
||||
# CLI entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# TODO (Phase 2): Implement argparse CLI.
|
||||
# if __name__ == "__main__":
|
||||
# parser = argparse.ArgumentParser(description="Run the-agency pipeline.")
|
||||
# parser.add_argument("--config", default="config/team.yaml", help="Path to team.yaml")
|
||||
# parser.add_argument("--dry-run", action="store_true", help="Log actions without executing")
|
||||
# args = parser.parse_args()
|
||||
# runner = TeamRunner(config_path=args.config)
|
||||
# runner.run()
|
||||
def _configure_logging(verbose: bool = False) -> None:
|
||||
level = logging.DEBUG if verbose else logging.INFO
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Run the-agency T1→T5 pipeline.",
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--config",
|
||||
default="config/team.yaml",
|
||||
help="Path to team.yaml configuration file.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Log all planned actions without executing LLM calls, "
|
||||
"VCS commits, or notifications."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="Enable DEBUG-level logging.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
_configure_logging(args.verbose)
|
||||
|
||||
runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
|
||||
runner.run()
|
||||
|
||||
Reference in New Issue
Block a user