Compare commits

...

20 Commits

Author SHA1 Message Date
5b0d00a799 feat(config): T3 backend/frontend → senior devs; update agency-agents submodule
T3 squad leads now use dedicated senior roles instead of sharing
architects with T2:
- T3 backend: backend-architect → senior-backend-developer
- T3 frontend: frontend-architect → senior-frontend-developer

Clean tier separation:
  T2 = architects (design)
  T3 = senior devs (lead + implement-or-delegate)
  T4 = developers (pure implementation)

Depends on: coding-with-hans-heinemann/agency-agents#2
2026-03-16 11:31:32 -04:00
461b36bc5d fix(config): T2 backend → backend-architect (per review) 2026-03-16 11:13:19 -04:00
9c1794c58a fix(config): T3 backend → backend-architect; update agency-agents submodule
- T3 backend squad lead should be an architect, not a developer (per review)
- Submodule updated to include frontend-architect + backend-developer roles
2026-03-16 10:36:33 -04:00
8adab6fbc5 refactor(config): replace senior-developer with role-specific specialists
- T2 frontend: software-architect → frontend-architect
- T3 backend: senior-developer → backend-developer
- T3 frontend: senior-developer → frontend-architect
- T3 default: senior-developer → backend-developer
- T4 backend: backend-architect → backend-developer (implementation, not architecture)
- T4 default: senior-developer → backend-developer

Depends on: coding-with-hans-heinemann/agency-agents#1 (new agent roles)
2026-03-16 10:11:47 -04:00
d02faf5cac feat(config): expand role_registry + fix T4 default runtime
Role registry additions:
- T2: add ai, security, mobile domains
- T3: add data, ai, security, mobile, database, devops, docs domains
- T4: add data (data-engineer), embedded (firmware-engineer)
- T5: add accessibility, e2e, frontend, data verifier roles

TeamRunner consistency fix:
- T4 briefs now default to 'coding_agent' runtime (Claude Code)
  per build spec: 'Claude Code as default T4 runtime'
  T3 can still override preferred_runtime per task
2026-03-16 09:14:22 -04:00
c70448b61c chore: sync agency-agents submodule with upstream 2026-03-16 09:00:15 -04:00
aef553bdc8 refactor: T4 fast-cheap; move adapter registries from code to team.yaml 2026-03-16 01:14:35 -04:00
8277a00118 perf(team_runner): T4 capability fast-cheap — implementers produce structured JSON, don't need capable tier 2026-03-16 01:14:08 -04:00
45e3b7663e fix: lazy-import anthropic SDK; tolerate LLM adapter failure in dry-run mode
--dry-run was crashing with ModuleNotFoundError because:
1. adapters/llm/anthropic.py imported 'anthropic' at module level
2. TeamRunner.__init__ always built the LLM adapter regardless of dry_run flag

Fixes:
- Move 'import anthropic' inside AnthropicAdapter.__init__ (lazy import)
  so the module loads cleanly without the SDK installed
- In TeamRunner.__init__, wrap _build_llm in a try/except when dry_run=True
  so missing adapter deps are logged as warnings, not fatal errors

dry-run now works with no third-party packages installed.
2026-03-16 01:03:17 -04:00
7b1cf7315c refactor(team_runner): make runtimes config-driven — replace hardcoded slots with dict
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 00:50:17 -04:00
71316b3090 refactor(team_runner): replace static adapter imports with dynamic importlib loading
Concrete adapter classes (AnthropicAdapter, GitHubAdapter, etc.) are no
longer imported at the top of team_runner.py. Instead, each registry maps
short names to 'module.path:ClassName' strings resolved lazily via
importlib.import_module at instantiation time.

This means:
- Adding a new adapter requires only an entry in the registry string dict
  (or a full dotted path directly in team.yaml) — no changes to TeamRunner.
- Third-party / custom adapters work out of the box: set e.g.
  adapters.llm: mypackage.llm.openai:OpenAIAdapter in team.yaml.
- The runner no longer hard-wires knowledge of which concrete classes exist.

Addresses tandrewng review comment on PR #1.
2026-03-16 00:30:28 -04:00
bd96a83069 fix: derive LLM provider from adapter, not config
Remove redundant models.provider from team.yaml. Each adapter knows its
own provider key — AnthropicAdapter always looks up 'anthropic' in the
capability_map. This avoids a footgun where adapters.llm and models.provider
could disagree.

Future adapters (OpenAIAdapter, OllamaAdapter) will hardcode their own key
the same way.
2026-03-15 23:47:52 -04:00
60576fbf2f fix: remove hardcoded max_tokens/temperature from _dispatch_via_llm
Both values are now sourced from team.yaml (models.default_max_tokens and
models.default_temperature) via the adapter's __init__, eliminating the
last hardcoded magic numbers. Callers can still override per-call via
context dict if needed.
2026-03-15 21:43:01 -04:00
8524b63a76 fix: read default_temperature from team.yaml; update docstrings
- Add default_temperature: 0 to config/team.yaml models block
- Read self._default_temperature from models cfg in __init__
- Use self._default_temperature as fallback in complete() instead of hardcoded 0
- Update class docstring to document both default_max_tokens and default_temperature
- Update complete() context param docs to reference team.yaml keys
2026-03-15 21:40:05 -04:00
6856f10c27 fix(adapter/llm): make max_tokens configurable via team.yaml models.default_max_tokens 2026-03-15 18:55:57 -04:00
e097f4be21 feat(core): implement TeamRunner orchestration loop
Full T1→T5 pipeline orchestration with adapter registry, escalation,
and blackboard event emission.

Key design decisions:
- Adapter registry maps config keys to concrete classes; VCS and notify
  are optional (swallow init errors and degrade gracefully)
- _dispatch_brief() routes to LLM adapter (standard) or coding runtime
  (coding_agent) based on brief.preferred_runtime
- _run_with_escalation() drives the retry/salvage loop: persists amended
  briefs to the Blackboard before each re-submission
- Tier parsers (_parse_t1/t2/t3_output) build child TaskBriefs, preserving
  the goal_anchor invariant and resolving agent personalities from the registry
- T5 Verifier is always spawned after T4; VCS commit only happens on
  verified pass (status "passed" or "done")
- --dry-run flag: logs all actions, skips LLM, VCS, and notify calls
- Exposes CLI via `python -m core.team_runner` with --config, --dry-run,
  --verbose flags

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 03:15:37 -04:00
97e7be80d1 feat(adapter/runtime): implement OpenClaw and ClaudeCode runtime adapters
OpenClawRuntimeAdapter:
- spawn() shells out to `openclaw session spawn --task <t> --mode run`
- get_result() polls `openclaw session get <id>` until terminal status or timeout
- kill() calls `openclaw session kill <id>`, silently succeeds if finished
- Parses JSON or raw-text session IDs; raises NotImplementedError with
  helpful message when openclaw CLI is absent from PATH

ClaudeCodeRuntimeAdapter:
- spawn() launches `claude --permission-mode bypassPermissions --print <task>`
  in a temp dir (or context["workdir"]), returns a UUID job_id
- Tracks all Popen instances in a thread-safe dict
- get_result() calls communicate(timeout=...), raises TimeoutError on timeout
- kill() terminates the Popen; silently ignores already-finished processes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 03:15:21 -04:00
c88c4309ac feat(adapter/notify): implement OpenClawNotifyAdapter
Sends notifications via `openclaw system event --text <msg> --mode now`.
- Always logs locally (info/warning/error) regardless of CLI availability
- Gracefully handles FileNotFoundError (openclaw not on PATH) and
  TimeoutExpired; notifications are best-effort and never crash the pipeline
- OPENCLAW_SIGNAL_NUMBER env var stored for future direct-signal support

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 03:15:13 -04:00
b212082b58 feat(adapter/vcs): implement GitHubAdapter
Uses PyGithub to interact with the GitHub REST API.
- Reads GITHUB_TOKEN from env; parses owner/repo from SSH or HTTPS URL
- create_branch() creates a branch off the configured base branch
- commit() accepts dict[str, str] {path: content} or list[str] of
  local paths; uses Contents API (create_file / update_file)
- create_pr() and get_pr_status() delegate to PyGithub pull-request API

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 03:15:06 -04:00
9646a146bc feat(adapter/llm): implement AnthropicAdapter
Implements AnthropicAdapter using the anthropic SDK.
- Reads ANTHROPIC_API_KEY from env; raises ValueError if missing
- resolve_model() looks up capability_map in team.yaml config,
  falls back to "capable" tier then hard-coded claude-sonnet-4-6
- complete() supports system_prompt, max_tokens (default 4096),
  and temperature (default 0) via the context dict
- Adds PyGithub to requirements.txt (needed by GitHubAdapter)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 03:15:01 -04:00
10 changed files with 1593 additions and 211 deletions

View File

@@ -1,16 +1,15 @@
"""
adapters/llm/anthropic.py
Anthropic Claude adapter — Phase 2 stub.
Anthropic Claude LLM adapter — Phase 2 implementation.
TODO (Phase 2):
- Implement complete() using the anthropic SDK (anthropic.Anthropic client).
- Implement resolve_model() by reading config/team.yaml capability_map.
- Handle streaming responses, rate-limit retries, and token counting.
- Support system-prompt injection via context["system_prompt"].
- Map capability → model using the provider's capability_map config.
Uses the ``anthropic`` SDK to call Claude models. Model selection is driven
by the capability_map in team.yaml so the adapter stays provider-agnostic in
configuration.
"""
from __future__ import annotations
import os
from adapters.base.llm import LLMAdapter
@@ -18,27 +17,123 @@ class AnthropicAdapter(LLMAdapter):
"""
LLM adapter for Anthropic Claude models.
Reads model configuration from config/team.yaml:
models.provider: anthropic
models.capability_map.reasoning-heavy.anthropic: claude-opus-4-6
models.capability_map.capable.anthropic: claude-sonnet-4-6
models.capability_map.fast-cheap.anthropic: claude-haiku-3-5
Reads model configuration from the loaded team.yaml config dict::
models:
default_max_tokens: 4096 # fallback max_tokens for all calls
default_temperature: 0 # fallback temperature for all calls
capability_map:
reasoning-heavy:
anthropic: claude-opus-4-6
capable:
anthropic: claude-sonnet-4-6
fast-cheap:
anthropic: claude-haiku-3-5
The provider key used when looking up ``capability_map`` is hardcoded to
``"anthropic"`` — the adapter knows its own provider; there is no need for
a separate ``models.provider`` config field.
Both ``default_max_tokens`` and ``default_temperature`` can be overridden
per-call via the ``context`` dict passed to :meth:`complete`.
Environment variables
---------------------
ANTHROPIC_API_KEY : Required. Authenticates with the Anthropic API.
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract API key from environment (ANTHROPIC_API_KEY).
# Initialise the anthropic.Anthropic() client.
raise NotImplementedError("AnthropicAdapter.__init__ is not yet implemented.")
"""
Initialise the Anthropic adapter.
Parameters
----------
config : Loaded team.yaml config dict.
Raises
------
ValueError
If ANTHROPIC_API_KEY is not set in the environment.
"""
try:
import anthropic as _anthropic
except ModuleNotFoundError as exc:
raise ImportError(
"The 'anthropic' package is required for AnthropicAdapter. "
"Install it with: pip install anthropic"
) from exc
self._config = config
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError(
"ANTHROPIC_API_KEY environment variable is not set. "
"Export it before running the-agency."
)
self._client = _anthropic.Anthropic(api_key=api_key)
self._models_cfg: dict = config.get("models", {})
self._default_max_tokens: int = self._models_cfg.get("default_max_tokens", 4096)
self._default_temperature: float = self._models_cfg.get("default_temperature", 0)
def complete(self, prompt: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Call anthropic client messages.create().
# Use resolve_model(capability) to pick the model.
# Support context keys: system_prompt, max_tokens, temperature.
# Return response text as a plain string.
raise NotImplementedError("AnthropicAdapter.complete is not yet implemented.")
"""
Send a prompt to a Claude model and return the text response.
Parameters
----------
prompt : User-role prompt content.
capability : One of "reasoning-heavy" | "capable" | "fast-cheap".
context : Optional per-call overrides:
system_prompt (str) — prepended as the system turn.
max_tokens (int) — defaults to models.default_max_tokens in team.yaml.
temperature (float) — defaults to models.default_temperature in team.yaml.
Returns
-------
The model's text completion as a plain string.
"""
model = self.resolve_model(capability)
max_tokens: int = context.get("max_tokens", self._default_max_tokens)
temperature: float = context.get("temperature", self._default_temperature)
system_prompt: str = context.get("system_prompt", "")
create_kwargs: dict = {
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": prompt}],
}
if system_prompt:
create_kwargs["system"] = system_prompt
if temperature != 0.0:
create_kwargs["temperature"] = temperature
response = self._client.messages.create(**create_kwargs)
return response.content[0].text
def resolve_model(self, capability: str) -> str:
# TODO (Phase 2): Look up capability in team.yaml capability_map.
# Fall back to "capable" tier model if capability is unknown.
raise NotImplementedError("AnthropicAdapter.resolve_model is not yet implemented.")
"""
Map a capability string to the Anthropic model identifier.
Looks up ``config.models.capability_map[capability][provider]``.
Falls back to the "capable" tier model if the capability is unknown.
Parameters
----------
capability : One of "reasoning-heavy" | "capable" | "fast-cheap".
Returns
-------
Anthropic model identifier (e.g. "claude-opus-4-6").
"""
# The adapter knows its own provider — no need to read it from config.
cap_map: dict = self._models_cfg.get("capability_map", {})
if capability in cap_map and "anthropic" in cap_map[capability]:
return cap_map[capability]["anthropic"]
# Fall back to "capable" tier
if "capable" in cap_map and "anthropic" in cap_map["capable"]:
return cap_map["capable"]["anthropic"]
# Hard-coded last resort
return "claude-sonnet-4-6"

View File

@@ -1,35 +1,93 @@
"""
adapters/notify/openclaw.py
OpenClaw notification adapter — Phase 2 stub.
OpenClaw notification adapter — Phase 2 implementation.
TODO (Phase 2):
- Implement send() to dispatch notifications via the OpenClaw API.
- Support context keys: channel, severity, run_id, brief_id.
- Read endpoint and credentials from environment (OPENCLAW_API_KEY, OPENCLAW_URL).
- Handle rate limiting and delivery retries.
Sends notifications by shelling out to the ``openclaw`` CLI::
openclaw system event --text "<message>" --mode now
If the binary is not on PATH the method logs a warning and returns without
raising — notifications are best-effort and should never crash the pipeline.
"""
from __future__ import annotations
import logging
import os
import subprocess
from adapters.base.notify import NotifyAdapter
logger = logging.getLogger(__name__)
class OpenClawNotifyAdapter(NotifyAdapter):
"""
Notification adapter that sends messages via OpenClaw.
Notification adapter that dispatches messages via the ``openclaw`` CLI.
Expects environment variables:
OPENCLAW_API_KEY — authentication token
OPENCLAW_URL — base URL for the OpenClaw API (optional, defaults to hosted)
Environment variables
---------------------
OPENCLAW_SIGNAL_NUMBER : Optional. Direct signal target for OpenClaw sends.
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract OPENCLAW_API_KEY and OPENCLAW_URL from environment.
# Initialise an HTTP client (e.g. httpx or requests).
raise NotImplementedError("OpenClawNotifyAdapter.__init__ is not yet implemented.")
"""
Initialise the OpenClaw notification adapter.
Parameters
----------
config : Loaded team.yaml config dict (reserved for future options).
"""
self._config = config
self._signal_number: str = os.environ.get("OPENCLAW_SIGNAL_NUMBER", "")
def send(self, message: str, context: dict) -> None:
# TODO (Phase 2): POST notification payload to OpenClaw API.
# Include message, context (channel, severity, run_id, brief_id).
# Log delivery confirmation or raise on failure.
raise NotImplementedError("OpenClawNotifyAdapter.send is not yet implemented.")
"""
Send a notification via ``openclaw system event``.
Parameters
----------
message : Human-readable notification text.
context : Optional metadata. Recognised keys:
level (str) — "info" | "warning" | "error"; logged locally.
run_id (str) — included in the local log record.
brief_id (str) — included in the local log record.
Notes
-----
If the ``openclaw`` binary is not present on PATH, the method logs a
warning and returns silently. Notifications are best-effort.
"""
level: str = context.get("level", "info")
run_id: str = context.get("run_id", "")
brief_id: str = context.get("brief_id", "")
# Always log locally regardless of CLI availability.
log_msg = "[notify:%s] %s (run=%s brief=%s)" % (level, message, run_id, brief_id)
if level == "error":
logger.error(log_msg)
elif level == "warning":
logger.warning(log_msg)
else:
logger.info(log_msg)
cmd = ["openclaw", "system", "event", "--text", message, "--mode", "now"]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
logger.warning(
"openclaw event returned non-zero exit %d: %s",
result.returncode,
result.stderr.strip(),
)
except FileNotFoundError:
logger.warning(
"openclaw CLI not found on PATH; notification not delivered: %s",
message,
)
except subprocess.TimeoutExpired:
logger.warning("openclaw event timed out for message: %s", message)

View File

@@ -1,51 +1,163 @@
"""
adapters/runtime/claude_code.py
Claude Code agent runtime adapter — Phase 2 stub.
Claude Code sub-agent runtime adapter — Phase 2 implementation.
TODO (Phase 2):
- Implement spawn() to launch a Claude Code sub-agent via the Agent SDK.
- Implement get_result() to await agent completion and parse the output.
- Implement kill() to terminate the sub-agent process or session.
- Map task brief context (files, constraints, artifacts) into the agent's
system prompt and tool context.
- Handle Claude Code tool-use responses and extract structured output.
Spawns the ``claude`` CLI as a non-interactive subprocess for T4/T5
implementation tasks::
claude --permission-mode bypassPermissions --print "<task>"
Each spawned process is tracked by a UUID job_id so callers can later poll
for the result or terminate the job. Stdout is captured and returned as the
agent output; stderr is included for debugging.
"""
from __future__ import annotations
import logging
import subprocess
import tempfile
import threading
import uuid
from adapters.base.runtime import RuntimeAdapter
logger = logging.getLogger(__name__)
class ClaudeCodeRuntimeAdapter(RuntimeAdapter):
"""
Runtime adapter that spawns Claude Code sub-agents for coding tasks.
Runtime adapter that spawns ``claude`` CLI sub-agents for coding tasks.
Used when a TaskBrief has preferred_runtime == "coding_agent".
Credentials are inherited from the environment (``ANTHROPIC_API_KEY``).
The ``claude`` CLI must be installed and reachable on PATH.
Expects the Claude Code CLI / Agent SDK to be available in the environment.
Credentials are inherited from the environment (ANTHROPIC_API_KEY).
Used when a TaskBrief has ``preferred_runtime == "coding_agent"``.
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Validate that Claude Code CLI or SDK is accessible.
# Initialise any agent session management state.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.__init__ is not yet implemented.")
"""
Initialise the Claude Code runtime adapter.
Parameters
----------
config : Loaded team.yaml config dict (reserved for future options).
"""
self._config = config
# Maps job_id → running Popen instance.
self._jobs: dict[str, subprocess.Popen] = {}
self._lock = threading.Lock()
# ------------------------------------------------------------------
# RuntimeAdapter interface
# ------------------------------------------------------------------
def spawn(self, task: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Launch a Claude Code sub-agent.
# Compose a structured system prompt from task + context.
# Inject relevant files and constraints as tool context.
# Return an agent_id that maps to a running agent session.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.spawn is not yet implemented.")
"""
Launch ``claude --permission-mode bypassPermissions --print "<task>"``
as a non-interactive subprocess.
Parameters
----------
task : Full task description (typically a JSON-serialised brief).
capability : Capability hint (not forwarded; Claude Code resolves its
own model from the local environment).
context : Optional keys:
workdir (str) — cwd for the subprocess. A fresh
temporary directory is created if omitted.
Returns
-------
A UUID job_id string that uniquely identifies this subprocess.
"""
workdir: str = context.get("workdir") or tempfile.mkdtemp(
prefix="agency-claude-"
)
job_id = str(uuid.uuid4())
logger.info("Spawning Claude Code job %s in %s", job_id, workdir)
proc = subprocess.Popen(
["claude", "--permission-mode", "bypassPermissions", "--print", task],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=workdir,
)
with self._lock:
self._jobs[job_id] = proc
return job_id
def get_result(self, agent_id: str, timeout_s: int) -> dict:
# TODO (Phase 2): Await the Claude Code agent session to complete.
# Parse the agent's final message for structured JSON output.
# Return dict with: {"status": ..., "output": ..., "artifacts": [...]}.
# Raise TimeoutError if timeout_s elapses.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.get_result is not yet implemented.")
"""
Wait for the Claude Code subprocess to complete and return its output.
Parameters
----------
agent_id : Job id returned by spawn().
timeout_s : Maximum seconds to wait before raising TimeoutError.
Returns
-------
dict with keys:
status ("completed" | "failed")
output (str — full stdout)
artifacts (list — always empty; callers must parse output)
stderr (str — full stderr)
Raises
------
KeyError
If agent_id does not correspond to a known job.
TimeoutError
If the subprocess does not finish within timeout_s seconds.
"""
with self._lock:
proc = self._jobs.get(agent_id)
if proc is None:
raise KeyError(f"No Claude Code job found for agent_id={agent_id!r}")
try:
stdout, stderr = proc.communicate(timeout=timeout_s)
except subprocess.TimeoutExpired:
proc.kill()
stdout, stderr = proc.communicate()
raise TimeoutError(
f"Claude Code job {agent_id!r} did not complete within {timeout_s}s."
)
status = "completed" if proc.returncode == 0 else "failed"
logger.info(
"Claude Code job %s finished: status=%s returncode=%d",
agent_id,
status,
proc.returncode,
)
return {
"status": status,
"output": stdout,
"artifacts": [],
"stderr": stderr,
}
def kill(self, agent_id: str) -> None:
# TODO (Phase 2): Terminate the Claude Code agent session.
# Clean up any temporary files or session state.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.kill is not yet implemented.")
"""
Terminate a running Claude Code subprocess.
Silently succeeds if the job has already finished or the id is unknown.
Parameters
----------
agent_id : Job id returned by spawn().
"""
with self._lock:
proc = self._jobs.get(agent_id)
if proc is not None:
try:
proc.terminate()
logger.info("Terminated Claude Code job %s", agent_id)
except OSError:
pass # Process already gone — that is fine.

View File

@@ -1,48 +1,241 @@
"""
adapters/runtime/openclaw.py
OpenClaw agent runtime adapter — Phase 2 stub.
OpenClaw agent runtime adapter — Phase 2 implementation.
TODO (Phase 2):
- Implement spawn() to submit a task to an OpenClaw worker pool.
- Implement get_result() to poll or subscribe for agent completion.
- Implement kill() to cancel a running OpenClaw agent job.
- Read endpoint and credentials from environment (OPENCLAW_API_KEY, OPENCLAW_URL).
- Map capability hint to an appropriate worker class/queue.
Spawns sub-agents by shelling out to the ``openclaw`` CLI::
openclaw session spawn --task "<task>" --mode run
openclaw session get <session_id>
openclaw session kill <session_id>
If the ``openclaw`` binary is unavailable, all methods raise
``NotImplementedError`` with a helpful message rather than crashing with a
raw ``FileNotFoundError``.
"""
from __future__ import annotations
import json
import logging
import re
import subprocess
import time
from adapters.base.runtime import RuntimeAdapter
logger = logging.getLogger(__name__)
# Status strings from the openclaw CLI that indicate a session has finished.
_TERMINAL_STATUSES = frozenset(
{"done", "completed", "failed", "partial", "blocked", "error"}
)
class OpenClawRuntimeAdapter(RuntimeAdapter):
"""
Runtime adapter that dispatches agent tasks to OpenClaw workers.
Runtime adapter that dispatches agent tasks to OpenClaw worker sessions.
Expects environment variables:
OPENCLAW_API_KEY — authentication token
OPENCLAW_URL — base URL for the OpenClaw API
All interactions use the ``openclaw`` CLI. No additional credentials are
required beyond what OpenClaw manages in the local environment.
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract OPENCLAW_API_KEY and OPENCLAW_URL from environment.
# Initialise HTTP client and any job-tracking state.
raise NotImplementedError("OpenClawRuntimeAdapter.__init__ is not yet implemented.")
"""
Initialise the OpenClaw runtime adapter.
Parameters
----------
config : Loaded team.yaml config dict (reserved for future options).
"""
self._config = config
# ------------------------------------------------------------------
# RuntimeAdapter interface
# ------------------------------------------------------------------
def spawn(self, task: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Submit task to OpenClaw worker pool.
# Map capability ("reasoning-heavy" | "capable" | "fast-cheap") to
# an appropriate worker queue or model hint.
# Return an agent_id string that can be used to poll for results.
raise NotImplementedError("OpenClawRuntimeAdapter.spawn is not yet implemented.")
"""
Spawn an OpenClaw agent session for the given task.
Parameters
----------
task : Natural-language task description.
capability : Capability hint ("reasoning-heavy" | "capable" | "fast-cheap").
Passed informally; actual routing is handled by OpenClaw.
context : Arbitrary context bag (currently unused by this adapter).
Returns
-------
session_id string parsed from the CLI output.
Raises
------
NotImplementedError
If the ``openclaw`` CLI is not available on PATH.
RuntimeError
If the session_id cannot be parsed from the CLI output.
"""
# TODO: map capability to an openclaw worker tier / model hint if the
# openclaw CLI gains that flag in a future release.
cmd = ["openclaw", "session", "spawn", "--task", task, "--mode", "run"]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
)
except FileNotFoundError:
raise NotImplementedError(
"openclaw CLI not found on PATH. "
"Install OpenClaw or configure a different runtime adapter "
"(e.g. adapters.runtime.claude_code.ClaudeCodeRuntimeAdapter)."
)
except subprocess.CalledProcessError as exc:
raise RuntimeError(
f"openclaw session spawn failed (exit {exc.returncode}): "
f"{exc.stderr.strip()}"
) from exc
return self._parse_session_id(result.stdout)
def get_result(self, agent_id: str, timeout_s: int) -> dict:
# TODO (Phase 2): Poll or long-poll the OpenClaw API for job completion.
# Raise TimeoutError if timeout_s elapses before the job finishes.
# Return a dict with at minimum: {"status": ..., "output": ..., "artifacts": [...]}.
raise NotImplementedError("OpenClawRuntimeAdapter.get_result is not yet implemented.")
"""
Poll ``openclaw session get`` until the session reaches a terminal
state or *timeout_s* seconds elapse.
Parameters
----------
agent_id : Session ID returned by spawn().
timeout_s : Maximum seconds to wait before raising TimeoutError.
Returns
-------
dict with keys: ``status``, ``output``, ``artifacts``.
Raises
------
TimeoutError
If the session does not finish within timeout_s seconds.
NotImplementedError
If the ``openclaw`` CLI is not available on PATH.
"""
deadline = time.monotonic() + timeout_s
poll_interval = 2.0
while time.monotonic() < deadline:
try:
result = subprocess.run(
["openclaw", "session", "get", agent_id],
capture_output=True,
text=True,
timeout=15,
)
except FileNotFoundError:
raise NotImplementedError(
"openclaw CLI not found on PATH. "
"Install OpenClaw or switch to a different runtime adapter."
)
except subprocess.TimeoutExpired:
logger.debug("openclaw session get timed out; will retry")
time.sleep(poll_interval)
continue
if result.returncode == 0 and result.stdout.strip():
parsed = self._parse_get_output(result.stdout)
if parsed.get("status", "").lower() in _TERMINAL_STATUSES:
return parsed
else:
logger.debug(
"openclaw session get returned exit=%d; retrying. stderr=%s",
result.returncode,
result.stderr.strip(),
)
time.sleep(poll_interval)
raise TimeoutError(
f"Agent {agent_id!r} did not complete within {timeout_s}s."
)
def kill(self, agent_id: str) -> None:
# TODO (Phase 2): Send a cancellation request to the OpenClaw API.
# Silently succeed if the agent has already finished.
raise NotImplementedError("OpenClawRuntimeAdapter.kill is not yet implemented.")
"""
Terminate an OpenClaw session unconditionally.
Silently succeeds if the session has already finished.
Parameters
----------
agent_id : Session ID returned by spawn().
Raises
------
NotImplementedError
If the ``openclaw`` CLI is not available on PATH.
"""
try:
subprocess.run(
["openclaw", "session", "kill", agent_id],
capture_output=True,
text=True,
timeout=15,
)
except FileNotFoundError:
raise NotImplementedError(
"openclaw CLI not found on PATH. "
"Install OpenClaw or switch to a different runtime adapter."
)
except subprocess.TimeoutExpired:
logger.warning("openclaw session kill timed out for agent %s", agent_id)
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _parse_session_id(self, output: str) -> str:
"""Extract a session_id from the raw stdout of ``openclaw session spawn``."""
output = output.strip()
# Prefer structured JSON output.
try:
data = json.loads(output)
for key in ("session_id", "sessionId", "id"):
if key in data:
return str(data[key])
except (json.JSONDecodeError, TypeError):
pass
# Regex: look for "session_id: <id>" or similar.
m = re.search(
r"(?:session[_\s]?id|sessionId)[:\s]+([a-zA-Z0-9_\-]+)",
output,
re.IGNORECASE,
)
if m:
return m.group(1)
# Last resort: return the first non-empty line.
lines = [ln.strip() for ln in output.splitlines() if ln.strip()]
if lines:
return lines[0]
raise RuntimeError(
f"Could not parse session_id from openclaw output: {output!r}"
)
def _parse_get_output(self, output: str) -> dict:
"""Parse the stdout of ``openclaw session get`` into a result dict."""
output = output.strip()
try:
data = json.loads(output)
return {
"status": data.get("status", "done"),
"output": data.get("output", output),
"artifacts": data.get("artifacts", []),
}
except (json.JSONDecodeError, TypeError):
# Non-JSON output — treat as completed with raw text output.
return {
"status": "done",
"output": output,
"artifacts": [],
}

View File

@@ -1,16 +1,30 @@
"""
adapters/vcs/github.py
GitHub VCS adapter — Phase 2 stub.
GitHub VCS adapter — Phase 2 implementation.
TODO (Phase 2):
- Implement create_branch() using PyGithub or gh CLI subprocess.
- Implement commit() — stage files and push via git subprocess or API.
- Implement create_pr() using GitHub REST API (POST /repos/{owner}/{repo}/pulls).
- Implement get_pr_status() using GET /repos/{owner}/{repo}/pulls/{pull_number}.
- Read repo and credentials from config/team.yaml and environment (GITHUB_TOKEN).
Uses PyGithub (``pip install PyGithub``) to interact with the GitHub REST API.
Reads the repository URL and base branch from the team.yaml config dict.
Note on commit() signature
--------------------------
The base class declares ``commit(files: list[str], message: str)``, which is
insufficient for the GitHub Contents API (which requires file *content*, not
just paths). This implementation extends the signature to accept either:
* ``dict[str, str]`` — ``{path: content}`` mapping (preferred; uses the API).
* ``list[str]`` — local file paths; content is read from disk and pushed.
The optional ``branch`` keyword argument targets a specific branch; it
defaults to the configured base branch.
"""
from __future__ import annotations
import os
import re
from typing import Union
from github import Github, GithubException
from adapters.base.vcs import VCSAdapter
@@ -18,34 +32,175 @@ class GitHubAdapter(VCSAdapter):
"""
VCS adapter for GitHub repositories.
Expects environment variable GITHUB_TOKEN and config values:
run.repo — SSH or HTTPS clone URL
run.base_branch — default base branch (e.g. "main")
Authenticates via GITHUB_TOKEN and interacts with the GitHub REST API
through PyGithub.
Environment variables
---------------------
GITHUB_TOKEN : Required. Personal access token or GitHub App installation token.
Config keys (from team.yaml)
----------------------------
run.repo : SSH or HTTPS clone URL (e.g. "git@github.com:org/repo.git").
run.base_branch : Default base branch (e.g. "main").
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract GITHUB_TOKEN from environment.
# Parse owner/repo from config.run.repo.
raise NotImplementedError("GitHubAdapter.__init__ is not yet implemented.")
"""
Initialise the GitHub adapter.
Parameters
----------
config : Loaded team.yaml config dict.
Raises
------
ValueError
If GITHUB_TOKEN is not set or the repo URL cannot be parsed.
"""
self._config = config
token = os.environ.get("GITHUB_TOKEN")
if not token:
raise ValueError(
"GITHUB_TOKEN environment variable is not set. "
"Create a personal access token and export it before running the-agency."
)
self._g = Github(token)
run_cfg: dict = config.get("run", {})
repo_url: str = run_cfg.get("repo", "")
self._base_branch: str = run_cfg.get("base_branch", "main")
self._owner, self._repo_name = self._parse_repo_url(repo_url)
self._repo = self._g.get_repo(f"{self._owner}/{self._repo_name}")
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _parse_repo_url(self, url: str) -> tuple[str, str]:
"""Parse *owner* and *repo* name from an SSH or HTTPS GitHub URL."""
# git@github.com:owner/repo.git
m = re.match(r"git@github\.com:([^/]+)/([^/]+?)(?:\.git)?$", url)
if m:
return m.group(1), m.group(2)
# https://github.com/owner/repo[.git]
m = re.match(r"https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?/?$", url)
if m:
return m.group(1), m.group(2)
raise ValueError(
f"Cannot parse GitHub owner/repo from URL: {url!r}. "
"Expected SSH (git@github.com:owner/repo.git) or "
"HTTPS (https://github.com/owner/repo.git) format."
)
# ------------------------------------------------------------------
# VCSAdapter interface
# ------------------------------------------------------------------
def create_branch(self, name: str) -> None:
# TODO (Phase 2): Create branch via GitHub API or local git subprocess.
# Use config.run.base_branch as the branch point.
raise NotImplementedError("GitHubAdapter.create_branch is not yet implemented.")
"""
Create a new branch off ``self._base_branch`` on the remote.
def commit(self, files: list[str], message: str) -> str:
# TODO (Phase 2): Stage files (git add), create commit (git commit), push.
# Return the resulting commit SHA.
raise NotImplementedError("GitHubAdapter.commit is not yet implemented.")
Parameters
----------
name : New branch name (e.g. "feat/webhook-ingestion").
"""
base_ref = self._repo.get_git_ref(f"heads/{self._base_branch}")
self._repo.create_git_ref(f"refs/heads/{name}", base_ref.object.sha)
def commit(
self,
files: Union[dict[str, str], list[str]],
message: str,
branch: str | None = None,
) -> str:
"""
Commit files to the repository via the GitHub Contents API.
Parameters
----------
files : Either a ``dict[path, content]`` mapping (preferred), or a
``list[path]`` of local file paths whose content is read from
disk.
message : Commit message.
branch : Target branch. Defaults to ``self._base_branch``.
Returns
-------
SHA of the last created/updated commit, or empty string if no files
were committed.
"""
target_branch = branch or self._base_branch
# Normalise to {path: content}
if isinstance(files, list):
files_dict: dict[str, str] = {}
for path in files:
with open(path, "r", encoding="utf-8") as fh:
files_dict[path] = fh.read()
else:
files_dict = files
last_sha: str = ""
for path, content in files_dict.items():
try:
existing = self._repo.get_contents(path, ref=target_branch)
result = self._repo.update_file(
path=path,
message=message,
content=content,
sha=existing.sha, # type: ignore[union-attr]
branch=target_branch,
)
except GithubException:
# File does not exist yet — create it
result = self._repo.create_file(
path=path,
message=message,
content=content,
branch=target_branch,
)
last_sha = result["commit"].sha
return last_sha
def create_pr(self, title: str, body: str, head: str, base: str) -> str:
# TODO (Phase 2): POST to GitHub API /repos/{owner}/{repo}/pulls.
# Return the HTML URL of the created PR.
raise NotImplementedError("GitHubAdapter.create_pr is not yet implemented.")
"""
Open a pull request on GitHub.
Parameters
----------
title : PR title.
body : PR description / body markdown.
head : Head branch name (the branch with changes).
base : Base branch name (e.g. "main").
Returns
-------
HTML URL of the created pull request.
"""
pr = self._repo.create_pull(
title=title,
body=body,
head=head,
base=base,
)
return pr.html_url
def get_pr_status(self, pr_id: str) -> str:
# TODO (Phase 2): GET /repos/{owner}/{repo}/pulls/{number}.
# Map GitHub PR state ("open", "closed") + merged flag to
# our schema: "open" | "merged" | "closed".
raise NotImplementedError("GitHubAdapter.get_pr_status is not yet implemented.")
"""
Fetch the current status of a pull request.
Parameters
----------
pr_id : Pull request number as a string (e.g. "42").
Returns
-------
One of: "open" | "merged" | "closed".
"""
pr = self._repo.get_pull(int(pr_id))
if pr.merged:
return "merged"
return pr.state # "open" or "closed"

2
agents

Submodule agents updated: 5c669c28e6...72a144406c

View File

@@ -2,28 +2,40 @@ t1:
default: agents/strategy/nexus-strategy.md
t2:
backend: agents/engineering/engineering-software-architect.md
frontend: agents/engineering/engineering-software-architect.md
backend: agents/engineering/engineering-backend-architect.md
frontend: agents/engineering/engineering-frontend-architect.md
infra: agents/engineering/engineering-devops-automator.md
data: agents/engineering/engineering-data-engineer.md
ai: agents/engineering/engineering-software-architect.md
security: agents/engineering/engineering-security-engineer.md
mobile: agents/engineering/engineering-software-architect.md
default: agents/engineering/engineering-software-architect.md
t3:
backend: agents/engineering/engineering-senior-developer.md
frontend: agents/engineering/engineering-senior-developer.md
backend: agents/engineering/engineering-senior-backend-developer.md
frontend: agents/engineering/engineering-senior-frontend-developer.md
infra: agents/engineering/engineering-sre.md
default: agents/engineering/engineering-senior-developer.md
data: agents/engineering/engineering-data-engineer.md
ai: agents/engineering/engineering-ai-engineer.md
security: agents/engineering/engineering-security-engineer.md
mobile: agents/engineering/engineering-mobile-app-builder.md
database: agents/engineering/engineering-database-optimizer.md
devops: agents/engineering/engineering-sre.md
docs: agents/engineering/engineering-technical-writer.md
default: agents/engineering/engineering-backend-developer.md
t4:
frontend: agents/engineering/engineering-frontend-developer.md
backend: agents/engineering/engineering-backend-architect.md
backend: agents/engineering/engineering-backend-developer.md
database: agents/engineering/engineering-database-optimizer.md
devops: agents/engineering/engineering-devops-automator.md
mobile: agents/engineering/engineering-mobile-app-builder.md
ai: agents/engineering/engineering-ai-engineer.md
security: agents/engineering/engineering-security-engineer.md
docs: agents/engineering/engineering-technical-writer.md
default: agents/engineering/engineering-senior-developer.md
data: agents/engineering/engineering-data-engineer.md
embedded: agents/engineering/engineering-embedded-firmware-engineer.md
default: agents/engineering/engineering-backend-developer.md
t5:
code: agents/engineering/engineering-code-reviewer.md
@@ -31,4 +43,8 @@ t5:
api: agents/testing/testing-api-tester.md
performance: agents/testing/testing-performance-benchmarker.md
security: agents/engineering/engineering-security-engineer.md
accessibility: agents/testing/testing-accessibility-auditor.md
e2e: agents/testing/testing-evidence-collector.md
frontend: agents/testing/testing-accessibility-auditor.md
data: agents/testing/testing-reality-checker.md
default: agents/engineering/engineering-code-reviewer.md

View File

@@ -7,10 +7,21 @@ adapters:
llm: anthropic
vcs: github
notify: openclaw
runtime: openclaw
adapter_registry:
llm:
anthropic: "adapters.llm.anthropic:AnthropicAdapter"
vcs:
github: "adapters.vcs.github:GitHubAdapter"
notify:
openclaw: "adapters.notify.openclaw:OpenClawNotifyAdapter"
runtime:
openclaw: "adapters.runtime.openclaw:OpenClawRuntimeAdapter"
claude_code: "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter"
models:
provider: anthropic
default_max_tokens: 4096
default_temperature: 0
capability_map:
reasoning-heavy:
anthropic: claude-opus-4-6

View File

@@ -1,99 +1,838 @@
"""
core/team_runner.py
Top-level orchestration entry point — Phase 2 stub.
Top-level orchestration entry point for the-agency pipeline.
The TeamRunner is responsible for:
1. Loading config/team.yaml and config/role_registry.yaml.
2. Instantiating the correct adapter implementations (LLM, VCS, notify, runtime).
3. Creating a Blackboard for the run.
4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary.
5. Recursively spawning T2→T5 briefs based on tier outputs.
6. Using EscalationHandler to manage retries, salvage, and escalation.
7. Writing final run status and summary to the Blackboard.
The TeamRunner loads team.yaml, builds the adapter registry, and drives the
full T1 → T2 → T3 → T4 → T5 dispatch loop with escalation handling.
TODO (Phase 2):
- Load and validate team.yaml configuration.
- Build adapter registry (map adapter keys → concrete adapter classes).
- Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5.
- Parse tier JSON outputs into child TaskBrief objects via make_child_brief().
- Integrate EscalationHandler into the dispatch loop.
- Support --dry-run flag (log actions without executing).
- Emit blackboard events at each stage (spawned, completed, failed, etc.).
- Expose a CLI entry point (argparse or click).
Runtime adapters are config-driven: every string-valued key in the top-level
``runtime:`` section of team.yaml is instantiated as a RuntimeAdapter and
stored in ``self._runtimes[name]``. Non-string values (e.g. ``native_teams:
false``) are silently skipped. Dispatch routing uses
``brief.preferred_runtime`` to look up the right adapter at call time.
CLI usage::
python -m core.team_runner --config config/team.yaml [--dry-run] [--verbose]
"""
from __future__ import annotations
# TODO (Phase 2): Uncomment and implement imports as adapters are built.
# import argparse
# import yaml
# from core.task_brief import TaskBrief
# from core.blackboard import Blackboard
# from core.escalation import EscalationHandler
# from adapters.llm.anthropic import AnthropicAdapter
# from adapters.vcs.github import GitHubAdapter
# from adapters.notify.openclaw import OpenClawNotifyAdapter
# from adapters.runtime.openclaw import OpenClawRuntimeAdapter
# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
import argparse
import json
import logging
import os
import re
import uuid
from typing import Optional
import yaml
from core.blackboard import Blackboard
from core.escalation import EscalationHandler
from core.task_brief import TaskBrief
import importlib
from adapters.base.llm import LLMAdapter
from adapters.base.notify import NotifyAdapter
from adapters.base.runtime import RuntimeAdapter
from adapters.base.vcs import VCSAdapter
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Maps tier number → prompt file path (relative to project root).
_TIER_PROMPTS: dict[int, str] = {
1: "prompts/t1_visionary.md",
2: "prompts/t2_architect.md",
3: "prompts/t3_squad_lead.md",
4: "prompts/t4_implementer.md",
5: "prompts/t5_verifier.md",
}
# Maps tier number → LLM capability hint.
_TIER_CAPABILITIES: dict[int, str] = {
1: "reasoning-heavy",
2: "reasoning-heavy",
3: "capable",
4: "fast-cheap",
5: "fast-cheap",
}
# ---------------------------------------------------------------------------
# Adapter registries
#
# Values are "module.path:ClassName" strings resolved lazily via importlib.
# To add a new adapter, append an entry here — no changes to TeamRunner needed.
# team.yaml may also supply a full "module.path:ClassName" value directly,
# enabling third-party adapters without touching this file.
# ---------------------------------------------------------------------------
# Adapter registries are loaded from team.yaml at runtime (adapter_registry section).
# Fallback built-ins are used only if team.yaml doesn't define adapter_registry.
_BUILTIN_ADAPTER_REGISTRY: dict[str, dict[str, str]] = {
"llm": {"anthropic": "adapters.llm.anthropic:AnthropicAdapter"},
"vcs": {"github": "adapters.vcs.github:GitHubAdapter"},
"notify": {"openclaw": "adapters.notify.openclaw:OpenClawNotifyAdapter"},
"runtime": {
"openclaw": "adapters.runtime.openclaw:OpenClawRuntimeAdapter",
"claude_code": "adapters.runtime.claude_code:ClaudeCodeRuntimeAdapter",
},
}
def _load_adapter_class(key: str, registry: dict[str, str], label: str) -> type:
"""
Resolve a short name or dotted "module:ClassName" path to an adapter class.
Resolution order:
1. If *key* is in *registry*, use the mapped dotted path.
2. Otherwise, treat *key* itself as a dotted path (custom / third-party).
"""
dotted = registry.get(key, key)
if ":" not in dotted:
raise ValueError(
f"Unknown {label} adapter {key!r}. "
f"Built-in choices: {list(registry)}. "
f"Or supply a full 'module.path:ClassName' value in team.yaml."
)
module_path, class_name = dotted.rsplit(":", 1)
try:
module = importlib.import_module(module_path)
except ModuleNotFoundError as exc:
raise ImportError(
f"Cannot import {label} adapter module {module_path!r}: {exc}"
) from exc
try:
return getattr(module, class_name)
except AttributeError:
raise ImportError(
f"Module {module_path!r} has no class {class_name!r}"
)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class EscalationError(RuntimeError):
"""Raised when a brief escalates past its retry budget with no recovery."""
# ---------------------------------------------------------------------------
# TeamRunner
# ---------------------------------------------------------------------------
class TeamRunner:
"""
Orchestrates a full T1→T5 agent pipeline run.
Usage (Phase 2)::
Usage::
runner = TeamRunner(config_path="config/team.yaml")
runner.run()
Dry-run mode logs all planned actions but skips LLM calls, VCS commits,
and notifications::
runner = TeamRunner(config_path="config/team.yaml", dry_run=True)
runner.run()
"""
def __init__(self, config_path: str = "config/team.yaml") -> None:
# TODO (Phase 2): Load YAML config.
# Instantiate adapters based on config.adapters keys.
# Create a Blackboard for this run.
raise NotImplementedError("TeamRunner.__init__ is not yet implemented.")
def __init__(
self,
config_path: str = "config/team.yaml",
dry_run: bool = False,
) -> None:
"""
Load configuration and instantiate adapters.
Parameters
----------
config_path : Path to team.yaml.
dry_run : When True, skip LLM calls, VCS commits, and notifications.
All planned actions are logged at INFO level.
Runtime adapters are built from the top-level ``runtime:`` section of
team.yaml. Each string-valued key becomes an entry in
``self._runtimes``; non-string values (e.g. ``native_teams: false``)
are ignored. Adding a new runtime type requires only a new key in
team.yaml — no changes to TeamRunner are needed.
"""
self._dry_run = dry_run
self._config = self._load_yaml(config_path)
self._role_registry = self._load_yaml("config/role_registry.yaml")
# Merge config-defined adapter_registry over built-in fallbacks.
self._adapter_registry: dict[str, dict[str, str]] = {
k: {**v} for k, v in _BUILTIN_ADAPTER_REGISTRY.items()
}
for kind, entries in self._config.get("adapter_registry", {}).items():
self._adapter_registry.setdefault(kind, {}).update(entries)
self._escalation = EscalationHandler()
run_id = str(uuid.uuid4())
self._bb = Blackboard(run_id=run_id)
# Build adapters — VCS and notify are optional and swallow init errors.
adapter_cfg: dict = self._config.get("adapters", {})
runtime_cfg: dict = self._config.get("runtime", {})
if dry_run:
# In dry-run mode the LLM adapter is never actually called, so we
# tolerate missing dependencies (e.g. 'anthropic' SDK not installed).
try:
self._llm: LLMAdapter = self._build_llm(adapter_cfg.get("llm", "anthropic"))
except (ImportError, ValueError) as exc:
logger.warning(
"LLM adapter unavailable in dry-run mode (%s) — continuing.", exc
)
self._llm = None # type: ignore[assignment]
else:
self._llm = self._build_llm(adapter_cfg.get("llm", "anthropic"))
self._vcs: Optional[VCSAdapter] = self._build_optional( # type: ignore[assignment]
self._adapter_registry.get("vcs", {}), adapter_cfg.get("vcs"), "VCS"
)
self._notify: Optional[NotifyAdapter] = self._build_optional( # type: ignore[assignment]
self._adapter_registry.get("notify", {}), adapter_cfg.get("notify"), "notify"
)
# Runtime adapters are fully config-driven — one entry per string-valued
# key in the top-level ``runtime:`` section of team.yaml.
self._runtimes: dict[str, RuntimeAdapter] = self._build_runtimes(runtime_cfg)
logger.info(
"TeamRunner initialised: run_id=%s dry_run=%s", run_id, dry_run
)
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
@staticmethod
def _load_yaml(path: str) -> dict:
with open(path, "r", encoding="utf-8") as fh:
return yaml.safe_load(fh) or {}
@staticmethod
def _load_text(path: str) -> str:
with open(path, "r", encoding="utf-8") as fh:
return fh.read()
def _build_llm(self, key: str) -> LLMAdapter:
cls = _load_adapter_class(key, self._adapter_registry.get("llm", {}), "LLM")
return cls(self._config)
def _build_optional(
self,
registry: dict[str, str],
key: Optional[str],
label: str,
) -> Optional[object]:
"""Build an optional adapter, returning None on any init error."""
if not key:
return None
try:
cls = _load_adapter_class(key, registry, label)
return cls(self._config)
except (ImportError, ValueError) as exc:
logger.warning("Unknown %s adapter %r — skipping. (%s)", label, key, exc)
return None
except Exception as exc:
logger.warning(
"%s adapter %r could not be initialised (%s) — skipping.",
label,
key,
exc,
)
return None
def _build_runtime(self, key: str) -> RuntimeAdapter:
cls = _load_adapter_class(key, self._adapter_registry.get("runtime", {}), "runtime")
return cls(self._config)
def _build_runtimes(self, runtime_cfg: dict) -> dict[str, RuntimeAdapter]:
"""
Build a name → RuntimeAdapter mapping from the ``runtime:`` config block.
Every key whose value is a string is treated as a runtime adapter name
and instantiated via ``_build_runtime``. Non-string values (e.g.
``native_teams: false``) are skipped so that boolean/numeric control
flags can coexist in the same config section.
"""
runtimes: dict[str, RuntimeAdapter] = {}
for name, value in runtime_cfg.items():
if not isinstance(value, str):
continue
runtimes[name] = self._build_runtime(value)
return runtimes
# ------------------------------------------------------------------
# Role registry
# ------------------------------------------------------------------
def _resolve_personality(self, tier: int, role: str) -> Optional[str]:
"""Return the path to the agent persona .md file, or None."""
tier_key = f"t{tier}"
tier_map: dict = self._role_registry.get(tier_key, {})
path = tier_map.get(role) or tier_map.get("default")
if path and os.path.isfile(path):
return path
return None
# ------------------------------------------------------------------
# Prompt helpers
# ------------------------------------------------------------------
def _load_tier_prompt(self, tier: int) -> str:
"""Load the system prompt for a tier from the prompts/ directory."""
path = _TIER_PROMPTS.get(tier, "")
if path and os.path.isfile(path):
return self._load_text(path)
logger.warning("Tier %d prompt not found at %r", tier, path)
return ""
def _load_personality(self, path: Optional[str]) -> str:
if path and os.path.isfile(path):
return self._load_text(path)
return ""
@staticmethod
def _extract_json(text: str) -> dict:
"""
Extract a JSON object from a potentially markdown-wrapped LLM response.
Strips leading/trailing markdown fences (```json ... ```) then parses.
Falls back to a regex scan for the first ``{...}`` block if plain
parsing fails.
"""
text = text.strip()
# Strip markdown fences.
if text.startswith("```"):
text = re.sub(r"^```[a-z]*\n?", "", text)
text = re.sub(r"\n?```\s*$", "", text.strip())
try:
return json.loads(text)
except json.JSONDecodeError:
m = re.search(r"\{.*\}", text, re.DOTALL)
if m:
try:
return json.loads(m.group(0))
except json.JSONDecodeError:
pass
raise ValueError(
"Could not parse JSON from LLM response.\n"
f"Response (first 500 chars): {text[:500]}"
)
# ------------------------------------------------------------------
# Brief dispatch
# ------------------------------------------------------------------
def _dispatch_brief(self, brief: TaskBrief) -> dict:
"""
Send a TaskBrief to the appropriate agent and return the raw result dict.
Routing
-------
preferred_runtime == "standard" or empty/None → LLM adapter directly
Otherwise → look up self._runtimes[preferred_runtime]; falls back to
self._runtimes["default"] and then to LLM if no runtime is found.
Blackboard events emitted: spawned → completed | failed.
"""
if self._dry_run:
logger.info(
"[DRY-RUN] dispatch tier=%d role=%s task=%.80s",
brief.tier,
brief.role,
brief.task,
)
return {"status": "done", "output": "{}", "artifacts": []}
self._bb.update_brief_status(brief.brief_id, "active")
self._bb.log_event(
"spawned",
brief_id=brief.brief_id,
detail={"tier": brief.tier, "role": brief.role},
)
try:
pref = brief.preferred_runtime
if not pref or pref == "standard":
result = self._dispatch_via_llm(brief)
else:
runtime = self._runtimes.get(pref) or self._runtimes.get("default")
if runtime is None:
logger.warning(
"No runtime adapter found for %r (and no 'default') — "
"falling back to LLM for brief %s",
pref,
brief.brief_id,
)
result = self._dispatch_via_llm(brief)
else:
result = self._dispatch_via_runtime(brief, runtime)
self._bb.update_brief_result(brief.brief_id, result)
self._bb.log_event(
"completed",
brief_id=brief.brief_id,
detail={"status": result.get("status")},
)
return result
except Exception as exc:
self._bb.update_brief_status(brief.brief_id, "failed")
self._bb.log_event(
"failed",
brief_id=brief.brief_id,
detail={"error": str(exc)},
)
raise
def _dispatch_via_llm(self, brief: TaskBrief) -> dict:
"""Call the LLM adapter with the tier system prompt + brief JSON."""
tier_prompt = self._load_tier_prompt(brief.tier)
personality = self._load_personality(brief.agent_personality)
system_prompt = "\n\n".join(filter(None, [tier_prompt, personality]))
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
user_message = json.dumps(brief.to_dict(), indent=2)
raw = self._llm.complete(
prompt=user_message,
capability=capability,
context={
"system_prompt": system_prompt,
},
)
return self._extract_json(raw)
def _dispatch_via_runtime(self, brief: TaskBrief, runtime: RuntimeAdapter) -> dict:
"""Spawn an agent via *runtime* and collect its result."""
task_str = json.dumps(brief.to_dict(), indent=2)
capability = _TIER_CAPABILITIES.get(brief.tier, "capable")
timeout_s: int = brief.context.get("timeout_s", 300)
agent_id = runtime.spawn(
task=task_str,
capability=capability,
context=brief.context,
)
logger.info(
"Spawned runtime agent %s for brief %s", agent_id, brief.brief_id
)
result = runtime.get_result(agent_id, timeout_s=timeout_s)
# Attempt to parse JSON from the agent's text output.
if isinstance(result.get("output"), str) and result["output"].strip():
try:
parsed = self._extract_json(result["output"])
result.update(parsed)
except ValueError:
pass # Keep raw string output as-is.
return result
# ------------------------------------------------------------------
# Escalation loop
# ------------------------------------------------------------------
def _run_with_escalation(
self,
brief: TaskBrief,
workstream_id: Optional[str] = None,
) -> dict:
"""
Dispatch a brief and apply the escalation policy until done or exhausted.
On retry the amended brief is persisted to the Blackboard before
being re-submitted.
"""
while True:
result = self._dispatch_brief(brief)
decision = self._escalation.handle(brief, result)
if decision.action == "complete":
return result
if decision.action == "escalate":
self._bb.log_event(
"escalated",
brief_id=brief.brief_id,
detail={"reason": decision.reason},
)
raise EscalationError(
f"Brief {brief.brief_id} (tier={brief.tier} role={brief.role}) "
f"escalated: {decision.reason}"
)
# "retry" or "salvage_and_retry"
self._bb.log_event(
"retried",
brief_id=brief.brief_id,
detail={"reason": decision.reason, "action": decision.action},
)
amended = decision.amended_brief
if amended is None:
raise EscalationError(
f"Escalation returned action={decision.action!r} "
"but no amended_brief was provided."
)
# Persist the new brief and loop.
self._bb.create_brief(amended, workstream_id=workstream_id)
brief = amended
# ------------------------------------------------------------------
# Tier output parsers
# ------------------------------------------------------------------
def _parse_t1_output(
self, result: dict, root_brief: TaskBrief
) -> list[TaskBrief]:
"""Build T2 TaskBriefs from T1 (Visionary) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
workstreams: list[dict] = result.get("workstreams", [])
# T1 sets the canonical goal_anchor; propagate it back to root.
goal_anchor: str = result.get("goal_anchor") or root_brief.goal_anchor
root_brief.goal_anchor = goal_anchor
briefs: list[TaskBrief] = []
for ws in workstreams:
role = ws.get("role", "default")
brief = root_brief.make_child_brief(
tier=2,
role=role,
task=ws.get("task", ""),
workstream=ws.get("name", ""),
acceptance_criteria=ws.get("acceptance_criteria", []),
preferred_runtime="standard",
agent_personality=self._resolve_personality(2, role),
retry_budget=retry_bad,
)
briefs.append(brief)
return briefs
def _parse_t2_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T3 TaskBriefs from T2 (Architect) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
subtasks: list[dict] = result.get("subtasks", [])
arch_summary: str = result.get("architecture_summary", "")
briefs: list[TaskBrief] = []
for st in subtasks:
role = st.get("role", "default")
brief = parent.make_child_brief(
tier=3,
role=role,
task=st.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=st.get("acceptance_criteria", []),
preferred_runtime=st.get("preferred_runtime", "standard"),
agent_personality=self._resolve_personality(3, role),
retry_budget=retry_bad,
context={"architecture_summary": arch_summary},
)
briefs.append(brief)
return briefs
def _parse_t3_output(
self, result: dict, parent: TaskBrief
) -> list[TaskBrief]:
"""Build T4 TaskBriefs from T3 (Squad Lead) JSON output."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
tasks: list[dict] = result.get("tasks", [])
plan_summary: str = result.get("plan_summary", "")
briefs: list[TaskBrief] = []
for task in tasks:
role = task.get("role", "default")
# T4 is the coding/implementation tier; default to coding_agent
# so implementers use Claude Code unless T3 explicitly overrides.
pref_runtime = task.get("preferred_runtime", "coding_agent")
brief = parent.make_child_brief(
tier=4,
role=role,
task=task.get("task", ""),
workstream=parent.workstream,
acceptance_criteria=task.get("acceptance_criteria", []),
preferred_runtime=pref_runtime,
agent_personality=self._resolve_personality(4, role),
retry_budget=retry_bad,
context={
"plan_summary": plan_summary,
"depends_on": task.get("depends_on", []),
},
)
briefs.append(brief)
return briefs
# ------------------------------------------------------------------
# VCS helpers
# ------------------------------------------------------------------
def _commit_artifacts(
self,
artifacts: list[dict],
brief: TaskBrief,
) -> None:
"""Commit T4 *file* artifacts to the configured VCS adapter."""
if not self._vcs or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would commit %d artifact(s) for brief %s",
len(artifacts),
brief.brief_id,
)
return
file_map: dict[str, str] = {
a["path"]: a["content"]
for a in artifacts
if a.get("type") == "file"
and a.get("path")
and a.get("content") is not None
}
if not file_map:
return
branch: str = self._config.get("run", {}).get("base_branch", "main")
message = (
f"feat({brief.workstream}): artifacts from {brief.role} "
f"[brief {brief.brief_id[:8]}]"
)
try:
# GitHubAdapter.commit accepts dict[str, str] as files.
sha = self._vcs.commit(file_map, message) # type: ignore[call-arg]
logger.info(
"Committed %d artifact(s) → SHA %s", len(file_map), sha
)
except Exception as exc:
logger.warning("VCS commit failed: %s", exc)
# ------------------------------------------------------------------
# Notification
# ------------------------------------------------------------------
def _notify_run(self, outcome: str, goal: str, detail: dict) -> None:
if not self._notify or self._dry_run:
if self._dry_run:
logger.info(
"[DRY-RUN] Would notify outcome=%s goal=%.80s", outcome, goal
)
return
level = "info" if outcome == "complete" else "error"
if outcome == "complete":
message = f"Pipeline complete: {goal[:80]}"
else:
message = f"Pipeline failed: {detail.get('error', 'unknown error')[:120]}"
self._notify.send(
message,
context={
"level": level,
"run_id": self._bb.run_id,
"outcome": outcome,
**{k: str(v) for k, v in detail.items()},
},
)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def run(self) -> None:
"""
Execute the full pipeline from T1 decomposition through T5 verification.
Execute the full T1→T5 pipeline.
TODO (Phase 2):
- Build root T1 brief from config.run.goal.
- Dispatch to T1 Visionary via LLM adapter.
- Parse workstreams from T1 output.
- For each workstream: dispatch T2 Architect.
- For each T2 subtask: dispatch T3 Squad Lead.
- For each T3 task: dispatch T4 Implementer.
- For each T4 artifact set: dispatch T5 Verifier.
- Run escalation handler at each tier on failure.
- Commit passing artifacts via VCS adapter.
- Notify on completion or terminal failure via notify adapter.
Steps
-----
1. Dispatch T1 Visionary to decompose the goal into workstreams.
2. For each workstream: T2 Architect → T3 Squad Lead →
T4 Implementer → T5 Verifier.
3. Commit passing T4 artifacts via VCS adapter (if configured).
4. Notify on completion or terminal failure via notify adapter.
"""
raise NotImplementedError("TeamRunner.run is not yet implemented.")
goal: str = self._config["run"]["goal"]
self._bb.create_run(goal=goal)
self._bb.update_run_status("active")
logger.info("Pipeline started — goal: %s", goal)
def _dispatch_brief(self, brief) -> dict:
"""
Send a single TaskBrief to the appropriate agent and return the result.
try:
self._orchestrate(goal)
self._bb.update_run_status("done")
summary = self._bb.get_run_summary()
logger.info("Pipeline complete. Summary: %s", summary)
self._notify_run("complete", goal, summary)
except Exception as exc:
self._bb.update_run_status("failed")
logger.error("Pipeline failed: %s", exc, exc_info=True)
self._notify_run("failed", goal, {"error": str(exc)})
raise
finally:
self._bb.close()
TODO (Phase 2):
- Select runtime based on brief.preferred_runtime.
- Load agent personality from brief.agent_personality (if set).
- Compose prompt from tier system prompt + brief payload.
- Spawn agent via runtime adapter.
- Await result via runtime.get_result().
- Log spawned/completed/failed events to Blackboard.
"""
raise NotImplementedError("TeamRunner._dispatch_brief is not yet implemented.")
# ------------------------------------------------------------------
# Internal orchestration
# ------------------------------------------------------------------
def _orchestrate(self, goal: str) -> None:
"""Build the root T1 brief, dispatch it, and fan out per workstream."""
retry_bad: int = self._config.get("retry_defaults", {}).get("bad_output", 3)
# ---- T1: Visionary ----
t1_brief = TaskBrief(
run_id=self._bb.run_id,
tier=1,
role="default",
goal_anchor=goal,
task=(
"You are the T1 Visionary. "
"Decompose the following goal into parallel workstreams "
f"for the engineering team: {goal}"
),
workstream="root",
retry_budget=retry_bad,
preferred_runtime="standard",
agent_personality=self._resolve_personality(1, "default"),
)
self._bb.create_brief(t1_brief)
t1_result = self._run_with_escalation(t1_brief)
t2_briefs = self._parse_t1_output(t1_result, t1_brief)
logger.info("T1 produced %d workstream(s)", len(t2_briefs))
# ---- T2..T5: per workstream ----
for t2_brief in t2_briefs:
ws_id = self._bb.create_workstream(
name=t2_brief.workstream, tier=2
)
self._bb.create_brief(t2_brief, workstream_id=ws_id)
self._bb.update_workstream_status(ws_id, "active")
try:
self._run_workstream(t2_brief, ws_id)
self._bb.update_workstream_status(ws_id, "done")
except EscalationError as exc:
self._bb.update_workstream_status(ws_id, "failed")
self._bb.log_event(
"failed",
detail={"error": str(exc), "workstream": t2_brief.workstream},
)
logger.error(
"Workstream %r failed: %s", t2_brief.workstream, exc
)
def _run_workstream(self, t2_brief: TaskBrief, ws_id: str) -> None:
"""Drive T2 → T3 → T4 → T5 for a single workstream."""
# T2: Architect
t2_result = self._run_with_escalation(t2_brief, workstream_id=ws_id)
t3_briefs = self._parse_t2_output(t2_result, t2_brief)
logger.info(
"T2 (%s) produced %d subtask(s)", t2_brief.workstream, len(t3_briefs)
)
for t3_brief in t3_briefs:
self._bb.create_brief(t3_brief, workstream_id=ws_id)
try:
# T3: Squad Lead
t3_result = self._run_with_escalation(t3_brief, workstream_id=ws_id)
t4_briefs = self._parse_t3_output(t3_result, t3_brief)
logger.info(
"T3 (%s) produced %d task(s)", t3_brief.role, len(t4_briefs)
)
for t4_brief in t4_briefs:
self._bb.create_brief(t4_brief, workstream_id=ws_id)
try:
# T4: Implementer
t4_result = self._run_with_escalation(
t4_brief, workstream_id=ws_id
)
artifacts: list[dict] = t4_result.get("artifacts", [])
# T5: Verifier
t5_brief = t4_brief.make_child_brief(
tier=5,
role="code",
task=(
"Verify the following T4 implementation artifacts "
"against all acceptance criteria. "
f"T4 output: {json.dumps(t4_result)[:2000]}"
),
workstream=t4_brief.workstream,
acceptance_criteria=t4_brief.acceptance_criteria,
preferred_runtime="standard",
agent_personality=self._resolve_personality(5, "code"),
retry_budget=self._config.get(
"retry_defaults", {}
).get("bad_output", 3),
context={"t4_result": t4_result},
)
self._bb.create_brief(t5_brief, workstream_id=ws_id)
t5_result = self._run_with_escalation(
t5_brief, workstream_id=ws_id
)
# Commit on verified pass.
if t5_result.get("status") in ("passed", "done"):
self._commit_artifacts(artifacts, t4_brief)
except EscalationError as exc:
logger.error(
"T4/T5 escalation in %s: %s", t4_brief.role, exc
)
except EscalationError as exc:
logger.error("T3 escalation in %s: %s", t3_brief.role, exc)
# ---------------------------------------------------------------------------
# CLI entry point (Phase 2)
# CLI entry point
# ---------------------------------------------------------------------------
# TODO (Phase 2): Implement argparse CLI.
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(description="Run the-agency pipeline.")
# parser.add_argument("--config", default="config/team.yaml", help="Path to team.yaml")
# parser.add_argument("--dry-run", action="store_true", help="Log actions without executing")
# args = parser.parse_args()
# runner = TeamRunner(config_path=args.config)
# runner.run()
def _configure_logging(verbose: bool = False) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run the-agency T1→T5 pipeline.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config",
default="config/team.yaml",
help="Path to team.yaml configuration file.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help=(
"Log all planned actions without executing LLM calls, "
"VCS commits, or notifications."
),
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable DEBUG-level logging.",
)
args = parser.parse_args()
_configure_logging(args.verbose)
runner = TeamRunner(config_path=args.config, dry_run=args.dry_run)
runner.run()

View File

@@ -10,6 +10,9 @@ pyyaml
# Environment variable management
python-dotenv
# GitHub VCS adapter
PyGithub
# --- stdlib-only (no pip install needed) ---
# sqlite3 — blackboard persistence
# dataclasses — task_brief schema