Files
the-agency/core/team_runner.py
Hans Heinemann 45e3b7663e fix: lazy-import anthropic SDK; tolerate LLM adapter failure in dry-run mode
--dry-run was crashing with ModuleNotFoundError because:
1. adapters/llm/anthropic.py imported 'anthropic' at module level
2. TeamRunner.__init__ always built the LLM adapter regardless of dry_run flag

Fixes:
- Move 'import anthropic' inside AnthropicAdapter.__init__ (lazy import)
  so the module loads cleanly without the SDK installed
- In TeamRunner.__init__, wrap _build_llm in a try/except when dry_run=True
  so missing adapter deps are logged as warnings, not fatal errors

dry-run now works with no third-party packages installed.
2026-03-16 01:03:17 -04:00

833 lines
31 KiB
Python

"""
core/team_runner.py
Top-level orchestration entry point for the-agency pipeline.
The TeamRunner loads team.yaml, builds the adapter registry, and drives the
full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling.
Runtime adapters are config-driven: every string-valued key in the top-level
``runtime:`` section of team.yaml is instantiated as a RuntimeAdapter and
stored in ``self._runtimes[name]``. Non-string values (e.g. ``native_teams:
false``) are silently skipped. Dispatch routing uses
``brief.preferred_runtime`` to look up the right adapter at call time.
CLI usage::
python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import uuid
from typing import Optional
import yaml
from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
import importlib
from adapters.base.llm import LLMAdapter
from adapters.base.notify import NotifyAdapter
from adapters.base.runtime import RuntimeAdapter
from adapters.base.vcs import VCSAdapter
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Maps tier number → prompt file path (relative to project root).
_TIER_PROMPTS: dict[int, str] = {
1: "prompts/t1_visionary.md",
2: "prompts/t2_architect.md",
3: "prompts/t3_squad_lead.md",
4: "prompts/t4_implementer.md",
5: "prompts/t5_verifier.md",
}
# Maps tier number → LLM capability hint.
_TIER_CAPABILITIES: dict[int, str] = {
1: "reasoning-heavy",
2: "reasoning-heavy",
3: "capable",
4: "capable",
5: "fast-cheap",
}
# ---------------------------------------------------------------------------
# Adapter registries
#
# Values are "module.path:ClassName" strings resolved lazily via importlib.
# To add a new adapter, append an entry here — no changes to TeamRunner needed.
# team.yaml may also supply a full "module.path:ClassName" value directly,
# enabling third-party adapters without touching this file.
# ---------------------------------------------------------------------------
_LLM_ADAPTERS: dict[str, str] = {
"anthropic": "adapters.llm.anthropic:AnthropicAdapter",
}
_VCS_ADAPTERS: dict[str, str] = {
"github": "adapters.vcs.github:GitHubAdapter",
}
_NOTIFY_ADAPTERS: dict[str, str] = {
"openclaw": "adapters.notify.openclaw:OpenClawNotifyAdapter",
}
_RUNTIME_ADAPTERS: dict[str, str] = {
"openclaw": "adapters.runtime.openclaw:OpenClawRuntimeAdapter",
"claude_code": "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter",
}
def _load_adapter_class(key: str, registry: dict[str, str], label: str) -> type:
"""
Resolve a short name or dotted "module:ClassName" path to an adapter class.
Resolution order:
1. If *key* is in *registry*, use the mapped dotted path.
2. Otherwise, treat *key* itself as a dotted path (custom / third-party).
"""
dotted = registry.get(key, key)
if ":" not in dotted:
raise ValueError(
f"Unknown {label} adapter {key!r}. "
f"Built-in choices: {list(registry)}. "
f"Or supply a full 'module.path:ClassName' value in team.yaml."
)
module_path, class_name = dotted.rsplit(":", 1)
try:
module = importlib.import_module(module_path)
except ModuleNotFoundError as exc:
raise ImportError(
f"Cannot import {label} adapter module {module_path!r}: {exc}"
) from exc
try:
return getattr(module, class_name)
except AttributeError:
raise ImportError(
f"Module {module_path!r} has no class {class_name!r}"
)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class EscalationError(RuntimeError):
"""Raised when a brief escalates past its retry budget with no recovery."""
# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------
class TeamRunner:
"""
Orchestrates a full T1→T5 agent pipeline run.
Usage::
runner = TeamRunner(config_path="config/team.yaml")
runner.run()
Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
and notifications::
runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
runner.run()
"""
def __init__(
self,
config_path: str = "config/team.yaml",
dry_run: bool = False,
) -> None:
"""
Load configuration and instantiate adapters.
Parameters
----------
config_path : Path to team.yaml.
dry_run : When True, skip LLM calls, VCS commits, and notifications.
All planned actions are logged at INFO level.
Runtime adapters are built from the top-level ``runtime:`` section of
team.yaml. Each string-valued key becomes an entry in
``self._runtimes``; non-string values (e.g. ``native_teams: false``)
are ignored. Adding a new runtime type requires only a new key in
team.yaml — no changes to TeamRunner are needed.
"""
self._dry_run = dry_run
self._config = self._load_yaml(config_path)
self._role_registry = self._load_yaml("config/role_registry.yaml")
self._escalation = EscalationHandler()
run_id = str(uuid.uuid4())
self._bb = Blackboard(run_id=run_id)
# Build adapters — VCS and notify are optional and swallow init errors.
adapter_cfg: dict = self._config.get("adapters", {})
runtime_cfg: dict = self._config.get("runtime", {})
if dry_run:
# In dry-run mode the LLM adapter is never actually called, so we
# tolerate missing dependencies (e.g. 'anthropic' SDK not installed).
try:
self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic"))
except (ImportError, ValueError) as exc:
logger.warning(
"LLM adapter unavailable in dry-run mode (%s) — continuing.", exc
)
self._llm = None # type: ignore[assignment]
else:
self._llm = self._build_llm(adapter_cfg.get("llm", "anthropic"))
self._vcs: Optional[VCSAdapter] = self._build_optional( # type: ignore[assignment]
_VCS_ADAPTERS, adapter_cfg.get("vcs"), "VCS"
)
self._notify: Optional[NotifyAdapter] = self._build_optional( # type: ignore[assignment]
_NOTIFY_ADAPTERS, adapter_cfg.get("notify"), "notify"
)
# Runtime adapters are fully config-driven — one entry per string-valued
# key in the top-level ``runtime:`` section of team.yaml.
self._runtimes: dict[str, RuntimeAdapter] = self._build_runtimes(runtime_cfg)
logger.info(
"TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
)
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
@staticmethod
def _load_yaml(path: str) -> dict:
with open(path, "r", encoding="utf-8") as fh:
return yaml.safe_load(fh) or {}
@staticmethod
def _load_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as fh:
return fh.read()
def _build_llm(self, key: str) -> LLMAdapter:
cls = _load_adapter_class(key, _LLM_ADAPTERS, "LLM")
return cls(self._config)
def _build_optional(
self,
registry: dict[str, str],
key: Optional[str],
label: str,
) -> Optional[object]:
"""Build an optional adapter, returning None on any init error."""
if not key:
return None
try:
cls = _load_adapter_class(key, registry, label)
return cls(self._config)
except (ImportError, ValueError) as exc:
logger.warning("Unknown %s adapter %r — skipping. (%s)", label, key, exc)
return None
except Exception as exc:
logger.warning(
"%s adapter %r could not be initialised (%s) — skipping.",
label,
key,
exc,
)
return None
def _build_runtime(self, key: str) -> RuntimeAdapter:
cls = _load_adapter_class(key, _RUNTIME_ADAPTERS, "runtime")
return cls(self._config)
def _build_runtimes(self, runtime_cfg: dict) -> dict[str, RuntimeAdapter]:
"""
Build a name → RuntimeAdapter mapping from the ``runtime:`` config block.
Every key whose value is a string is treated as a runtime adapter name
and instantiated via ``_build_runtime``. Non-string values (e.g.
``native_teams: false``) are skipped so that boolean/numeric control
flags can coexist in the same config section.
"""
runtimes: dict[str, RuntimeAdapter] = {}
for name, value in runtime_cfg.items():
if not isinstance(value, str):
continue
runtimes[name] = self._build_runtime(value)
return runtimes
# ------------------------------------------------------------------
# Role registry
# ------------------------------------------------------------------
def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
"""Return the path to the agent persona .md file, or None."""
tier_key = f"t{tier}"
tier_map: dict = self._role_registry.get(tier_key, {})
path = tier_map.get(role) or tier_map.get("default")
if path and os.path.isfile(path):
return path
return None
# ------------------------------------------------------------------
# Prompt helpers
# ------------------------------------------------------------------
def _load_tier_prompt(self, tier: int) -> str:
"""Load the system prompt for a tier from the prompts/ directory."""
path = _TIER_PROMPTS.get(tier, "")
if path and os.path.isfile(path):
return self._load_text(path)
logger.warning("Tier %d prompt not found at %r", tier, path)
return ""
def _load_personality(self, path: Optional[str]) -> str:
if path and os.path.isfile(path):
return self._load_text(path)
return ""
@staticmethod
def _extract_json(text: str) -> dict:
"""
Extract a JSON object from a potentially markdown-wrapped LLM response.
Strips leading/trailing markdown fences (```json ... ```) then parses.
Falls back to a regex scan for the first ``{...}`` block if plain
parsing fails.
"""
text = text.strip()
# Strip markdown fences.
if text.startswith("```"):
text = re.sub(r"^```[a-z]*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text.strip())
try:
return json.loads(text)
except json.JSONDecodeError:
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group(0))
except json.JSONDecodeError:
pass
raise ValueError(
"Could not parse JSON from LLM response.\n"
f"Response (first 500 chars): {text[:500]}"
)
# ------------------------------------------------------------------
# Brief dispatch
# ------------------------------------------------------------------
def _dispatch_brief(self, brief: TaskBrief) -> dict:
"""
Send a TaskBrief to the appropriate agent and return the raw result dict.
Routing
-------
preferred_runtime == "standard" or empty/None → LLM adapter directly
Otherwise → look up self._runtimes[preferred_runtime]; falls back to
self._runtimes["default"] and then to LLM if no runtime is found.
Blackboard events emitted: spawned → completed | failed.
"""
if self._dry_run:
logger.info(
"[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
brief.tier,
brief.role,
brief.task,
)
return {"status": "done", "output": "{}", "artifacts": []}
self._bb.update_brief_status(brief.brief_id, "active")
self._bb.log_event(
"spawned",
brief_id=brief.brief_id,
detail={"tier": brief.tier, "role": brief.role},
)
try:
pref = brief.preferred_runtime
if not pref or pref == "standard":
result = self._dispatch_via_llm(brief)
else:
runtime = self._runtimes.get(pref) or self._runtimes.get("default")
if runtime is None:
logger.warning(
"No runtime adapter found for %r (and no 'default') — "
"falling back to LLM for brief %s",
pref,
brief.brief_id,
)
result = self._dispatch_via_llm(brief)
else:
result = self._dispatch_via_runtime(brief, runtime)
self._bb.update_brief_result(brief.brief_id, result)
self._bb.log_event(
"completed",
brief_id=brief.brief_id,
detail={"status": result.get("status")},
)
return result
except Exception as exc:
self._bb.update_brief_status(brief.brief_id, "failed")
self._bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": str(exc)},
)
raise
def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
"""Call the LLM adapter with the tier system prompt + brief JSON."""
tier_prompt = self._load_tier_prompt(brief.tier)
personality = self._load_personality(brief.agent_personality)
system_prompt = "\n\n".join(filter(None, [tier_prompt, personality]))
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
user_message = json.dumps(brief.to_dict(), indent=2)
raw = self._llm.complete(
prompt=user_message,
capability=capability,
context={
"system_prompt": system_prompt,
},
)
return self._extract_json(raw)
def _dispatch_via_runtime(self, brief: TaskBrief, runtime: RuntimeAdapter) -> dict:
"""Spawn an agent via *runtime* and collect its result."""
task_str = json.dumps(brief.to_dict(), indent=2)
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
timeout_s: int = brief.context.get("timeout_s", 300)
agent_id = runtime.spawn(
task=task_str,
capability=capability,
context=brief.context,
)
logger.info(
"Spawned runtime agent %s for brief %s", agent_id, brief.brief_id
)
result = runtime.get_result(agent_id, timeout_s=timeout_s)
# Attempt to parse JSON from the agent's text output.
if isinstance(result.get("output"), str) and result["output"].strip():
try:
parsed = self._extract_json(result["output"])
result.update(parsed)
except ValueError:
pass # Keep raw string output as-is.
return result
# ------------------------------------------------------------------
# Escalation loop
# ------------------------------------------------------------------
def _run_with_escalation(
self,
brief: TaskBrief,
workstream_id: Optional[str] = None,
) -> dict:
"""
Dispatch a brief and apply the escalation policy until done or exhausted.
On retry the amended brief is persisted to the Blackboard before
being re-submitted.
"""
while True:
result = self._dispatch_brief(brief)
decision = self._escalation.handle(brief, result)
if decision.action == "complete":
return result
if decision.action == "escalate":
self._bb.log_event(
"escalated",
brief_id=brief.brief_id,
detail={"reason": decision.reason},
)
raise EscalationError(
f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
f"escalated: {decision.reason}"
)
# "retry" or "salvage_and_retry"
self._bb.log_event(
"retried",
brief_id=brief.brief_id,
detail={"reason": decision.reason, "action": decision.action},
)
amended = decision.amended_brief
if amended is None:
raise EscalationError(
f"Escalation returned action={decision.action!r} "
"but no amended_brief was provided."
)
# Persist the new brief and loop.
self._bb.create_brief(amended, workstream_id=workstream_id)
brief = amended
# ------------------------------------------------------------------
# Tier output parsers
# ------------------------------------------------------------------
def _parse_t1_output(
self, result: dict, root_brief: TaskBrief
) -> list[TaskBrief]:
"""Build T2 TaskBriefs from T1 (Visionary) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
workstreams: list[dict] = result.get("workstreams", [])
# T1 sets the canonical goal_anchor; propagate it back to root.
goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
root_brief.goal_anchor = goal_anchor
briefs: list[TaskBrief] = []
for ws in workstreams:
role = ws.get("role", "default")
brief = root_brief.make_child_brief(
tier=2,
role=role,
task=ws.get("task", ""),
workstream=ws.get("name", ""),
acceptance_criteria=ws.get("acceptance_criteria", []),
preferred_runtime="standard",
agent_personality=self._resolve_personality(2, role),
retry_budget=retry_bad,
)
briefs.append(brief)
return briefs
def _parse_t2_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T3 TaskBriefs from T2 (Architect) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
subtasks: list[dict] = result.get("subtasks", [])
arch_summary: str = result.get("architecture_summary", "")
briefs: list[TaskBrief] = []
for st in subtasks:
role = st.get("role", "default")
brief = parent.make_child_brief(
tier=3,
role=role,
task=st.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=st.get("acceptance_criteria", []),
preferred_runtime=st.get("preferred_runtime", "standard"),
agent_personality=self._resolve_personality(3, role),
retry_budget=retry_bad,
context={"architecture_summary": arch_summary},
)
briefs.append(brief)
return briefs
def _parse_t3_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
tasks: list[dict] = result.get("tasks", [])
plan_summary: str = result.get("plan_summary", "")
briefs: list[TaskBrief] = []
for task in tasks:
role = task.get("role", "default")
pref_runtime = task.get("preferred_runtime", "standard")
brief = parent.make_child_brief(
tier=4,
role=role,
task=task.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=task.get("acceptance_criteria", []),
preferred_runtime=pref_runtime,
agent_personality=self._resolve_personality(4, role),
retry_budget=retry_bad,
context={
"plan_summary": plan_summary,
"depends_on": task.get("depends_on", []),
},
)
briefs.append(brief)
return briefs
# ------------------------------------------------------------------
# VCS helpers
# ------------------------------------------------------------------
def _commit_artifacts(
self,
artifacts: list[dict],
brief: TaskBrief,
) -> None:
"""Commit T4 *file* artifacts to the configured VCS adapter."""
if not self._vcs or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would commit %d artifact(s) for brief %s",
len(artifacts),
brief.brief_id,
)
return
file_map: dict[str, str] = {
a["path"]: a["content"]
for a in artifacts
if a.get("type") == "file"
and a.get("path")
and a.get("content") is not None
}
if not file_map:
return
branch: str = self._config.get("run", {}).get("base_branch", "main")
message = (
f"feat({brief.workstream}): artifacts from {brief.role} "
f"[brief {brief.brief_id[:8]}]"
)
try:
# GitHubAdapter.commit accepts dict[str, str] as files.
sha = self._vcs.commit(file_map, message) # type: ignore[call-arg]
logger.info(
"Committed %d artifact(s) → SHA %s", len(file_map), sha
)
except Exception as exc:
logger.warning("VCS commit failed: %s", exc)
# ------------------------------------------------------------------
# Notification
# ------------------------------------------------------------------
def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
if not self._notify or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
)
return
level = "info" if outcome == "complete" else "error"
if outcome == "complete":
message = f"Pipeline complete: {goal[:80]}"
else:
message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"
self._notify.send(
message,
context={
"level": level,
"run_id": self._bb.run_id,
"outcome": outcome,
**{k: str(v) for k, v in detail.items()},
},
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run(self) -> None:
"""
Execute the full T1→T5 pipeline.
Steps
-----
1. Dispatch T1 Visionary to decompose the goal into workstreams.
2. For each workstream: T2 Architect → T3 Squad Lead →
T4 Implementer → T5 Verifier.
3. Commit passing T4 artifacts via VCS adapter (if configured).
4. Notify on completion or terminal failure via notify adapter.
"""
goal: str = self._config["run"]["goal"]
self._bb.create_run(goal=goal)
self._bb.update_run_status("active")
logger.info("Pipeline started — goal: %s", goal)
try:
self._orchestrate(goal)
self._bb.update_run_status("done")
summary = self._bb.get_run_summary()
logger.info("Pipeline complete. Summary: %s", summary)
self._notify_run("complete", goal, summary)
except Exception as exc:
self._bb.update_run_status("failed")
logger.error("Pipeline failed: %s", exc, exc_info=True)
self._notify_run("failed", goal, {"error": str(exc)})
raise
finally:
self._bb.close()
# ------------------------------------------------------------------
# Internal orchestration
# ------------------------------------------------------------------
def _orchestrate(self, goal: str) -> None:
"""Build the root T1 brief, dispatch it, and fan out per workstream."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
# ---- T1: Visionary ----
t1_brief = TaskBrief(
run_id=self._bb.run_id,
tier=1,
role="default",
goal_anchor=goal,
task=(
"You are the T1 Visionary. "
"Decompose the following goal into parallel workstreams "
f"for the engineering team: {goal}"
),
workstream="root",
retry_budget=retry_bad,
preferred_runtime="standard",
agent_personality=self._resolve_personality(1, "default"),
)
self._bb.create_brief(t1_brief)
t1_result = self._run_with_escalation(t1_brief)
t2_briefs = self._parse_t1_output(t1_result, t1_brief)
logger.info("T1 produced %d workstream(s)", len(t2_briefs))
# ---- T2..T5: per workstream ----
for t2_brief in t2_briefs:
ws_id = self._bb.create_workstream(
name=t2_brief.workstream, tier=2
)
self._bb.create_brief(t2_brief, workstream_id=ws_id)
self._bb.update_workstream_status(ws_id, "active")
try:
self._run_workstream(t2_brief, ws_id)
self._bb.update_workstream_status(ws_id, "done")
except EscalationError as exc:
self._bb.update_workstream_status(ws_id, "failed")
self._bb.log_event(
"failed",
detail={"error": str(exc), "workstream": t2_brief.workstream},
)
logger.error(
"Workstream %r failed: %s", t2_brief.workstream, exc
)
def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
"""Drive T2 → T3 → T4 → T5 for a single workstream."""
# T2: Architect
t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
t3_briefs = self._parse_t2_output(t2_result, t2_brief)
logger.info(
"T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
)
for t3_brief in t3_briefs:
self._bb.create_brief(t3_brief, workstream_id=ws_id)
try:
# T3: Squad Lead
t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
t4_briefs = self._parse_t3_output(t3_result, t3_brief)
logger.info(
"T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
)
for t4_brief in t4_briefs:
self._bb.create_brief(t4_brief, workstream_id=ws_id)
try:
# T4: Implementer
t4_result = self._run_with_escalation(
t4_brief, workstream_id=ws_id
)
artifacts: list[dict] = t4_result.get("artifacts", [])
# T5: Verifier
t5_brief = t4_brief.make_child_brief(
tier=5,
role="code",
task=(
"Verify the following T4 implementation artifacts "
"against all acceptance criteria. "
f"T4 output: {json.dumps(t4_result)[:2000]}"
),
workstream=t4_brief.workstream,
acceptance_criteria=t4_brief.acceptance_criteria,
preferred_runtime="standard",
agent_personality=self._resolve_personality(5, "code"),
retry_budget=self._config.get(
"retry_defaults", {}
).get("bad_output", 3),
context={"t4_result": t4_result},
)
self._bb.create_brief(t5_brief, workstream_id=ws_id)
t5_result = self._run_with_escalation(
t5_brief, workstream_id=ws_id
)
# Commit on verified pass.
if t5_result.get("status") in ("passed", "done"):
self._commit_artifacts(artifacts, t4_brief)
except EscalationError as exc:
logger.error(
"T4/T5 escalation in %s: %s", t4_brief.role, exc
)
except EscalationError as exc:
logger.error("T3 escalation in %s: %s", t3_brief.role, exc)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def _configure_logging(verbose: bool = False) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run the-agency T1→T5 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config",
default="config/team.yaml",
help="Path to team.yaml configuration file.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help=(
"Log all planned actions without executing LLM calls, "
"VCS commits, or notifications."
),
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable DEBUG-level logging.",
)
args = parser.parse_args()
_configure_logging(args.verbose)
runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
runner.run()