Files
the-agency/core/team_runner.py
Hans Heinemann 8994f87a43 feat: implement core/team_runner.py and cli/agency.py
- core/blackboard.py: add t3_task_lists table, extend event kinds to
  include full visibility vocabulary (gate_pending, gate_approved,
  gate_rejected, gate_paused, gate_resumed, path_amendment, log), and
  add query methods (get_events, get_latest_gate_event, get_t3_task_lists,
  all_t3_committed, get_briefs, get_workstreams, etc.)

- core/team_runner.py: full run lifecycle orchestrator
  - Loads team.yaml + role_registry.yaml, instantiates all four adapter types
  - T1 Plan (two-phase: plan + accept), T2 Lead/Specialist/Synthesis,
    T3 Squad Lead with mesh draft/commit cycle and mesh-timeout escalation,
    T4 swarm+pipeline with dep-ordering, T5 fan-out + joint verdict
  - Async (asyncio.gather) for parallel workstream and T4/T5 fan-out
  - Gate logic: gate_pending → notify → poll blackboard → gate_approved/rejected
  - Path amendment monitor (background task)
  - EscalationHandler integrated into _dispatch_with_retry
  - T3 mesh timeout → T2 re-scope escalation
  - Terminal failure → notify + run status=failed
  - VCS branch creation + PR at T1 Accept phase
  - Runtime selection: coding_agent runtime for preferred_runtime="coding_agent",
    tier_runtime_map overrides, fallback to default runtime

- cli/agency.py: agency CLI with subcommands
  - run: start pipeline, prints run_id + watch/inspect hints
  - watch: tails blackboard events live with ANSI-coloured output
  - inspect: run tree, --tier filter, --brief detail view
  - approve: write gate_approved directly to blackboard (universal gate path)
  - reject: write gate_rejected with --reason
  - pause: write gate_paused signal
  - resume: write gate_resumed signal

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:40:43 -04:00

1626 lines
61 KiB
Python

"""
core/team_runner.py
Full run lifecycle orchestrator for the-agency tiered agent pipeline.
Responsibilities
----------------
1. Load config/team.yaml and config/role_registry.yaml.
2. Instantiate adapters (LLM, VCS, notify, runtime) from config.
3. Create a Blackboard for the run.
4. Build the root T1 brief and kick off the spawn loop.
5. Spawn loop: detect pending briefs, enforce gates, call runtime_adapter.spawn().
6. Gate logic: write gate_pending → notify → poll for gate_approved/gate_rejected.
7. Path amendment monitor: detect path_amendment events, relay to higher tier.
8. T3 mesh timeout → T2 escalation.
9. T1 failure + terminal human escalation (runner-owned; tier failures owned by escalation.py).
10. Write final run status and create PR.
"""
from __future__ import annotations
import asyncio
import importlib
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import yaml
from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Adapter registry — maps config key → dotted class path
# ---------------------------------------------------------------------------
# Each value is "<module path>.<ClassName>"; the module is imported on demand
# (via importlib) only when the corresponding key is selected in config.
_LLM_ADAPTERS: dict[str, str] = {"anthropic": "adapters.llm.anthropic.AnthropicAdapter"}
_VCS_ADAPTERS: dict[str, str] = {"github": "adapters.vcs.github.GitHubAdapter"}
_NOTIFY_ADAPTERS: dict[str, str] = {
    "openclaw": "adapters.notify.openclaw.OpenClawNotifyAdapter",
}
_RUNTIME_ADAPTERS: dict[str, str] = dict(
    openclaw="adapters.runtime.openclaw.OpenClawRuntimeAdapter",
    claude_code="adapters.runtime.claude_code.ClaudeCodeRuntimeAdapter",
)

# Polling cadence, in seconds.
_GATE_POLL_INTERVAL_S = 5
_PATH_AMENDMENT_POLL_INTERVAL_S = 10

# Capability level associated with each tier number (1 = T1 … 5 = T5).
_TIER_CAPABILITY: dict[int, str] = {
    1: "reasoning-heavy", 2: "reasoning-heavy", 3: "capable", 4: "fast-cheap", 5: "capable",
}

# Default agent timeout per tier, in seconds.
_TIER_TIMEOUT_S: dict[int, int] = {1: 300, 2: 600, 3: 300, 4: 600, 5: 300}
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class GateRejectedError(Exception):
    """Raised when a human rejects an inspection gate.

    Attributes:
        gate: Name of the rejected gate (e.g. ``"t1_plan"``).
        reason: Optional rejection reason; ``""`` when none was supplied.
    """

    def __init__(self, gate: str, reason: str = "") -> None:
        self.gate = gate
        self.reason = reason
        # Only append ": <reason>" when a reason exists, so a bare rejection
        # does not produce a message ending in a dangling separator.
        message = f"Gate {gate!r} rejected"
        if reason:
            message = f"{message}: {reason}"
        super().__init__(message)
class GateTimeoutError(Exception):
    """Signals that an inspection gate was auto-rejected because it timed out.

    Attributes:
        gate: Name of the gate that expired.
        timeout_minutes: How long the gate waited, in minutes.
    """

    def __init__(self, gate: str, timeout_minutes: int) -> None:
        text = f"Gate {gate!r} timed out after {timeout_minutes}m"
        self.gate, self.timeout_minutes = gate, timeout_minutes
        super().__init__(text)
class TerminalEscalationError(Exception):
    """Unrecoverable run failure that needs a human to intervene.

    Accepts the ordinary ``Exception`` arguments and adds no attributes.
    """
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (``+00:00`` offset)."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def _load_adapter(registry: dict[str, str], key: str, config: dict) -> Any:
    """Build the adapter registered under *key*.

    The registry value is a dotted ``module.ClassName`` path. The module is
    imported on demand and the class is called with *config* as its only
    positional argument.

    Raises:
        ValueError: *key* is not in *registry* (the message lists known keys).
        ImportError / AttributeError: the dotted path cannot be resolved.
    """
    if key in registry:
        module_name, attr_name = registry[key].rsplit(".", 1)
        adapter_cls = getattr(importlib.import_module(module_name), attr_name)
        return adapter_cls(config)
    known_keys = sorted(registry.keys())
    raise ValueError(f"Unknown adapter key {key!r}. Known: {known_keys}")
def _strip_json_fences(text: str) -> str:
    """Remove ``` / ```json fences from an LLM response.

    Text that does not start with a fence is only whitespace-stripped.
    For fenced text:
      * multi-line: the opening fence line (``` or ```json) is dropped;
      * single-line (```{"a": 1}```): the leading ``` is dropped, so the
        content between the fences is kept rather than discarded;
      * a trailing ``` is removed, including one glued to the last content
        line (``{"a": 1}```).
    """
    text = text.strip()
    if not text.startswith("```"):
        return text
    lines = text.splitlines()
    if len(lines) == 1:
        inner = text[3:]
    else:
        inner = "\n".join(lines[1:])
    inner = inner.strip()
    if inner.endswith("```"):
        inner = inner[:-3]
    return inner.strip()
def _extract_json(raw: str) -> dict:
    """
    Try to parse a JSON dict from raw agent output.

    Handles markdown fences and leading/trailing prose. Each candidate text
    (the fence-stripped output first, then the untouched raw output if it
    differs) is tried in three ways; the first dict found wins:
      1. the whole text as JSON;
      2. the span from the first ``{`` to the last ``}``;
      3. each top-level object decoded on its own with
         ``json.JSONDecoder.raw_decode``, keeping the longest one. This
         recovers the answer when prose around it also contains braces.

    Raises:
        ValueError: if no JSON object can be decoded.
    """
    decoder = json.JSONDecoder()
    cleaned = _strip_json_fences(raw)
    # Fall back to the raw text in case fence stripping removed content.
    candidates = [cleaned] if cleaned == raw else [cleaned, raw]
    for text in candidates:
        # 1. Direct parse
        try:
            result = json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(result, dict):
                return result
        # 2. First { … last } block
        start = text.find("{")
        end = text.rfind("}") + 1
        if start >= 0 and end > start:
            try:
                result = json.loads(text[start:end])
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(result, dict):
                    return result
        # 3. Decode top-level objects one by one; keep the longest dict.
        best: Optional[dict] = None
        best_len = -1
        pos = start
        while pos >= 0:
            try:
                value, stop = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                pos = text.find("{", pos + 1)
                continue
            if isinstance(value, dict) and stop - pos > best_len:
                best, best_len = value, stop - pos
            # Skip past the decoded object so nested objects are not re-scanned.
            pos = text.find("{", stop)
        if best is not None:
            return best
    raise ValueError(f"No JSON object found in output: {raw[:300]!r}")
# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------
class TeamRunner:
"""
Orchestrates a full T1→T5 agent pipeline run.
Usage::
runner = TeamRunner(config_path="config/team.yaml")
runner.run()
"""
def __init__(
    self,
    config_path: str = "config/team.yaml",
    role_registry_path: str = "config/role_registry.yaml",
    dry_run: bool = False,
    run_id: Optional[str] = None,
) -> None:
    """Load config, build adapters and register a new run on the blackboard.

    Args:
        config_path: Team YAML config; must define ``run.goal``.
        role_registry_path: Role registry YAML.
        dry_run: Stored as ``self.dry_run``; ``_run_async`` skips creating
            the VCS integration branch when it is true.
        run_id: Explicit run id; defaults to the first 8 characters of a
            random UUID4.

    Raises:
        OSError: a config file cannot be opened.
        KeyError: the team config has no ``run.goal``.
        ValueError: an llm/vcs/notify/default-runtime adapter key is not
            registered. (A coding-agent runtime that fails to load only
            logs a warning and falls back to the default runtime.)
    """
    self.dry_run = dry_run
    # ------------------------------------------------------------------
    # Load configs
    # ------------------------------------------------------------------
    # An empty YAML document loads as None; normalise to {} so missing
    # keys surface as KeyError below instead of AttributeError on None.
    with open(config_path, encoding="utf-8") as fh:
        self.config: dict = yaml.safe_load(fh) or {}
    with open(role_registry_path, encoding="utf-8") as fh:
        self.role_registry: dict = yaml.safe_load(fh) or {}
    # The goal is mandatory: read it before any adapter is constructed so
    # a malformed config fails fast, without adapter side effects.
    goal = self.config["run"]["goal"]
    # ------------------------------------------------------------------
    # Run identity
    # ------------------------------------------------------------------
    self.run_id: str = run_id or str(uuid.uuid4())[:8]
    # ------------------------------------------------------------------
    # Instantiate adapters
    # ------------------------------------------------------------------
    # `or {}` also covers sections written in YAML with no value (null).
    adapter_cfg = self.config.get("adapters") or {}
    self.llm = _load_adapter(
        _LLM_ADAPTERS, adapter_cfg.get("llm", "anthropic"), self.config
    )
    self.vcs = _load_adapter(
        _VCS_ADAPTERS, adapter_cfg.get("vcs", "github"), self.config
    )
    self.notify = _load_adapter(
        _NOTIFY_ADAPTERS, adapter_cfg.get("notify", "openclaw"), self.config
    )
    runtime_cfg = self.config.get("runtime") or {}
    default_runtime_key = runtime_cfg.get("default", "openclaw")
    self._default_runtime = _load_adapter(
        _RUNTIME_ADAPTERS, default_runtime_key, self.config
    )
    coding_agent_key = runtime_cfg.get("coding_agent", "claude_code")
    if coding_agent_key == default_runtime_key:
        # Same adapter class and same config: share one instance instead of
        # constructing a duplicate.
        self._coding_runtime = self._default_runtime
    else:
        try:
            self._coding_runtime = _load_adapter(
                _RUNTIME_ADAPTERS, coding_agent_key, self.config
            )
        except Exception as exc:
            logger.warning(
                "Coding agent runtime (%s) unavailable — falling back to default: %s",
                coding_agent_key,
                exc,
            )
            self._coding_runtime = self._default_runtime
    self._tier_runtime_map: dict[str, str] = runtime_cfg.get("tier_runtime_map") or {}
    # ------------------------------------------------------------------
    # Visibility / gate config
    # ------------------------------------------------------------------
    vis = self.config.get("visibility") or {}
    strict = vis.get("strict_mode", False)
    raw_gates = vis.get("inspection_gates") or {}
    # Defaults for the configurable gates (non-strict mode); strict mode
    # forces every gate on.
    optional_gate_defaults = {
        "t2_lead": False,
        "t2_synthesis": True,
        "t3_plan": False,
        "t5_verdict": False,
    }
    self._gates: dict[str, bool] = {"t1_plan": True}  # always required by design
    for gate_name, default in optional_gate_defaults.items():
        self._gates[gate_name] = True if strict else raw_gates.get(gate_name, default)
    self._gate_timeout_minutes: int = vis.get("gate_timeout_minutes", 60)
    self._t3_mesh_timeout_minutes: int = self.config.get(
        "t3_mesh_timeout_minutes", 10
    )
    # ------------------------------------------------------------------
    # Retry defaults
    # ------------------------------------------------------------------
    retry = self.config.get("retry_defaults") or {}
    self._retry_bad_output: int = retry.get("bad_output", 3)
    self._retry_partial: int = retry.get("partial", 2)
    self._retry_blocked: int = retry.get("blocked", 0)
    # ------------------------------------------------------------------
    # Blackboard
    # ------------------------------------------------------------------
    self.bb = Blackboard(self.run_id)
    self.bb.create_run(goal=goal)
    # ------------------------------------------------------------------
    # Escalation handler
    # ------------------------------------------------------------------
    self._escalation = EscalationHandler()
    self._log(
        f"TeamRunner initialized — run_id={self.run_id!r} "
        f"goal={goal!r}"
    )
# ------------------------------------------------------------------
# Public entry point
# ------------------------------------------------------------------
def run(self) -> None:
    """Run the whole pipeline to completion from synchronous code.

    Starts a fresh event loop with ``asyncio.run`` for ``_run_async`` and
    propagates whatever it raises.
    """
    pipeline = self._run_async()
    asyncio.run(pipeline)
# ------------------------------------------------------------------
# Main async pipeline
# ------------------------------------------------------------------
async def _run_async(self) -> None:
    """Execute one full run and record its outcome on the blackboard.

    Flow:
      1. Create the ``integration/<run_id>`` branch. This is skipped in
         dry-run mode, and a failure here is only logged.
      2. T1 Plan.
      3. T2→T5 workplan execution.
      4. T1 Accept, which yields the PR URL.

    Run status written via ``bb.update_run_status``:
      * ``"active"`` at start;
      * ``"review"`` on success, and also when a gate is rejected or times
        out (the run halts awaiting a human);
      * ``"failed"`` on ``TerminalEscalationError`` or any other exception,
        together with a terminal ``failed`` event and a best-effort
        notification.

    Every exception is re-raised after being recorded. The blackboard is
    closed on all paths.
    """
    run_cfg = self.config["run"]
    goal = run_cfg["goal"]
    base_branch = run_cfg.get("base_branch", "main")
    branch = f"integration/{self.run_id}"
    self.bb.update_run_status("active")
    self._log(f"Run active — goal={goal!r} branch={branch!r}")
    try:
        # Create integration branch (non-fatal on failure)
        if not self.dry_run:
            try:
                self.vcs.create_branch(branch)
                self._log(f"Branch {branch!r} created")
            except Exception as exc:
                self._log(
                    f"Warning: could not create branch {branch!r}: {exc}",
                    level="warning",
                )
        # T1 Plan phase
        workplan = await self._run_t1_plan(goal)
        # T2→T5 execution per workplan
        ws_results = await self._execute_workplan(workplan)
        # T1 Accept phase
        pr_url = await self._run_t1_accept(workplan, ws_results, branch, base_branch)
        self.bb.update_run_status("review")
        self._log(f"Run complete — PR: {pr_url or '(none)'}")
    except (GateRejectedError, GateTimeoutError) as exc:
        self._log(f"Run paused at gate: {exc}", level="warning")
        self.bb.update_run_status("review")  # Halted awaiting human
        raise
    except TerminalEscalationError as exc:
        self._log(f"Terminal escalation: {exc}", level="error")
        self._record_run_failure(goal, exc, "TERMINAL FAILURE")
        raise
    except Exception as exc:
        logger.exception("Unexpected failure in run %s", self.run_id)
        self._record_run_failure(goal, exc, "FAILED")
        raise
    finally:
        self.bb.close()

def _record_run_failure(self, goal: str, exc: BaseException, label: str) -> None:
    """Mark the run failed, log a terminal ``failed`` event, and notify a human.

    The notification is best-effort. If ``notify.send`` itself raises, the
    error is logged and swallowed so it cannot replace the run failure the
    caller is about to re-raise.
    """
    self.bb.update_run_status("failed")
    self.bb.log_event(
        "failed", detail={"error": str(exc), "terminal": True}
    )
    try:
        self.notify.send(
            f"[agency] Run {self.run_id} — {label}: {exc}",
            context={"run_id": self.run_id, "goal": goal, "error": str(exc)},
        )
    except Exception:
        logger.warning(
            "Could not send failure notification for run %s",
            self.run_id,
            exc_info=True,
        )
# ------------------------------------------------------------------
# T1 Plan phase
# ------------------------------------------------------------------
async def _run_t1_plan(self, goal: str) -> dict:
    """
    Run T1 Phase 1 (Plan):
    1. Spawn T1 Visionary with planning prompt.
    2. Parse workplan JSON from output.
    3. Pin ``run_id`` / ``goal_anchor`` to the runner's own values, and make
       sure ``workstreams`` is a list.
    4. Halt at t1_plan gate (always required).
    5. Return workplan dict.

    The model is only asked to echo the run id and goal. The runner is
    authoritative for both, so they are overwritten rather than defaulted.
    This stops a paraphrased or placeholder goal from drifting into every
    downstream brief.
    """
    persona = self._resolve_persona("t1", "default")
    system_prompt = self._load_prompt("t1_visionary")
    brief = TaskBrief(
        run_id=self.run_id,
        tier=1,
        role="t1_plan",
        goal_anchor=goal,
        workstream="t1",
        task=(
            f"{system_prompt}\n\n"
            f"---\n\n"
            f"GOAL: {goal}\n\n"
            f"PHASE 1 — PLAN:\n"
            f"1. Assess complexity (high | medium | low).\n"
            f"2. Identify workstreams and declare tier_path per workstream "
            f"(e.g. [\"t2\",\"t3\",\"t4\",\"t5\"] or [\"t3\",\"t4\",\"t5\"] or [\"t4\",\"t5\"]).\n"
            f"3. Declare parallelism groups (which workstreams can run concurrently).\n"
            f"4. Set retry_budget_multiplier (1 = simple, 2 = complex).\n"
            f"5. Self-critique in one pass — amend the plan.\n\n"
            f"Return ONLY a JSON object matching this schema exactly "
            f"(no markdown fences, no commentary):\n"
            f'{{\n'
            f' "run_id": "{self.run_id}",\n'
            f' "goal_anchor": "<copy goal verbatim>",\n'
            f' "complexity": "high|medium|low",\n'
            f' "retry_budget_multiplier": 1,\n'
            f' "workstreams": [\n'
            f' {{\n'
            f' "id": "ws-<slug>",\n'
            f' "name": "<name>",\n'
            f' "domain": "<domain>",\n'
            f' "tier_path": ["t2","t3","t4","t5"],\n'
            f' "parallel_group": "A",\n'
            f' "t2_specialist": "<optional agent path>",\n'
            f' "notes": "<notes>"\n'
            f' }}\n'
            f' ],\n'
            f' "parallelism": {{\n'
            f' "groups": {{"A": ["ws-..."]}},\n'
            f' "sequence": ["A"]\n'
            f' }},\n'
            f' "self_critique_summary": "<summary>"\n'
            f'}}'
        ),
        acceptance_criteria=["Valid JSON workplan returned with workstreams array"],
        retry_budget=self._retry_bad_output,
        preferred_runtime="standard",
        agent_personality=persona,
    )
    brief.validate()
    self.bb.create_brief(brief)
    self._log("T1 Plan phase starting")
    result = await self._dispatch_with_retry(brief)
    # Parse workplan
    workplan = self._parse_json_result(result, brief)
    echoed_goal = workplan.get("goal_anchor")
    if echoed_goal is not None and echoed_goal != goal:
        self._log(
            f"T1 returned a different goal_anchor ({echoed_goal!r}) — "
            f"restoring the original goal",
            level="warning",
        )
    workplan["run_id"] = self.run_id
    workplan["goal_anchor"] = goal
    # A null or non-list `workstreams` would otherwise crash workplan
    # execution after the human has already approved the plan.
    workstreams = workplan.get("workstreams")
    if not isinstance(workstreams, list):
        if workstreams is not None:
            self._log(
                f"T1 workstreams is not a list ({type(workstreams).__name__}) — "
                f"treating as empty",
                level="warning",
            )
        workstreams = []
        workplan["workstreams"] = workstreams
    ws_count = len(workstreams)
    self.bb.update_brief_result(brief.brief_id, workplan)
    self._log(
        f"T1 Plan done — complexity={workplan.get('complexity')!r} "
        f"workstreams={ws_count}"
    )
    # Gate: t1_plan (always required by design)
    await self._check_gate(
        gate_name="t1_plan",
        brief=brief,
        summary=self._format_workplan_summary(workplan),
        what_happens_next=(
            f"T2 Architects will spawn for "
            f"{ws_count} workstream(s)"
        ),
    )
    return workplan
# ------------------------------------------------------------------
# Workplan execution
# ------------------------------------------------------------------
async def _execute_workplan(self, workplan: dict) -> dict[str, Any]:
    """
    Execute all workstreams in parallelism-group order.

    Groups listed in ``parallelism.sequence`` run one after another, and the
    workstreams inside a group run concurrently. Without a sequence, every
    workstream runs in a single group ``"A"``. Workstreams that no sequenced
    group mentions run last (with a warning) rather than being silently
    skipped. Each workstream runs at most once, even if several groups list
    it.

    A failed workstream is recorded as ``{"error": str(exc)}`` and the other
    workstreams continue. The exceptions are gate rejection/timeout and
    terminal escalation: they are re-raised once the current group has
    finished, so the run halts.

    Returns a dict mapping workstream_id → final context (or error record).
    """
    parallelism = workplan.get("parallelism") or {}
    groups: dict[str, list[str]] = parallelism.get("groups") or {}
    sequence: list[str] = parallelism.get("sequence") or []
    ws_map = {ws["id"]: ws for ws in workplan.get("workstreams") or []}
    # Normalise: if no explicit parallelism, put all in one group
    if not sequence:
        sequence = ["A"]
        groups = {"A": list(ws_map)}
    # Resolve the schedule up front so duplicates and orphans are handled
    # before anything runs.
    scheduled: set[str] = set()
    schedule: list[tuple[str, list[dict]]] = []
    for group_name in sequence:
        members: list[dict] = []
        for wid in groups.get(group_name) or []:
            if wid in ws_map and wid not in scheduled:
                scheduled.add(wid)
                members.append(ws_map[wid])
        schedule.append((group_name, members))
    orphans = [ws for wid, ws in ws_map.items() if wid not in scheduled]
    if orphans:
        self._log(
            f"Workstreams not in any sequenced group — running them last: "
            f"{[w['name'] for w in orphans]}",
            level="warning",
        )
        schedule.append(("(unsequenced)", orphans))

    halting = (GateRejectedError, GateTimeoutError, TerminalEscalationError)
    all_results: dict[str, Any] = {}
    for group_name, ws_defs in schedule:
        self._log(
            f"Parallelism group {group_name!r}: "
            f"{[w['name'] for w in ws_defs]}"
        )
        group_results = await asyncio.gather(
            *[self._run_workstream(ws, workplan) for ws in ws_defs],
            return_exceptions=True,
        )
        halt: Optional[BaseException] = None
        for ws, res in zip(ws_defs, group_results):
            # With return_exceptions=True, gather also returns BaseException
            # instances such as asyncio.CancelledError, so test for those too.
            if isinstance(res, BaseException):
                self._log(
                    f"Workstream {ws['name']!r} failed: {res}", level="error"
                )
                all_results[ws["id"]] = {"error": str(res)}
                if halt is None and isinstance(res, halting):
                    halt = res
            else:
                all_results[ws["id"]] = res
        if halt is not None:
            raise halt
    return all_results
async def _run_workstream(self, ws_def: dict, workplan: dict) -> dict:
    """Execute the full tier_path for one workstream.

    ``tier_path`` defaults to ``["t2", "t3", "t4", "t5"]`` when absent. Tier
    names are matched case-insensitively, and unknown names are logged and
    skipped. The workstream's blackboard row is set to ``active``, then to
    ``done`` on success. If any tier raises, including on cancellation, the
    row is set to ``failed`` and the exception is re-raised.

    Returns the context dict threaded through the tiers.
    """
    ws_id = ws_def["id"]
    ws_name = ws_def["name"]
    tier_path = ws_def.get("tier_path")
    if tier_path is None:
        tier_path = ["t2", "t3", "t4", "t5"]
    retry_mul = int(workplan.get("retry_budget_multiplier", 1))
    # Entry tier for the blackboard row: the number after the first tier
    # name's prefix ("t3" → 3). Falls back to 2 instead of raising on a
    # malformed name.
    entry_tier = 2
    if tier_path:
        suffix = str(tier_path[0])[1:].strip()
        if suffix.isdecimal():
            entry_tier = int(suffix)
    ws_bb_id = self.bb.create_workstream(
        workstream_id=ws_id, name=ws_name, tier=entry_tier
    )
    self.bb.update_workstream_status(ws_bb_id, "active")
    self._log(f"Workstream {ws_name!r} starting — tier_path={tier_path}")
    ctx: dict = {
        "workstream_id": ws_id,
        "workstream_name": ws_name,
        "domain": ws_def.get("domain", "default"),
        "goal_anchor": workplan["goal_anchor"],
        "t1_notes": ws_def.get("notes", ""),
    }
    tier_runners = {
        "t2": self._run_t2,
        "t3": self._run_t3,
        "t4": self._run_t4,
        "t5": self._run_t5,
    }
    try:
        for tier_name in tier_path:
            runner = tier_runners.get(str(tier_name).strip().lower())
            if runner is None:
                self._log(
                    f"Unknown tier {tier_name!r} in path for {ws_name!r} — skipping",
                    level="warning",
                )
                continue
            ctx = await runner(ws_def, ctx, workplan, retry_mul)
    except BaseException:
        # BaseException so a cancelled workstream is not left marked "active".
        self.bb.update_workstream_status(ws_bb_id, "failed")
        raise
    self.bb.update_workstream_status(ws_bb_id, "done")
    self._log(f"Workstream {ws_name!r} complete")
    return ctx
# ------------------------------------------------------------------
# T2 — Architect
# ------------------------------------------------------------------
async def _run_t2(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """T2 Lead Architect → T2 Specialist → Lead Synthesis for one workstream.

    1. The lead publishes domain boundaries and shared assumptions. The
       ``t2_lead`` gate follows when enabled.
    2. A specialist for the workstream's domain designs the architecture
       within those boundaries.
    3. The lead synthesises the specialist output into the canonical
       architecture. The ``t2_synthesis`` gate follows when enabled.

    Each step's retry budget is ``retry_defaults.bad_output × retry_mul``.
    ``ctx`` is updated in place and returned. It gains the keys
    ``t2_lead_output``, ``domain_boundaries``, ``shared_assumptions``,
    ``t2_specialist_output`` and ``canonical_architecture``.
    """
    ws_name = ws_def["name"]
    ws_id = ws_def["id"]
    domain = ws_def.get("domain", "default")
    goal = workplan["goal_anchor"]
    budget = int(self._retry_bad_output * retry_mul)

    async def spawn(brief, label):
        # Shared step for every T2 brief:
        # validate → register → dispatch with retries → parse JSON → store result.
        brief.validate()
        self.bb.create_brief(brief, workstream_id=ws_id)
        self._log(f"[{ws_name}] {label} spawning")
        reply = await self._dispatch_with_retry(brief)
        parsed = self._try_parse_json(reply)
        self.bb.update_brief_result(brief.brief_id, parsed)
        return parsed

    # ---- 1. Lead Architect --------------------------------------------
    lead_persona = self._resolve_persona("t2", "default")
    lead_task = (
        f"{self._load_prompt('t2_architect')}\n\n---\n\n"
        f"T2 LEAD ARCHITECT — {ws_name}\n"
        f"Goal: {goal}\nDomain: {domain}\n"
        f"T1 notes: {ws_def.get('notes', '')}\n\n"
        f"1. Define explicit domain boundaries (owns / excludes).\n"
        f"2. Publish shared assumptions (auth, data formats, API patterns).\n"
        f"3. Outline overall architectural approach.\n\n"
        f"Return JSON:\n"
        f'{{\n'
        f' "domain_boundaries": {{"description":"","owns":[],"explicitly_excludes":[]}},\n'
        f' "shared_assumptions": {{"auth":"","data_formats":"","api_patterns":"","key_decisions":[]}},\n'
        f' "architecture_outline": ""\n'
        f'}}'
    )
    lead_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_lead",
        goal_anchor=goal,
        workstream=ws_id,
        task=lead_task,
        acceptance_criteria=["domain_boundaries and shared_assumptions present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=lead_persona,
    )
    lead_json = await spawn(lead_brief, "T2 Lead")
    ctx["t2_lead_output"] = lead_json
    ctx["domain_boundaries"] = lead_json.get("domain_boundaries", {})
    ctx["shared_assumptions"] = lead_json.get("shared_assumptions", {})
    if self._gates.get("t2_lead"):
        await self._check_gate(
            gate_name="t2_lead",
            brief=lead_brief,
            summary=f"T2 Lead published domain boundaries for {ws_name}",
            what_happens_next=f"T2 {domain} specialist will spawn",
        )

    # ---- 2. Domain Specialist -----------------------------------------
    spec_persona = self._resolve_persona("t2", domain)
    spec_task = (
        f"T2 SPECIALIST ARCHITECT — {ws_name} ({domain})\n"
        f"Goal: {goal}\n\n"
        f"Domain boundaries:\n{json.dumps(ctx['domain_boundaries'], indent=2)}\n\n"
        f"Shared assumptions:\n{json.dumps(ctx['shared_assumptions'], indent=2)}\n\n"
        f"Design the {domain} domain architecture:\n"
        f"- Component breakdown\n- Interface contracts\n"
        f"- Data models\n- Key implementation decisions\n\n"
        f"Return JSON:\n"
        f'{{\n'
        f' "architecture": "",\n'
        f' "components": [{{"name":"","responsibility":"","interfaces":[]}}],\n'
        f' "interface_contracts": {{}},\n'
        f' "t3_brief_context": ""\n'
        f'}}'
    )
    spec_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_specialist",
        goal_anchor=goal,
        workstream=ws_id,
        task=spec_task,
        acceptance_criteria=["architecture and components present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=spec_persona,
    )
    spec_json = await spawn(spec_brief, f"T2 Specialist ({domain})")
    ctx["t2_specialist_output"] = spec_json

    # ---- 3. Lead Synthesis --------------------------------------------
    synth_task = (
        f"T2 LEAD — SYNTHESIS for {ws_name}\n\n"
        f"Specialist output (truncated):\n"
        f"{json.dumps(spec_json, indent=2)[:3000]}\n\n"
        f"Synthesise into canonical architecture.\n\n"
        f"Return JSON:\n"
        f'{{\n'
        f' "canonical_architecture": "",\n'
        f' "t3_task_context": "",\n'
        f' "interface_contracts": {{}},\n'
        f' "implementation_notes": []\n'
        f'}}'
    )
    synth_brief = TaskBrief(
        run_id=self.run_id,
        tier=2,
        role="t2_synthesis",
        goal_anchor=goal,
        workstream=ws_id,
        task=synth_task,
        acceptance_criteria=["canonical_architecture present"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=lead_persona,
    )
    ctx["canonical_architecture"] = await spawn(synth_brief, "T2 Lead synthesis")
    self.bb.log_event(
        "completed",
        detail={"tier": 2, "workstream": ws_name, "phase": "synthesis"},
    )
    self._log(f"[{ws_name}] T2 synthesis done")
    if self._gates.get("t2_synthesis"):
        await self._check_gate(
            gate_name="t2_synthesis",
            brief=synth_brief,
            summary=f"T2 canonical architecture ready for {ws_name}",
            what_happens_next="T3 Squad Lead(s) will spawn and negotiate task breakdown",
        )
    return ctx
# ------------------------------------------------------------------
# T3 — Squad Lead (with mesh coordination)
# ------------------------------------------------------------------
async def _run_t3(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """T3 Squad Lead — mesh draft/commit cycle, then returns task_list.

    The squad lead is spawned with the T2 canonical architecture, or the
    specialist output if no canonical architecture exists. Its
    ``task_list`` is normalised and committed to the blackboard's T3 mesh,
    and the method waits for the mesh commit. It then sets
    ``ctx["t3_output"]`` and ``ctx["task_list"]`` and enforces the
    ``t3_plan`` gate when enabled.

    Normalisation keeps T4 scheduling from hitting a KeyError or a spurious
    dependency deadlock:
      * entries that are not dicts are dropped;
      * a missing id becomes ``t4-<position>``, and duplicate ids get
        ``-2``, ``-3``… suffixes;
      * ``deps`` is coerced to a list of strings. A bare string becomes a
        one-element list; anything else becomes an empty list.
    If no usable task remains, a single fallback task covering the whole
    workstream is used.
    """
    ws_id = ws_def["id"]
    ws_name = ws_def["name"]
    domain = ws_def.get("domain", "default")
    goal = workplan["goal_anchor"]
    budget = int(self._retry_bad_output * retry_mul)
    architecture = ctx.get("canonical_architecture") or ctx.get(
        "t2_specialist_output", {}
    )

    def normalise_tasks(raw_tasks: Any) -> list[dict]:
        # Make every task schedulable: dict entries with unique string ids
        # and list-of-string deps.
        if not isinstance(raw_tasks, list):
            return []
        tasks: list[dict] = []
        seen: set[str] = set()
        for position, item in enumerate(raw_tasks, start=1):
            if not isinstance(item, dict):
                continue
            task = dict(item)
            base_id = str(task.get("id") or f"t4-{position}")
            task_id, suffix = base_id, 2
            while task_id in seen:
                task_id = f"{base_id}-{suffix}"
                suffix += 1
            seen.add(task_id)
            task["id"] = task_id
            deps = task.get("deps")
            if isinstance(deps, str):
                deps = [deps]
            task["deps"] = [str(d) for d in deps] if isinstance(deps, list) else []
            tasks.append(task)
        return tasks

    persona = self._resolve_persona("t3", domain)
    brief = TaskBrief(
        run_id=self.run_id,
        tier=3,
        role="t3_squad_lead",
        goal_anchor=goal,
        workstream=ws_id,
        task=(
            f"{self._load_prompt('t3_squad_lead')}\n\n---\n\n"
            f"T3 SQUAD LEAD — {ws_name}\n"
            f"Goal: {goal}\n\n"
            f"Architecture (truncated):\n"
            f"{json.dumps(architecture, indent=2)[:3000]}\n\n"
            f"Break the work into atomic T4 tasks. Declare dependencies.\n"
            f"Pipeline tasks (sequential) set deps; swarm tasks (parallel) set deps=[].\n\n"
            f"Return JSON:\n"
            f'{{\n'
            f' "task_list": [\n'
            f' {{\n'
            f' "id": "t4-<slug>",\n'
            f' "task": "implement <specific thing>",\n'
            f' "acceptance_criteria": [],\n'
            f' "deps": [],\n'
            f' "t5_type": "code|integration|api",\n'
            f' "preferred_runtime": "coding_agent"\n'
            f' }}\n'
            f' ],\n'
            f' "coordination_notes": ""\n'
            f'}}'
        ),
        acceptance_criteria=["task_list with at least one task"],
        context={"workstream_context": self._safe_ctx(ctx)},
        retry_budget=budget,
        preferred_runtime="standard",
        agent_personality=persona,
    )
    brief.validate()
    self.bb.create_brief(brief, workstream_id=ws_id)
    self._log(f"[{ws_name}] T3 Squad Lead spawning")
    # Register draft in t3_task_lists
    self.bb.create_t3_draft(workstream_id=ws_id, t3_agent_id=brief.brief_id)
    t3_result = await self._dispatch_with_retry(brief)
    t3_json = self._try_parse_json(t3_result)
    task_list = normalise_tasks(t3_json.get("task_list"))
    if not task_list:
        self._log(f"[{ws_name}] T3 returned empty task_list — using fallback task", level="warning")
        task_list = [
            {
                "id": "t4-default",
                "task": f"Implement {ws_name}: {goal}",
                "acceptance_criteria": ["Implementation complete"],
                "deps": [],
                "t5_type": "code",
                "preferred_runtime": "coding_agent",
            }
        ]
    # Commit task list (mesh — single T3 per workstream for now)
    self.bb.commit_t3_task_list(
        workstream_id=ws_id,
        t3_agent_id=brief.brief_id,
        tasks=task_list,
    )
    self.bb.update_brief_result(brief.brief_id, t3_json)
    # T3 mesh timeout check (verify committed within deadline)
    await self._await_t3_mesh_commit(ws_id, ws_name, ws_def, ctx, workplan, retry_mul)
    ctx["t3_output"] = t3_json
    ctx["task_list"] = task_list
    self.bb.log_event("completed", detail={"tier": 3, "workstream": ws_name})
    self._log(f"[{ws_name}] T3 done — {len(task_list)} task(s) declared")
    # Gate: t3_plan
    if self._gates.get("t3_plan"):
        await self._check_gate(
            gate_name="t3_plan",
            brief=brief,
            summary=f"T3 task breakdown for {ws_name}: {len(task_list)} task(s)",
            what_happens_next=f"T4 workers will spawn for {len(task_list)} task(s)",
        )
    return ctx
async def _await_t3_mesh_commit(
    self,
    ws_id: str,
    ws_name: str,
    ws_def: dict,
    ctx: dict,
    workplan: dict,
    retry_mul: int,
) -> None:
    """
    Wait until all T3s in the domain have committed their task lists.

    Polls ``bb.all_t3_committed(ws_id)`` every 3 s until it returns true or
    ``t3_mesh_timeout_minutes`` elapse. Commit state is always checked
    before the deadline, so an already-committed mesh never escalates (even
    with a timeout of 0). A commit that lands during the last sleep is also
    still seen.

    On timeout this is treated as a domain boundary problem. The method logs
    an ``escalated`` event, notifies a human, and re-runs T2 for the
    workstream with ``max(1, retry_mul - 1)``; ``_run_t2`` updates ``ctx`` in
    place.

    Raises:
        TerminalEscalationError: on timeout when the T2 retry budget
            (``retry_defaults.bad_output × retry_mul``) is zero.
    """
    deadline = time.monotonic() + self._t3_mesh_timeout_minutes * 60
    poll_s = 3.0
    while True:
        if self.bb.all_t3_committed(ws_id):
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        await asyncio.sleep(min(poll_s, remaining))
    # Timeout — escalate to T2
    self._log(
        f"[{ws_name}] T3 mesh timeout after {self._t3_mesh_timeout_minutes}m — "
        f"escalating to T2",
        level="warning",
    )
    self.bb.log_event(
        "escalated",
        detail={
            "reason": "t3_mesh_timeout",
            "workstream": ws_name,
            "timeout_minutes": self._t3_mesh_timeout_minutes,
            "t3_task_lists": self.bb.get_t3_task_lists(ws_id),
        },
    )
    self.notify.send(
        f"[agency] Run {self.run_id}: T3 mesh timeout for workstream {ws_name!r}. "
        f"T3 squad leads failed to commit task lists within "
        f"{self._t3_mesh_timeout_minutes}m. Re-running T2 to re-scope.",
        context={"run_id": self.run_id, "workstream": ws_name},
    )
    budget = int(self._retry_bad_output * retry_mul)
    if budget <= 0:
        raise TerminalEscalationError(
            f"T3 mesh timeout for {ws_name!r} and T2 retry budget exhausted"
        )
    # Re-run T2 to re-scope (reduced retry multiplier). NOTE(review): T3 is
    # not re-run here, so the caller keeps the task list it already
    # produced; confirm this is intended.
    await self._run_t2(ws_def, ctx, workplan, max(1, retry_mul - 1))
# ------------------------------------------------------------------
# T4 — Implementers (swarm + pipeline)
# ------------------------------------------------------------------
async def _run_t4(
    self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
    """Dispatch T4 tasks respecting dep ordering (pipeline) and parallelism (swarm).

    Each round concurrently spawns every pending task whose ``deps`` have
    all finished. A task counts as finished whether it succeeded or failed.

    Results go into ``ctx["t4_results"]``, keyed by task id. A failed task
    maps to ``{"status": "failed", "error": ...}``. A task that can never
    become ready (unknown or cyclic deps) is recorded the same way rather
    than silently dropped, so later tiers still see it.
    """
    ws_name = ws_def["name"]
    task_list: list[dict] = ctx.get("task_list", [])
    if not task_list:
        self._log(f"[{ws_name}] No T4 tasks — skipping", level="warning")
        return ctx
    t4_results: dict[str, Any] = {}
    completed_ids: set[str] = set()
    pending = list(task_list)
    # Each round finishes at least one task or breaks, so len(pending)
    # rounds always suffice; the extra headroom is only a safety cap.
    max_rounds = len(pending) * 2 + 5
    for _ in range(max_rounds):
        if not pending:
            break
        ready = [
            t for t in pending
            if all(dep in completed_ids for dep in t.get("deps", []))
        ]
        if not ready:
            self._log(
                f"[{ws_name}] T4 dependency deadlock — "
                f"remaining: {[t['id'] for t in pending]}",
                level="warning",
            )
            break
        self._log(f"[{ws_name}] T4 spawning {len(ready)} task(s) in parallel")
        results = await asyncio.gather(
            *[
                self._run_t4_task(task, ws_def, ctx, workplan, retry_mul)
                for task in ready
            ],
            return_exceptions=True,
        )
        for task, res in zip(ready, results):
            tid = task["id"]
            # gather(return_exceptions=True) also returns BaseException
            # instances (e.g. asyncio.CancelledError); never store one as a
            # successful result.
            if isinstance(res, BaseException):
                self._log(
                    f"[{ws_name}] T4 task {tid!r} failed: {res}", level="error"
                )
                t4_results[tid] = {"status": "failed", "error": str(res)}
            else:
                t4_results[tid] = res
            completed_ids.add(tid)
        pending = [t for t in pending if t["id"] not in completed_ids]
    # Tasks still pending could never run; record them so they are not lost.
    for task in pending:
        unmet = [dep for dep in task.get("deps", []) if dep not in completed_ids]
        t4_results[task["id"]] = {
            "status": "failed",
            "error": f"not run — unresolved dependencies: {unmet}",
        }
    ctx["t4_results"] = t4_results
    self._log(f"[{ws_name}] T4 complete — {len(t4_results)} task(s)")
    return ctx
async def _run_t4_task(
    self,
    task: dict,
    ws_def: dict,
    ctx: dict,
    workplan: dict,
    retry_mul: int,
) -> dict:
    """Spawn one T4 implementer for a single T3 task and return its raw result.

    The brief carries:
      * the T4 implementer prompt plus the task description;
      * the task's own acceptance criteria;
      * its preferred runtime (default ``"coding_agent"``);
      * the T2 canonical architecture as context.
    After dispatch, the result is stored on the blackboard and a
    ``completed`` event tagged with tier 4 and the task id is logged.
    """
    persona = self._resolve_persona("t4", ws_def.get("domain", "default"))
    description = task.get("task", "Implement task")
    t4_brief = TaskBrief(
        run_id=self.run_id,
        tier=4,
        role="t4_implementer",
        goal_anchor=workplan["goal_anchor"],
        workstream=ws_def["id"],
        task=f"{self._load_prompt('t4_implementer')}\n\n---\n\n{description}",
        acceptance_criteria=task.get("acceptance_criteria", []),
        context={
            "task_id": task.get("id"),
            "canonical_architecture": self._safe_ctx(
                ctx.get("canonical_architecture", {})
            ),
        },
        retry_budget=int(self._retry_bad_output * retry_mul),
        preferred_runtime=task.get("preferred_runtime", "coding_agent"),
        agent_personality=persona,
    )
    t4_brief.validate()
    self.bb.create_brief(t4_brief, workstream_id=ws_def["id"])
    outcome = await self._dispatch_with_retry(t4_brief)
    self.bb.update_brief_result(t4_brief.brief_id, outcome)
    event_detail = {"tier": 4, "task_id": task.get("id"), "workstream": ws_def["name"]}
    self.bb.log_event("completed", brief_id=t4_brief.brief_id, detail=event_detail)
    return outcome
# ------------------------------------------------------------------
# T5 — Verifiers (fan-out + consensus)
# ------------------------------------------------------------------
async def _run_t5(
self, ws_def: dict, ctx: dict, workplan: dict, retry_mul: int
) -> dict:
"""Fan-out T5 verifiers, compute joint verdict."""
ws_name = ws_def["name"]
t4_results: dict[str, Any] = ctx.get("t4_results", {})
if not t4_results:
self._log(f"[{ws_name}] No T4 results to verify — skipping T5", level="warning")
return ctx
task_list: list[dict] = ctx.get("task_list", [])
task_map = {t["id"]: t for t in task_list}
budget = int(self._retry_bad_output * retry_mul)
self._log(f"[{ws_name}] T5 spawning {len(t4_results)} verifier(s)")
briefs: list[TaskBrief] = []
for task_id, t4_result in t4_results.items():
task_def = task_map.get(task_id, {})
t5_type = task_def.get("t5_type", "code")
persona = self._resolve_persona("t5", t5_type)
b = TaskBrief(
run_id=self.run_id,
tier=5,
role=f"t5_{t5_type}",
goal_anchor=workplan["goal_anchor"],
workstream=ws_def["id"],
task=(
f"{self._load_prompt('t5_verifier')}\n\n---\n\n"
f"T5 VERIFIER — verify: {task_def.get('task', task_id)}\n\n"
f"T4 output (truncated):\n{json.dumps(t4_result, indent=2)[:2000]}\n\n"
f"Acceptance criteria: {task_def.get('acceptance_criteria', [])}\n"
f"Goal: {workplan['goal_anchor']}\n\n"
f"Return JSON:\n"
f'{{\n'
f' "verifier_id": "{task_id}",\n'
f' "scope": "{task_id}",\n'
f' "verdict": "pass|fail",\n'
f' "issues": [],\n'
f' "notes": ""\n'
f'}}'
),
acceptance_criteria=["verdict is pass or fail"],
context={"task_id": task_id, "t4_result": self._safe_ctx(t4_result)},
retry_budget=budget,
preferred_runtime=task_def.get("preferred_runtime", "coding_agent"),
agent_personality=persona,
)
b.validate()
self.bb.create_brief(b, workstream_id=ws_def["id"])
briefs.append(b)
t5_results = await asyncio.gather(
*[self._dispatch_with_retry(b) for b in briefs],
return_exceptions=True,
)
parsed_results: list[dict] = []
for b, res in zip(briefs, t5_results):
if isinstance(res, Exception):
parsed_results.append(
{"verifier_id": b.brief_id, "verdict": "fail", "issues": [str(res)]}
)
else:
self.bb.update_brief_result(b.brief_id, res)
parsed_results.append(self._try_parse_json(res))
joint = self._compute_joint_verdict(parsed_results)
ctx["t5_results"] = parsed_results
ctx["joint_verdict"] = joint
self.bb.log_event(
"completed",
detail={
"tier": 5,
"workstream": ws_name,
"joint_verdict": joint.get("joint_verdict"),
},
)
self._log(
f"[{ws_name}] T5 joint verdict: {joint.get('joint_verdict')} "
f"({joint.get('summary')})"
)
# Gate: t5_verdict
if self._gates.get("t5_verdict") and briefs:
await self._check_gate(
gate_name="t5_verdict",
brief=briefs[0],
summary=f"T5 verdict for {ws_name}: {joint.get('joint_verdict')}{joint.get('summary')}",
what_happens_next="Workstream will be marked done if verdict is 'pass'",
)
verdict = joint.get("joint_verdict", "fail")
if verdict == "partial":
failed = joint.get("failed_scopes", [])
self._log(f"[{ws_name}] T5 partial — failed scopes: {failed}", level="warning")
self.bb.log_event(
"retried",
detail={"workstream": ws_name, "failed_scopes": failed},
)
elif verdict == "fail":
self._log(f"[{ws_name}] T5 all fail — escalating", level="warning")
self.bb.log_event(
"escalated",
detail={"workstream": ws_name, "verdict": verdict},
)
return ctx
def _compute_joint_verdict(self, t5_results: list[dict]) -> dict:
verdicts: list[str] = []
failed_scopes: list[str] = []
for r in t5_results:
v = r.get("verdict", "fail").lower()
scope = r.get("scope", r.get("verifier_id", "unknown"))
if v in ("pass", "done", "completed"):
verdicts.append("pass")
else:
verdicts.append("fail")
failed_scopes.append(scope)
if not verdicts:
joint = "fail"
elif all(v == "pass" for v in verdicts):
joint = "pass"
elif any(v == "pass" for v in verdicts):
joint = "partial"
else:
joint = "fail"
return {
"t5_results": t5_results,
"joint_verdict": joint,
"failed_scopes": failed_scopes,
"summary": f"{verdicts.count('pass')}/{len(verdicts)} verifiers passed",
}
# ------------------------------------------------------------------
# T1 Accept phase
# ------------------------------------------------------------------
async def _run_t1_accept(
self,
workplan: dict,
ws_results: dict,
branch: str,
base_branch: str,
) -> str:
"""T1 Phase 2 — validate output, create PR, notify."""
goal = workplan["goal_anchor"]
persona = self._resolve_persona("t1", "default")
brief = TaskBrief(
run_id=self.run_id,
tier=1,
role="t1_accept",
goal_anchor=goal,
workstream="t1",
task=(
f"PHASE 2 — ACCEPT\n\n"
f"Original goal: {goal}\n\n"
f"All workstreams have completed. Validate that the implementation "
f"aligns with the original goal anchor and that acceptance criteria "
f"are satisfied.\n\n"
f"Return JSON:\n"
f'{{\n'
f' "verdict": "pass|fail",\n'
f' "summary": "...",\n'
f' "issues": []\n'
f'}}'
),
acceptance_criteria=["verdict is pass or fail"],
context={"ws_results_summary": {k: "done" for k in ws_results}},
retry_budget=1,
preferred_runtime="standard",
agent_personality=persona,
)
brief.validate()
self.bb.create_brief(brief)
result = await self._dispatch_with_retry(brief)
accept_json = self._try_parse_json(result)
self.bb.update_brief_result(brief.brief_id, accept_json)
verdict = accept_json.get("verdict", "pass")
summary = accept_json.get("summary", "T1 accept complete")
if verdict == "fail":
issues = accept_json.get("issues", [])
self._log(f"T1 Accept verdict: fail — {issues}", level="warning")
self.bb.log_event(
"escalated",
detail={"tier": 1, "phase": "accept", "issues": issues},
)
# Create PR
pr_url = ""
if not self.dry_run:
try:
workstreams = workplan.get("workstreams", [])
ws_lines = "\n".join(
f"- **{ws['name']}** ({', '.join(ws.get('tier_path', []))}): "
f"{ws.get('notes', '')}"
for ws in workstreams
)
pr_body = (
f"## Run `{self.run_id}`\n\n"
f"**Goal:** {goal}\n\n"
f"**Complexity:** {workplan.get('complexity', 'unknown')}\n\n"
f"### Workstreams\n{ws_lines}\n\n"
f"### T1 Accept Summary\n{summary}\n\n"
f"---\n*Generated by the-agency — do not auto-merge.*"
)
pr_url = self.vcs.create_pr(
title=f"[agency] {self.run_id}: {goal[:55]}",
body=pr_body,
head=branch,
base=base_branch,
)
self._log(f"PR created: {pr_url}")
except Exception as exc:
self._log(f"Warning: PR creation failed: {exc}", level="warning")
self.notify.send(
f"[agency] Run {self.run_id} complete — "
f"verdict={verdict!r} PR={pr_url or '(none)'}\n"
f"Goal: {goal}",
context={
"run_id": self.run_id,
"goal": goal,
"verdict": verdict,
"pr_url": pr_url,
},
)
return pr_url
# ------------------------------------------------------------------
# Core dispatch
# ------------------------------------------------------------------
async def _dispatch_brief(self, brief: TaskBrief) -> dict:
"""
Dispatch a single brief to the appropriate runtime adapter.
Writes spawned/failed events; marks brief active.
Returns the raw result dict.
"""
runtime = self._select_runtime(brief)
task_text = self._build_task_prompt(brief)
capability = _TIER_CAPABILITY.get(brief.tier, "capable")
ctx: dict[str, Any] = {
"run_id": self.run_id,
"brief_id": brief.brief_id,
}
self.bb.update_brief_status(brief.brief_id, "active")
self.bb.log_event(
"spawned",
brief_id=brief.brief_id,
detail={
"tier": brief.tier,
"role": brief.role,
"runtime": type(runtime).__name__,
"dry_run": self.dry_run,
},
)
if self.dry_run:
self._log(
f"[DRY RUN] T{brief.tier} {brief.role!r}: "
f"{brief.task[:80].splitlines()[0]!r}"
)
return {"status": "done", "output": f"[dry-run T{brief.tier}/{brief.role}]"}
try:
agent_id = runtime.spawn(task_text, capability, ctx)
timeout_s = _TIER_TIMEOUT_S.get(brief.tier, 300)
result = runtime.get_result(agent_id, timeout_s)
except TimeoutError as exc:
self.bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": "timeout", "msg": str(exc)},
)
raise
except Exception as exc:
self.bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": str(exc)},
)
raise
# Attempt inline JSON parse of string output
if isinstance(result.get("output"), str):
raw = result["output"].strip()
try:
parsed = _extract_json(raw)
result = {**result, **parsed}
except ValueError:
result["raw_output"] = raw
result.setdefault("status", "done")
return result
async def _dispatch_with_retry(self, brief: TaskBrief) -> dict:
"""
Dispatch a brief and apply EscalationHandler retry/salvage/escalate logic.
Returns the final accepted result dict.
On escalation, returns result with ``escalated=True`` rather than raising.
"""
current = brief
while True:
result = await self._dispatch_brief(current)
decision = self._escalation.handle(current, result)
if decision.action == "complete":
return result
elif decision.action in ("retry", "salvage_and_retry"):
self._log(
f"T{brief.tier} {brief.role!r}: {decision.action}{decision.reason}"
)
self.bb.log_event(
"retried",
brief_id=current.brief_id,
detail={"action": decision.action, "reason": decision.reason},
)
self.bb.increment_brief_retry(current.brief_id)
current = decision.amended_brief # type: ignore[assignment]
self.bb.create_brief(current)
else: # escalate
self._log(
f"T{brief.tier} {brief.role!r}: escalating — {decision.reason}",
level="warning",
)
self.bb.log_event(
"escalated",
brief_id=current.brief_id,
detail={"reason": decision.reason},
)
self.bb.update_brief_status(current.brief_id, "failed")
return {**result, "escalated": True, "escalation_reason": decision.reason}
# ------------------------------------------------------------------
# Gate logic
# ------------------------------------------------------------------
async def _check_gate(
self,
gate_name: str,
brief: Optional[TaskBrief],
summary: str,
what_happens_next: str,
) -> None:
"""
Halt execution and wait for human gate approval.
Writes gate_pending, notifies, then polls the blackboard for
gate_approved or gate_rejected. gate_approved → continue;
gate_rejected → raise GateRejectedError; timeout → raise GateTimeoutError.
"""
brief_id = brief.brief_id if brief else None
gate_pending_time = _now_iso()
self._log(f"Gate {gate_name!r}{summary}")
self.bb.log_event(
"gate_pending",
brief_id=brief_id,
detail={
"gate": gate_name,
"summary": summary,
"what_happens_next": what_happens_next,
},
)
self.notify.send(
f"[agency] GATE {gate_name!r}{summary}\n"
f"Run: {self.run_id}\n"
f"Next: {what_happens_next}\n\n"
f" agency approve {self.run_id} # continue\n"
f" agency reject {self.run_id} --reason '...' # re-invoke tier",
context={
"run_id": self.run_id,
"gate": gate_name,
"summary": summary,
"what_happens_next": what_happens_next,
},
)
deadline = time.monotonic() + self._gate_timeout_minutes * 60
while time.monotonic() < deadline:
ev = self.bb.get_latest_gate_event(gate_name, after_iso=gate_pending_time)
if ev:
kind = ev["kind"]
detail: dict = {}
try:
detail = json.loads(ev.get("detail") or "{}")
except (json.JSONDecodeError, TypeError):
pass
if kind == "gate_approved":
self._log(f"Gate {gate_name!r} approved")
return
if kind == "gate_rejected":
reason = detail.get("reason", "no reason given")
self._log(f"Gate {gate_name!r} rejected: {reason}", level="warning")
raise GateRejectedError(gate=gate_name, reason=reason)
await asyncio.sleep(_GATE_POLL_INTERVAL_S)
# Timeout → auto-reject
self._log(
f"Gate {gate_name!r} timed out after {self._gate_timeout_minutes}m — auto-rejecting",
level="warning",
)
self.bb.log_event(
"gate_rejected",
brief_id=brief_id,
detail={
"gate": gate_name,
"reason": "gate_timeout",
"timeout_minutes": self._gate_timeout_minutes,
},
)
self.notify.send(
f"[agency] Run {self.run_id}: gate {gate_name!r} timed out — run paused",
context={"run_id": self.run_id, "gate": gate_name},
)
raise GateTimeoutError(gate=gate_name, timeout_minutes=self._gate_timeout_minutes)
# ------------------------------------------------------------------
# Path amendment monitor
# ------------------------------------------------------------------
async def monitor_path_amendments(self) -> None:
"""
Background task: poll for path_amendment events and relay to
the relevant higher tier via a log event + notification.
Run this as an asyncio background task during the pipeline if needed.
"""
seen_ids: set[str] = set()
while True:
events = self.bb.get_events(kinds=["path_amendment"])
for ev in events:
if ev["event_id"] in seen_ids:
continue
seen_ids.add(ev["event_id"])
try:
detail = json.loads(ev.get("detail") or "{}")
except (json.JSONDecodeError, TypeError):
detail = {}
proposed_by = detail.get("proposed_by", "unknown")
reason = detail.get("reason", "")
amendment = detail.get("amendment", {})
self._log(
f"Path amendment from {proposed_by!r}: {reason}",
level="warning",
)
self.notify.send(
f"[agency] Path amendment — {proposed_by}: {reason}\n"
f"Amendment: {json.dumps(amendment)}\n"
f"Run: {self.run_id}",
context={
"run_id": self.run_id,
"proposed_by": proposed_by,
"reason": reason,
"amendment": amendment,
},
)
await asyncio.sleep(_PATH_AMENDMENT_POLL_INTERVAL_S)
# ------------------------------------------------------------------
# Runtime selection
# ------------------------------------------------------------------
def _select_runtime(self, brief: TaskBrief) -> Any:
"""Select the runtime adapter for a brief."""
tier_key = f"t{brief.tier}"
override = self._tier_runtime_map.get(tier_key)
if override == "coding_agent" or (
not override and brief.preferred_runtime == "coding_agent"
):
return self._coding_runtime
return self._default_runtime
# ------------------------------------------------------------------
# Persona / prompt helpers
# ------------------------------------------------------------------
def _resolve_persona(self, tier_key: str, domain: str) -> Optional[str]:
"""Return the agent personality file path, or None if not found."""
tier_entries = self.role_registry.get(tier_key, {})
path = tier_entries.get(domain) or tier_entries.get("default")
if path and os.path.exists(path):
return path
if path:
logger.debug("Persona file not found at %s — using no personality", path)
return None
def _load_prompt(self, prompt_name: str) -> str:
"""Load a fallback tier prompt from prompts/."""
path = f"prompts/{prompt_name}.md"
try:
return Path(path).read_text()
except FileNotFoundError:
return ""
def _build_task_prompt(self, brief: TaskBrief) -> str:
"""Compose the full task string sent to the runtime."""
parts: list[str] = []
if brief.agent_personality:
try:
personality = Path(brief.agent_personality).read_text()
parts.append(f"<system>\n{personality}\n</system>\n")
except FileNotFoundError:
logger.warning("Personality file not found: %s", brief.agent_personality)
parts.append(brief.task)
if brief.context:
try:
ctx_json = json.dumps(brief.context, indent=2)
parts.append(f"\n\nContext:\n{ctx_json}")
except (TypeError, ValueError):
pass
if brief.acceptance_criteria:
criteria = "\n".join(f"- {c}" for c in brief.acceptance_criteria)
parts.append(f"\n\nAcceptance criteria:\n{criteria}")
return "\n".join(parts)
# ------------------------------------------------------------------
# Result parsing helpers
# ------------------------------------------------------------------
def _parse_json_result(self, result: dict, brief: TaskBrief) -> dict:
"""Extract a JSON dict from an agent result, raising on failure."""
# Already parsed inline in _dispatch_brief
for key in ("workstreams", "goal_anchor", "complexity", "run_id", "task_list"):
if key in result:
return result
raw = result.get("raw_output") or result.get("output", "")
if isinstance(raw, str) and raw.strip():
try:
return _extract_json(raw)
except ValueError:
pass
raise ValueError(
f"Could not parse JSON workplan from T{brief.tier}/{brief.role} result. "
f"raw={str(result)[:300]!r}"
)
def _try_parse_json(self, result: dict) -> dict:
"""
Best-effort JSON extraction from an agent result.
Returns result as-is if it already looks like structured output,
otherwise attempts to parse raw_output/output.
"""
if isinstance(result, dict) and len(result) > 2:
return result
raw = result.get("raw_output") or result.get("output", "")
if isinstance(raw, str) and raw.strip():
try:
return _extract_json(raw)
except ValueError:
pass
return result
# ------------------------------------------------------------------
# Formatting helpers
# ------------------------------------------------------------------
def _format_workplan_summary(self, workplan: dict) -> str:
workstreams = workplan.get("workstreams", [])
complexity = workplan.get("complexity", "unknown")
lines = [
f"Complexity: {complexity}",
f"Retry budget multiplier: {workplan.get('retry_budget_multiplier', 1)}x",
f"Workstreams ({len(workstreams)}):",
]
for ws in workstreams:
path_str = "".join(ws.get("tier_path", []))
lines.append(f" [{ws.get('parallel_group', '?')}] {ws.get('name', ws.get('id'))}{path_str}")
critique = workplan.get("self_critique_summary", "")
if critique:
lines.append(f"Self-critique: {critique}")
return "\n".join(lines)
@staticmethod
def _safe_ctx(ctx: Any, max_len: int = 4000) -> Any:
"""Truncate a context value to avoid token overflow in prompts."""
if isinstance(ctx, dict):
try:
s = json.dumps(ctx)
if len(s) <= max_len:
return ctx
return {"_truncated": True, "preview": s[:max_len]}
except (TypeError, ValueError):
return str(ctx)[:max_len]
return ctx
# ------------------------------------------------------------------
# Logging
# ------------------------------------------------------------------
def _log(self, message: str, level: str = "info") -> None:
"""Write a log event to the blackboard and to the Python logger."""
try:
self.bb.log_event("log", detail={"level": level, "message": message})
except Exception:
pass
getattr(logger, level if level in ("debug", "info", "warning", "error") else "info")(
"[%s] %s", self.run_id, message
)
# ---------------------------------------------------------------------------
# CLI entry point (thin wrapper — full CLI in cli/agency.py)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Minimal entry point for running a pipeline straight from this module;
    # the full CLI lives in cli/agency.py.
    import argparse

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    cli = argparse.ArgumentParser(description="Run the-agency pipeline.")
    cli.add_argument("--config", default="config/team.yaml")
    cli.add_argument("--registry", default="config/role_registry.yaml")
    cli.add_argument("--dry-run", action="store_true")
    opts = cli.parse_args()
    TeamRunner(
        config_path=opts.config,
        role_registry_path=opts.registry,
        dry_run=opts.dry_run,
    ).run()