feat: initial bootstrap — structure, task_brief, blackboard, adapter bases, escalation, prompts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-15 02:19:14 -04:00
commit eaf7fd8f6f
33 changed files with 2141 additions and 0 deletions

91
.gitignore vendored Normal file
View File

@@ -0,0 +1,91 @@
# =============================================================================
# the-agency .gitignore
# =============================================================================
# --- Python -------------------------------------------------------------------
__pycache__/
*.py[cod]
*$py.class
*.so
*.egg
*.egg-info/
dist/
build/
eggs/
parts/
var/
sdist/
develop-eggs/
.installed.cfg
lib/
lib64/
MANIFEST
# Virtual environments
.venv/
venv/
env/
ENV/
.env
.env.*
!.env.example
# Distribution / packaging
pip-wheel-metadata/
share/python-wheels/
*.whl
# Unit test / coverage
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# MyPy / type checkers
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
.pytype/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# --- Runs / Blackboard --------------------------------------------------------
# Keep the runs/ directory (tracked via .gitkeep) but exclude all DB files
# and any run-specific subdirectories.
runs/**/blackboard.db
runs/*/
# --- macOS --------------------------------------------------------------------
.DS_Store
.AppleDouble
.LSOverride
._*
# --- Editors ------------------------------------------------------------------
.idea/
.vscode/
*.swp
*.swo
*~
.project
.classpath
# --- Logs & temp files --------------------------------------------------------
*.log
*.tmp
*.temp

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "agents"]
path = agents
url = https://github.com/msitarzewski/agency-agents

135
README.md Normal file
View File

@@ -0,0 +1,135 @@
# the-agency
A tiered, multi-agent orchestration framework that decomposes high-level goals into executable work across five specialised tiers (T1T5), backed by a SQLite blackboard and pluggable adapters for LLM, VCS, notifications, and agent runtimes.
---
## What is the-agency?
the-agency implements a **structured, goal-anchored pipeline** where every task flows through a hierarchy:
| Tier | Role | Responsibility |
|---|---|---|
| T1 | Visionary | Decomposes a goal into workstreams; sets the immutable `goal_anchor` |
| T2 | Architect | Designs technical architecture and subtasks for each workstream |
| T3 | Squad Lead | Breaks architecture subtasks into concrete implementation tasks |
| T4 | Implementer | Produces code, config, and other artifacts |
| T5 | Verifier | Reviews T4 artifacts against acceptance criteria |
Each tier produces structured JSON. A central **Blackboard** (SQLite) tracks all runs, workstreams, briefs, and events. An **EscalationHandler** decides whether to retry, salvage, or escalate failures automatically.
---
## Quick Start
### Prerequisites
- Python 3.11+
- An Anthropic API key (or other supported LLM provider)
### Installation
```bash
git clone <your-repo-url> the-agency
cd the-agency
git submodule update --init --recursive # pulls agents/ persona library
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env # add your API keys
```
### Configuration
Edit `config/team.yaml` to set:
- `run.goal` — the top-level objective for this run
- `adapters` — which adapter implementations to use
- `models.provider` — your LLM provider
- `retry_defaults` — per-failure-type retry budgets
Edit `config/role_registry.yaml` to map tier+role keys to agent persona files in `agents/`.
### Running (Phase 2)
```bash
# Phase 2 team_runner is stubbed — see core/team_runner.py
python -m core.team_runner --config config/team.yaml
```
---
## How to Add a New Adapter
1. **LLM adapter** — subclass `adapters/base/llm.py:LLMAdapter`, implement `complete()` and `resolve_model()`, place the file in `adapters/llm/`.
2. **VCS adapter** — subclass `adapters/base/vcs.py:VCSAdapter`, implement `create_branch()`, `commit()`, `create_pr()`, `get_pr_status()`, place in `adapters/vcs/`.
3. **Notify adapter** — subclass `adapters/base/notify.py:NotifyAdapter`, implement `send()`, place in `adapters/notify/`.
4. **Runtime adapter** — subclass `adapters/base/runtime.py:RuntimeAdapter`, implement `spawn()`, `get_result()`, `kill()`, place in `adapters/runtime/`.
5. Register the adapter key in `config/team.yaml` under `adapters.<type>: <your_key>`.
6. Add adapter loading logic in `core/team_runner.py` (Phase 2).
---
## How to Extend the Role Registry
The role registry (`config/role_registry.yaml`) maps tier+role keys to agent persona files in the `agents/` submodule.
To add a new role:
1. Confirm (or add) the persona `.md` file exists under `agents/`.
2. Add an entry under the appropriate tier key in `config/role_registry.yaml`:
```yaml
t4:
my_new_role: agents/engineering/engineering-my-new-role.md
```
3. Use `my_new_role` as the `role` field when constructing a `TaskBrief` for T4.
---
## Build Order Reference
The bootstrap was assembled in this order; Phase 2 continues from step 13.
| Step | Artifact | Description |
|---|---|---|
| 1 | `agents/` | Git submodule — agent persona library |
| 2 | Directory structure | `core/`, `adapters/`, `prompts/`, `config/`, `runs/` |
| 3 | `config/role_registry.yaml` | Tier→role→persona mapping |
| 4 | `core/task_brief.py` | Dataclass schema for all work units |
| 5 | `core/blackboard.py` | SQLite-backed shared state store |
| 6 | `adapters/base/*.py` | Abstract base classes for all adapter types |
| 7 | `core/escalation.py` | Failure classification and retry/escalate logic |
| 8 | `prompts/*.md` | System prompts for T1T5 agents |
| 9 | `config/team.yaml` | Run configuration and model map |
| 10 | `README.md` | This file |
| 11 | `requirements.txt` | Python dependencies |
| 12 | `.gitignore` | Standard Python ignores + run DB exclusion |
| 13 | Phase 2 stubs | Adapter implementations, team runner |
---
## Project Structure
```
the-agency/
├── agents/ # git submodule — agent persona .md files
├── core/
│ ├── task_brief.py # TaskBrief dataclass
│ ├── blackboard.py # SQLite-backed state store
│ ├── escalation.py # EscalationHandler
│ └── team_runner.py # [Phase 2] Orchestration entry point
├── adapters/
│ ├── base/ # Abstract base classes
│ ├── llm/ # LLM adapter implementations
│ ├── vcs/ # VCS adapter implementations
│ ├── notify/ # Notification adapter implementations
│ └── runtime/ # Agent runtime adapter implementations
├── prompts/ # T1T5 system prompts
├── config/
│ ├── team.yaml # Run configuration
│ └── role_registry.yaml # Tier/role → persona file mapping
├── runs/ # Per-run state (blackboard.db files)
├── requirements.txt
└── .gitignore
```

0
adapters/__init__.py Normal file
View File

View File

50
adapters/base/llm.py Normal file
View File

@@ -0,0 +1,50 @@
"""
adapters/base/llm.py
Abstract base class for all LLM adapters.
"""
from abc import ABC, abstractmethod
class LLMAdapter(ABC):
"""
Contract that every LLM provider adapter must fulfil.
Capability strings
------------------
"reasoning-heavy" — tasks requiring deep chain-of-thought (e.g. T1, T2)
"capable" — general-purpose capable model (e.g. T3, T4)
"fast-cheap" — high-volume, low-latency tasks (e.g. T5 quick checks)
"""
@abstractmethod
def complete(self, prompt: str, capability: str, context: dict) -> str:
"""
Send a prompt to the model and return the text response.
Parameters
----------
prompt : The full prompt string (system + user combined or just user).
capability : One of "reasoning-heavy" | "capable" | "fast-cheap".
context : Arbitrary key/value bag passed to the adapter (e.g. system
prompt override, temperature, max_tokens).
Returns
-------
The model's text completion as a plain string.
"""
...
@abstractmethod
def resolve_model(self, capability: str) -> str:
"""
Map a capability string to the concrete model identifier for this provider.
Parameters
----------
capability : One of "reasoning-heavy" | "capable" | "fast-cheap".
Returns
-------
Model identifier string (e.g. "claude-opus-4-6", "gpt-4o").
"""
...

21
adapters/base/notify.py Normal file
View File

@@ -0,0 +1,21 @@
"""
adapters/base/notify.py
Abstract base class for all notification adapters.
"""
from abc import ABC, abstractmethod
class NotifyAdapter(ABC):
"""Contract that every notification provider adapter must fulfil."""
@abstractmethod
def send(self, message: str, context: dict) -> None:
"""
Dispatch a notification.
Parameters
----------
message : Human-readable message text.
context : Arbitrary metadata (e.g. channel, severity, run_id, brief_id).
"""
...

64
adapters/base/runtime.py Normal file
View File

@@ -0,0 +1,64 @@
"""
adapters/base/runtime.py
Abstract base class for all agent-runtime adapters.
"""
from abc import ABC, abstractmethod
class RuntimeAdapter(ABC):
"""
Contract that every agent runtime adapter must fulfil.
A "runtime" is responsible for dispatching a task to an actual agent
(e.g. an OpenClaw worker, a Claude Code sub-agent, a local subprocess)
and retrieving its result.
"""
@abstractmethod
def spawn(self, task: str, capability: str, context: dict) -> str:
"""
Start an agent to work on the given task.
Parameters
----------
task : Natural-language description of the work to perform.
capability : Capability hint — "reasoning-heavy" | "capable" | "fast-cheap".
context : Arbitrary key/value bag (e.g. files, constraints, brief payload).
Returns
-------
A provider-specific agent_id string that can be used to poll for results.
"""
...
@abstractmethod
def get_result(self, agent_id: str, timeout_s: int) -> dict:
"""
Block until the agent completes or the timeout elapses.
Parameters
----------
agent_id : The id returned by spawn().
timeout_s : Maximum seconds to wait before raising TimeoutError.
Returns
-------
A dict containing at minimum:
{
"status": "done" | "failed" | "partial" | "blocked",
"output": <str or dict>,
"artifacts": [...], # optional
}
"""
...
@abstractmethod
def kill(self, agent_id: str) -> None:
"""
Terminate a running agent unconditionally.
Parameters
----------
agent_id : The id returned by spawn().
"""
...

69
adapters/base/vcs.py Normal file
View File

@@ -0,0 +1,69 @@
"""
adapters/base/vcs.py
Abstract base class for all Version Control System adapters.
"""
from abc import ABC, abstractmethod
class VCSAdapter(ABC):
"""Contract that every VCS provider adapter must fulfil."""
@abstractmethod
def create_branch(self, name: str) -> None:
"""
Create a new branch with the given name.
Parameters
----------
name : Branch name (e.g. "feat/webhook-ingestion").
"""
...
@abstractmethod
def commit(self, files: list[str], message: str) -> str:
"""
Stage the given files and create a commit.
Parameters
----------
files : List of file paths to stage (relative to repo root).
message : Commit message.
Returns
-------
The commit SHA as a string.
"""
...
@abstractmethod
def create_pr(self, title: str, body: str, head: str, base: str) -> str:
"""
Open a pull request.
Parameters
----------
title : PR title.
body : PR description / body markdown.
head : Head branch name.
base : Base branch name (e.g. "main").
Returns
-------
The URL of the created pull request.
"""
...
@abstractmethod
def get_pr_status(self, pr_id: str) -> str:
"""
Fetch the current status of a pull request.
Parameters
----------
pr_id : Provider-specific PR identifier (number, node-id, or URL).
Returns
-------
One of: "open" | "merged" | "closed".
"""
...

0
adapters/llm/__init__.py Normal file
View File

44
adapters/llm/anthropic.py Normal file
View File

@@ -0,0 +1,44 @@
"""
adapters/llm/anthropic.py
Anthropic Claude adapter — Phase 2 stub.
TODO (Phase 2):
- Implement complete() using the anthropic SDK (anthropic.Anthropic client).
- Implement resolve_model() by reading config/team.yaml capability_map.
- Handle streaming responses, rate-limit retries, and token counting.
- Support system-prompt injection via context["system_prompt"].
- Map capability → model using the provider's capability_map config.
"""
from __future__ import annotations
from adapters.base.llm import LLMAdapter
class AnthropicAdapter(LLMAdapter):
"""
LLM adapter for Anthropic Claude models.
Reads model configuration from config/team.yaml:
models.provider: anthropic
models.capability_map.reasoning-heavy.anthropic: claude-opus-4-6
models.capability_map.capable.anthropic: claude-sonnet-4-6
models.capability_map.fast-cheap.anthropic: claude-haiku-3-5
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract API key from environment (ANTHROPIC_API_KEY).
# Initialise the anthropic.Anthropic() client.
raise NotImplementedError("AnthropicAdapter.__init__ is not yet implemented.")
def complete(self, prompt: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Call anthropic client messages.create().
# Use resolve_model(capability) to pick the model.
# Support context keys: system_prompt, max_tokens, temperature.
# Return response text as a plain string.
raise NotImplementedError("AnthropicAdapter.complete is not yet implemented.")
def resolve_model(self, capability: str) -> str:
# TODO (Phase 2): Look up capability in team.yaml capability_map.
# Fall back to "capable" tier model if capability is unknown.
raise NotImplementedError("AnthropicAdapter.resolve_model is not yet implemented.")

View File

View File

@@ -0,0 +1,35 @@
"""
adapters/notify/openclaw.py
OpenClaw notification adapter — Phase 2 stub.
TODO (Phase 2):
- Implement send() to dispatch notifications via the OpenClaw API.
- Support context keys: channel, severity, run_id, brief_id.
- Read endpoint and credentials from environment (OPENCLAW_API_KEY, OPENCLAW_URL).
- Handle rate limiting and delivery retries.
"""
from __future__ import annotations
from adapters.base.notify import NotifyAdapter
class OpenClawNotifyAdapter(NotifyAdapter):
"""
Notification adapter that sends messages via OpenClaw.
Expects environment variables:
OPENCLAW_API_KEY — authentication token
OPENCLAW_URL — base URL for the OpenClaw API (optional, defaults to hosted)
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract OPENCLAW_API_KEY and OPENCLAW_URL from environment.
# Initialise an HTTP client (e.g. httpx or requests).
raise NotImplementedError("OpenClawNotifyAdapter.__init__ is not yet implemented.")
def send(self, message: str, context: dict) -> None:
# TODO (Phase 2): POST notification payload to OpenClaw API.
# Include message, context (channel, severity, run_id, brief_id).
# Log delivery confirmation or raise on failure.
raise NotImplementedError("OpenClawNotifyAdapter.send is not yet implemented.")

View File

View File

@@ -0,0 +1,51 @@
"""
adapters/runtime/claude_code.py
Claude Code agent runtime adapter — Phase 2 stub.
TODO (Phase 2):
- Implement spawn() to launch a Claude Code sub-agent via the Agent SDK.
- Implement get_result() to await agent completion and parse the output.
- Implement kill() to terminate the sub-agent process or session.
- Map task brief context (files, constraints, artifacts) into the agent's
system prompt and tool context.
- Handle Claude Code tool-use responses and extract structured output.
"""
from __future__ import annotations
from adapters.base.runtime import RuntimeAdapter
class ClaudeCodeRuntimeAdapter(RuntimeAdapter):
"""
Runtime adapter that spawns Claude Code sub-agents for coding tasks.
Used when a TaskBrief has preferred_runtime == "coding_agent".
Expects the Claude Code CLI / Agent SDK to be available in the environment.
Credentials are inherited from the environment (ANTHROPIC_API_KEY).
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Validate that Claude Code CLI or SDK is accessible.
# Initialise any agent session management state.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.__init__ is not yet implemented.")
def spawn(self, task: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Launch a Claude Code sub-agent.
# Compose a structured system prompt from task + context.
# Inject relevant files and constraints as tool context.
# Return an agent_id that maps to a running agent session.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.spawn is not yet implemented.")
def get_result(self, agent_id: str, timeout_s: int) -> dict:
# TODO (Phase 2): Await the Claude Code agent session to complete.
# Parse the agent's final message for structured JSON output.
# Return dict with: {"status": ..., "output": ..., "artifacts": [...]}.
# Raise TimeoutError if timeout_s elapses.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.get_result is not yet implemented.")
def kill(self, agent_id: str) -> None:
# TODO (Phase 2): Terminate the Claude Code agent session.
# Clean up any temporary files or session state.
raise NotImplementedError("ClaudeCodeRuntimeAdapter.kill is not yet implemented.")

View File

@@ -0,0 +1,48 @@
"""
adapters/runtime/openclaw.py
OpenClaw agent runtime adapter — Phase 2 stub.
TODO (Phase 2):
- Implement spawn() to submit a task to an OpenClaw worker pool.
- Implement get_result() to poll or subscribe for agent completion.
- Implement kill() to cancel a running OpenClaw agent job.
- Read endpoint and credentials from environment (OPENCLAW_API_KEY, OPENCLAW_URL).
- Map capability hint to an appropriate worker class/queue.
"""
from __future__ import annotations
from adapters.base.runtime import RuntimeAdapter
class OpenClawRuntimeAdapter(RuntimeAdapter):
"""
Runtime adapter that dispatches agent tasks to OpenClaw workers.
Expects environment variables:
OPENCLAW_API_KEY — authentication token
OPENCLAW_URL — base URL for the OpenClaw API
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract OPENCLAW_API_KEY and OPENCLAW_URL from environment.
# Initialise HTTP client and any job-tracking state.
raise NotImplementedError("OpenClawRuntimeAdapter.__init__ is not yet implemented.")
def spawn(self, task: str, capability: str, context: dict) -> str:
# TODO (Phase 2): Submit task to OpenClaw worker pool.
# Map capability ("reasoning-heavy" | "capable" | "fast-cheap") to
# an appropriate worker queue or model hint.
# Return an agent_id string that can be used to poll for results.
raise NotImplementedError("OpenClawRuntimeAdapter.spawn is not yet implemented.")
def get_result(self, agent_id: str, timeout_s: int) -> dict:
# TODO (Phase 2): Poll or long-poll the OpenClaw API for job completion.
# Raise TimeoutError if timeout_s elapses before the job finishes.
# Return a dict with at minimum: {"status": ..., "output": ..., "artifacts": [...]}.
raise NotImplementedError("OpenClawRuntimeAdapter.get_result is not yet implemented.")
def kill(self, agent_id: str) -> None:
# TODO (Phase 2): Send a cancellation request to the OpenClaw API.
# Silently succeed if the agent has already finished.
raise NotImplementedError("OpenClawRuntimeAdapter.kill is not yet implemented.")

0
adapters/vcs/__init__.py Normal file
View File

51
adapters/vcs/github.py Normal file
View File

@@ -0,0 +1,51 @@
"""
adapters/vcs/github.py
GitHub VCS adapter — Phase 2 stub.
TODO (Phase 2):
- Implement create_branch() using PyGithub or gh CLI subprocess.
- Implement commit() — stage files and push via git subprocess or API.
- Implement create_pr() using GitHub REST API (POST /repos/{owner}/{repo}/pulls).
- Implement get_pr_status() using GET /repos/{owner}/{repo}/pulls/{pull_number}.
- Read repo and credentials from config/team.yaml and environment (GITHUB_TOKEN).
"""
from __future__ import annotations
from adapters.base.vcs import VCSAdapter
class GitHubAdapter(VCSAdapter):
"""
VCS adapter for GitHub repositories.
Expects environment variable GITHUB_TOKEN and config values:
run.repo — SSH or HTTPS clone URL
run.base_branch — default base branch (e.g. "main")
"""
def __init__(self, config: dict) -> None:
# TODO (Phase 2): Accept loaded team.yaml config dict.
# Extract GITHUB_TOKEN from environment.
# Parse owner/repo from config.run.repo.
raise NotImplementedError("GitHubAdapter.__init__ is not yet implemented.")
def create_branch(self, name: str) -> None:
# TODO (Phase 2): Create branch via GitHub API or local git subprocess.
# Use config.run.base_branch as the branch point.
raise NotImplementedError("GitHubAdapter.create_branch is not yet implemented.")
def commit(self, files: list[str], message: str) -> str:
# TODO (Phase 2): Stage files (git add), create commit (git commit), push.
# Return the resulting commit SHA.
raise NotImplementedError("GitHubAdapter.commit is not yet implemented.")
def create_pr(self, title: str, body: str, head: str, base: str) -> str:
# TODO (Phase 2): POST to GitHub API /repos/{owner}/{repo}/pulls.
# Return the HTML URL of the created PR.
raise NotImplementedError("GitHubAdapter.create_pr is not yet implemented.")
def get_pr_status(self, pr_id: str) -> str:
# TODO (Phase 2): GET /repos/{owner}/{repo}/pulls/{number}.
# Map GitHub PR state ("open", "closed") + merged flag to
# our schema: "open" | "merged" | "closed".
raise NotImplementedError("GitHubAdapter.get_pr_status is not yet implemented.")

1
agents Submodule

Submodule agents added at 5c669c28e6

34
config/role_registry.yaml Normal file
View File

@@ -0,0 +1,34 @@
t1:
default: agents/strategy/nexus-strategy.md
t2:
backend: agents/engineering/engineering-software-architect.md
frontend: agents/engineering/engineering-software-architect.md
infra: agents/engineering/engineering-devops-automator.md
data: agents/engineering/engineering-data-engineer.md
default: agents/engineering/engineering-software-architect.md
t3:
backend: agents/engineering/engineering-senior-developer.md
frontend: agents/engineering/engineering-senior-developer.md
infra: agents/engineering/engineering-sre.md
default: agents/engineering/engineering-senior-developer.md
t4:
frontend: agents/engineering/engineering-frontend-developer.md
backend: agents/engineering/engineering-backend-architect.md
database: agents/engineering/engineering-database-optimizer.md
devops: agents/engineering/engineering-devops-automator.md
mobile: agents/engineering/engineering-mobile-app-builder.md
ai: agents/engineering/engineering-ai-engineer.md
security: agents/engineering/engineering-security-engineer.md
docs: agents/engineering/engineering-technical-writer.md
default: agents/engineering/engineering-senior-developer.md
t5:
code: agents/engineering/engineering-code-reviewer.md
integration: agents/testing/testing-reality-checker.md
api: agents/testing/testing-api-tester.md
performance: agents/testing/testing-performance-benchmarker.md
security: agents/engineering/engineering-security-engineer.md
default: agents/engineering/engineering-code-reviewer.md

36
config/team.yaml Normal file
View File

@@ -0,0 +1,36 @@
run:
goal: "Build webhook ingestion system with retry logic and DLQ"
repo: "git@github.com:org/repo.git"
base_branch: "main"
adapters:
llm: anthropic
vcs: github
notify: openclaw
runtime: openclaw
models:
provider: anthropic
capability_map:
reasoning-heavy:
anthropic: claude-opus-4-6
openai: o3
capable:
anthropic: claude-sonnet-4-6
openai: gpt-4o
ollama: llama3.1:70b
fast-cheap:
anthropic: claude-haiku-3-5
openai: gpt-4o-mini
ollama: llama3.2
tier_overrides: {}
runtime:
default: openclaw
coding_agent: claude_code
native_teams: false
retry_defaults:
bad_output: 3
partial: 2
blocked: 0

0
core/__init__.py Normal file
View File

369
core/blackboard.py Normal file
View File

@@ -0,0 +1,369 @@
"""
core/blackboard.py
SQLite-backed shared state store for a single orchestration run.
One database is created at: runs/<run_id>/blackboard.db
All methods are synchronous and thread-safe at the connection level
(SQLite WAL mode + check_same_thread=False with an explicit lock).
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Import TaskBrief only for type hints to avoid circular imports at runtime.
# ---------------------------------------------------------------------------
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.task_brief import TaskBrief
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _new_uuid() -> str:
return str(uuid.uuid4())
# ---------------------------------------------------------------------------
# SQL schema
# ---------------------------------------------------------------------------
_SCHEMA = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
goal TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|review|done|failed
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS workstreams (
workstream_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
name TEXT NOT NULL,
tier INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|blocked|done|failed
owner_agent_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(run_id)
);
CREATE TABLE IF NOT EXISTS briefs (
brief_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
parent_brief_id TEXT,
workstream_id TEXT,
tier INTEGER NOT NULL,
role TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|done|failed
payload TEXT, -- JSON-serialised TaskBrief.to_dict()
result TEXT, -- JSON result from the agent
retry_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(run_id)
);
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
brief_id TEXT, -- NULL for run-level events
kind TEXT NOT NULL, -- spawned|completed|failed|escalated|retried
detail TEXT, -- JSON
created_at TEXT NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(run_id)
);
"""
# Valid status values per table — used for input validation.
_RUN_STATUSES = {"pending", "active", "review", "done", "failed"}
_WS_STATUSES = {"pending", "active", "blocked", "done", "failed"}
_BRIEF_STATUSES = {"pending", "active", "done", "failed"}
_EVENT_KINDS = {"spawned", "completed", "failed", "escalated", "retried"}
# ---------------------------------------------------------------------------
# Blackboard
# ---------------------------------------------------------------------------
class Blackboard:
"""
Shared, persistent state store for one orchestration run.
Usage
-----
bb = Blackboard(run_id="abc123")
bb.create_run(goal="Build webhook ingestion system")
...
summary = bb.get_run_summary()
"""
def __init__(self, run_id: str) -> None:
self.run_id = run_id
self._run_dir = os.path.join("runs", run_id)
self._db_path = os.path.join(self._run_dir, "blackboard.db")
self._lock = threading.Lock()
# Ensure the run directory exists.
os.makedirs(self._run_dir, exist_ok=True)
# Open a persistent connection (thread-safe via explicit lock).
self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
# Initialise schema.
with self._lock:
self._conn.executescript(_SCHEMA)
self._conn.commit()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _execute(
self,
sql: str,
params: tuple[Any, ...] = (),
*,
commit: bool = False,
) -> sqlite3.Cursor:
with self._lock:
cur = self._conn.execute(sql, params)
if commit:
self._conn.commit()
return cur
def _executemany(
self,
sql: str,
params_seq: list[tuple[Any, ...]],
*,
commit: bool = False,
) -> None:
with self._lock:
self._conn.executemany(sql, params_seq)
if commit:
self._conn.commit()
# ------------------------------------------------------------------
# Run
# ------------------------------------------------------------------
def create_run(self, goal: str) -> None:
"""Insert a new run row. Status defaults to 'pending'."""
now = _now_iso()
self._execute(
"INSERT OR IGNORE INTO runs (run_id, goal, status, created_at, updated_at) "
"VALUES (?, ?, 'pending', ?, ?)",
(self.run_id, goal, now, now),
commit=True,
)
def update_run_status(self, status: str) -> None:
"""Update run status. Must be one of: pending|active|review|done|failed."""
if status not in _RUN_STATUSES:
raise ValueError(f"Invalid run status {status!r}. Must be one of {_RUN_STATUSES}.")
now = _now_iso()
self._execute(
"UPDATE runs SET status=?, updated_at=? WHERE run_id=?",
(status, now, self.run_id),
commit=True,
)
# ------------------------------------------------------------------
# Workstreams
# ------------------------------------------------------------------
def create_workstream(
self,
*,
workstream_id: Optional[str] = None,
name: str,
tier: int,
owner_agent_id: Optional[str] = None,
) -> str:
"""Create a workstream row and return its workstream_id."""
ws_id = workstream_id or _new_uuid()
now = _now_iso()
self._execute(
"INSERT OR IGNORE INTO workstreams "
"(workstream_id, run_id, name, tier, status, owner_agent_id, created_at, updated_at) "
"VALUES (?, ?, ?, ?, 'pending', ?, ?, ?)",
(ws_id, self.run_id, name, tier, owner_agent_id, now, now),
commit=True,
)
return ws_id
def update_workstream_status(self, workstream_id: str, status: str) -> None:
"""Update workstream status. Must be one of: pending|active|blocked|done|failed."""
if status not in _WS_STATUSES:
raise ValueError(f"Invalid workstream status {status!r}. Must be one of {_WS_STATUSES}.")
now = _now_iso()
self._execute(
"UPDATE workstreams SET status=?, updated_at=? WHERE workstream_id=?",
(status, now, workstream_id),
commit=True,
)
# ------------------------------------------------------------------
# Briefs
# ------------------------------------------------------------------
def create_brief(self, brief: "TaskBrief", workstream_id: Optional[str] = None) -> None:
"""Persist a TaskBrief. The full brief is stored as JSON in `payload`."""
now = _now_iso()
payload_json = json.dumps(brief.to_dict())
self._execute(
"INSERT OR IGNORE INTO briefs "
"(brief_id, run_id, parent_brief_id, workstream_id, tier, role, "
" status, payload, result, retry_count, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, NULL, ?, ?, ?)",
(
brief.brief_id,
self.run_id,
brief.parent_brief_id,
workstream_id,
brief.tier,
brief.role,
payload_json,
brief.retry_count,
now,
now,
),
commit=True,
)
def update_brief_status(self, brief_id: str, status: str) -> None:
"""Update brief status. Must be one of: pending|active|done|failed."""
if status not in _BRIEF_STATUSES:
raise ValueError(f"Invalid brief status {status!r}. Must be one of {_BRIEF_STATUSES}.")
now = _now_iso()
self._execute(
"UPDATE briefs SET status=?, updated_at=? WHERE brief_id=?",
(status, now, brief_id),
commit=True,
)
def update_brief_result(self, brief_id: str, result: dict[str, Any]) -> None:
"""Store the agent result JSON for a brief and mark it done."""
now = _now_iso()
result_json = json.dumps(result)
self._execute(
"UPDATE briefs SET result=?, status='done', updated_at=? WHERE brief_id=?",
(result_json, now, brief_id),
commit=True,
)
def increment_brief_retry(self, brief_id: str) -> None:
"""Bump the retry_count column for a brief."""
now = _now_iso()
self._execute(
"UPDATE briefs SET retry_count = retry_count + 1, updated_at=? WHERE brief_id=?",
(now, brief_id),
commit=True,
)
# ------------------------------------------------------------------
# Events
# ------------------------------------------------------------------
def log_event(
self,
kind: str,
brief_id: Optional[str] = None,
detail: Optional[dict[str, Any]] = None,
) -> str:
"""
Append an event to the events table.
Parameters
----------
kind : One of spawned|completed|failed|escalated|retried.
brief_id : Associated brief, or None for run-level events.
detail : Arbitrary JSON-serialisable dict.
Returns the new event_id.
"""
if kind not in _EVENT_KINDS:
raise ValueError(f"Invalid event kind {kind!r}. Must be one of {_EVENT_KINDS}.")
event_id = _new_uuid()
now = _now_iso()
detail_json = json.dumps(detail or {})
self._execute(
"INSERT INTO events (event_id, run_id, brief_id, kind, detail, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(event_id, self.run_id, brief_id, kind, detail_json, now),
commit=True,
)
return event_id
# ------------------------------------------------------------------
# Summary
# ------------------------------------------------------------------
def get_run_summary(self) -> dict[str, Any]:
"""
Return a snapshot of the run state including workstream and brief
counts broken down by status.
"""
# Run row
run_row = self._execute(
"SELECT * FROM runs WHERE run_id=?", (self.run_id,)
).fetchone()
if run_row is None:
return {"error": f"No run found for run_id={self.run_id!r}"}
run_data: dict[str, Any] = dict(run_row)
# Workstream status counts
ws_rows = self._execute(
"SELECT status, COUNT(*) AS cnt FROM workstreams WHERE run_id=? GROUP BY status",
(self.run_id,),
).fetchall()
run_data["workstreams"] = {r["status"]: r["cnt"] for r in ws_rows}
# Brief status counts
brief_rows = self._execute(
"SELECT status, COUNT(*) AS cnt FROM briefs WHERE run_id=? GROUP BY status",
(self.run_id,),
).fetchall()
run_data["briefs"] = {r["status"]: r["cnt"] for r in brief_rows}
# Event kind counts
event_rows = self._execute(
"SELECT kind, COUNT(*) AS cnt FROM events WHERE run_id=? GROUP BY kind",
(self.run_id,),
).fetchall()
run_data["events"] = {r["kind"]: r["cnt"] for r in event_rows}
return run_data
# ------------------------------------------------------------------
# Cleanup
# ------------------------------------------------------------------
def close(self) -> None:
"""Close the database connection gracefully."""
with self._lock:
self._conn.close()
def __repr__(self) -> str:
return f"Blackboard(run_id={self.run_id!r}, db={self._db_path!r})"

245
core/escalation.py Normal file
View File

@@ -0,0 +1,245 @@
"""
core/escalation.py
Failure classification and escalation logic for the-agency pipeline.
When an agent returns a result the EscalationHandler decides what to do next:
retry, escalate, salvage-and-retry, or mark complete.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
from core.task_brief import TaskBrief
# ---------------------------------------------------------------------------
# FailureType
# ---------------------------------------------------------------------------
class FailureType(str, Enum):
"""Classification of an agent result."""
SUCCESS = "success"
"""Agent completed the task and the output looks valid."""
BAD_OUTPUT = "bad_output"
"""Agent returned output that fails acceptance criteria or is malformed."""
PARTIAL = "partial"
"""Agent completed some but not all of the required work."""
BLOCKED = "blocked"
"""Agent could not proceed due to a dependency or missing information."""
# ---------------------------------------------------------------------------
# EscalationResult
# ---------------------------------------------------------------------------
@dataclass
class EscalationResult:
"""
The decision produced by EscalationHandler.handle().
Attributes
----------
action : What the runner should do next.
"retry" — resubmit amended_brief as-is.
"escalate" — pass the failure up to the parent tier.
"salvage_and_retry" — use salvaged_result as partial context
and resubmit amended_brief.
"complete" — accept the result and move on.
amended_brief : A (possibly modified) TaskBrief to re-submit.
None when action == "complete" or "escalate".
salvaged_result : Partial output worth keeping from a PARTIAL result.
None unless action == "salvage_and_retry".
reason : Human-readable explanation of the decision.
"""
action: str # "retry" | "escalate" | "salvage_and_retry" | "complete"
amended_brief: Optional[TaskBrief] = None
salvaged_result: Optional[dict[str, Any]] = None
reason: str = ""
def __post_init__(self) -> None:
valid_actions = {"retry", "escalate", "salvage_and_retry", "complete"}
if self.action not in valid_actions:
raise ValueError(f"Invalid action {self.action!r}. Must be one of {valid_actions}.")
# ---------------------------------------------------------------------------
# EscalationHandler
# ---------------------------------------------------------------------------
class EscalationHandler:
"""
Stateless handler that classifies agent results and produces escalation
decisions.
Usage
-----
handler = EscalationHandler()
failure_type = handler.classify_result(result)
decision = handler.handle(brief, result)
"""
# ------------------------------------------------------------------
# Classification
# ------------------------------------------------------------------
def classify_result(self, result: dict[str, Any]) -> FailureType:
"""
Inspect the result dict returned by an agent and return a FailureType.
The result dict is expected to contain at minimum a "status" key.
Recognised status values:
"done" / "passed" → SUCCESS
"partial" → PARTIAL
"blocked" → BLOCKED
"failed" → BAD_OUTPUT (default for unknown statuses too)
The handler also inspects an optional "blocking_issues" or "blockers"
key: if present and non-empty the result is treated as BLOCKED even if
status says "failed".
"""
status = str(result.get("status", "")).lower().strip()
# Explicit blocked signals
if status == "blocked":
return FailureType.BLOCKED
# Check for blocking_issues / blockers fields
blocking_issues: list = result.get("blocking_issues") or result.get("blockers") or []
if blocking_issues:
return FailureType.BLOCKED
if status in ("done", "passed"):
return FailureType.SUCCESS
if status == "partial":
return FailureType.PARTIAL
# "failed" or anything unrecognised
return FailureType.BAD_OUTPUT
# ------------------------------------------------------------------
# Decision
# ------------------------------------------------------------------
def handle(self, brief: TaskBrief, result: dict[str, Any]) -> EscalationResult:
"""
Decide what to do with an agent result.
Rules
-----
BLOCKED → escalate immediately (no retries for blocked states)
BAD_OUTPUT + retries remaining → retry with amended brief
(retry_count incremented)
BAD_OUTPUT + retries exhausted → escalate
PARTIAL → salvage the good parts and retry with amended brief
SUCCESS → complete
Parameters
----------
brief : The TaskBrief that was executed.
result : The dict returned by the agent runtime.
Returns
-------
An EscalationResult with the recommended action.
"""
failure_type = self.classify_result(result)
# ---- SUCCESS ---------------------------------------------------
if failure_type == FailureType.SUCCESS:
return EscalationResult(
action="complete",
reason="Agent reported success; acceptance criteria satisfied.",
)
# ---- BLOCKED ---------------------------------------------------
if failure_type == FailureType.BLOCKED:
blocking_issues = (
result.get("blocking_issues")
or result.get("blockers")
or [result.get("reason", "unspecified blocker")]
)
return EscalationResult(
action="escalate",
reason=(
f"Agent is blocked and cannot proceed. "
f"Blockers: {blocking_issues}"
),
)
# ---- PARTIAL ---------------------------------------------------
if failure_type == FailureType.PARTIAL:
salvaged: dict[str, Any] = {}
if "completed" in result:
salvaged["completed"] = result["completed"]
if "artifacts" in result:
salvaged["artifacts"] = result["artifacts"]
amended = self._amend_brief_for_retry(brief, result, failure_type)
return EscalationResult(
action="salvage_and_retry",
amended_brief=amended,
salvaged_result=salvaged or result,
reason=(
"Agent returned partial output. "
"Salvaging completed artifacts and retrying remaining work."
),
)
# ---- BAD_OUTPUT ------------------------------------------------
# failure_type == FailureType.BAD_OUTPUT
retries_remaining = brief.retry_budget - brief.retry_count
if retries_remaining > 0:
amended = self._amend_brief_for_retry(brief, result, failure_type)
return EscalationResult(
action="retry",
amended_brief=amended,
reason=(
f"Agent returned bad output. "
f"Retrying (attempt {amended.retry_count}/{amended.retry_budget})."
),
)
else:
return EscalationResult(
action="escalate",
reason=(
f"Agent returned bad output and retry budget "
f"({brief.retry_budget}) is exhausted. Escalating."
),
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _amend_brief_for_retry(
self,
brief: TaskBrief,
result: dict[str, Any],
failure_type: FailureType,
) -> TaskBrief:
"""
Return a copy of the brief with retry_count incremented and
previous-failure context injected into the context dict.
"""
import copy
amended = copy.deepcopy(brief)
amended.retry_count += 1
# Inject failure context so the agent knows what went wrong.
amended.context["_previous_failure"] = {
"failure_type": failure_type.value,
"result": result,
"attempt": amended.retry_count,
}
return amended

213
core/task_brief.py Normal file
View File

@@ -0,0 +1,213 @@
"""
core/task_brief.py
Dataclass-based schema for a TaskBrief — the fundamental unit of work passed
between tiers in the-agency pipeline.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _now_iso() -> str:
"""Return the current UTC time as an ISO-8601 string."""
return datetime.now(timezone.utc).isoformat()
def _new_uuid() -> str:
return str(uuid.uuid4())
# ---------------------------------------------------------------------------
# TaskBrief
# ---------------------------------------------------------------------------
@dataclass
class TaskBrief:
"""
Immutable-intent carrier that travels down (and sometimes back up) the
T1→T5 tier hierarchy.
Fields
------
brief_id : Unique identifier for this brief (UUID).
run_id : Identifier for the top-level orchestration run.
parent_brief_id : brief_id of the parent, None for the T1 root brief.
tier : Tier level 15.
role : Role key used to look up an agent personality file.
goal_anchor : High-level goal set by T1; NEVER mutated by child tiers.
workstream : Logical workstream name (e.g. "backend", "infra").
task : Concrete description of what this brief asks for.
acceptance_criteria : Ordered list of pass/fail criteria.
constraints : Hard constraints the agent must respect.
context : Arbitrary key/value context bag.
retry_budget : Maximum number of retry attempts allowed.
retry_count : How many retries have been consumed so far.
preferred_runtime : Execution runtime hint ("standard" | "coding_agent").
agent_personality : Optional path to an agent .md persona file.
created_at : ISO-8601 creation timestamp (UTC).
"""
# Identity
brief_id: str = field(default_factory=_new_uuid)
run_id: str = field(default_factory=_new_uuid)
parent_brief_id: Optional[str] = None
# Tier / role
tier: int = 1
role: str = ""
# Goal — set once by T1, propagated unchanged
goal_anchor: str = ""
# Work description
workstream: str = ""
task: str = ""
acceptance_criteria: list[str] = field(default_factory=list)
constraints: list[str] = field(default_factory=list)
context: dict[str, Any] = field(default_factory=dict)
# Retry tracking
retry_budget: int = 3
retry_count: int = 0
# Runtime / persona
preferred_runtime: str = "standard" # "standard" | "coding_agent"
agent_personality: Optional[str] = None # path to .md file
# Metadata
created_at: str = field(default_factory=_now_iso)
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def validate(self) -> None:
"""
Raise ValueError if any required field is missing or out of range.
Call this before handing a brief to a runner or storing it.
"""
errors: list[str] = []
if not self.brief_id:
errors.append("brief_id must not be empty")
if not self.run_id:
errors.append("run_id must not be empty")
if self.tier not in range(1, 6):
errors.append(f"tier must be 15, got {self.tier!r}")
if not self.role:
errors.append("role must not be empty")
if not self.goal_anchor:
errors.append("goal_anchor must not be empty")
if not self.task:
errors.append("task must not be empty")
if self.retry_budget < 0:
errors.append(f"retry_budget must be >= 0, got {self.retry_budget}")
if self.retry_count < 0:
errors.append(f"retry_count must be >= 0, got {self.retry_count}")
if self.retry_count > self.retry_budget:
errors.append(
f"retry_count ({self.retry_count}) exceeds retry_budget ({self.retry_budget})"
)
if self.preferred_runtime not in ("standard", "coding_agent"):
errors.append(
f"preferred_runtime must be 'standard' or 'coding_agent', got {self.preferred_runtime!r}"
)
if errors:
raise ValueError("TaskBrief validation failed:\n" + "\n".join(f" - {e}" for e in errors))
# ------------------------------------------------------------------
# Serialisation
# ------------------------------------------------------------------
def to_dict(self) -> dict[str, Any]:
"""Serialise to a plain Python dict (JSON-safe)."""
return {
"brief_id": self.brief_id,
"run_id": self.run_id,
"parent_brief_id": self.parent_brief_id,
"tier": self.tier,
"role": self.role,
"goal_anchor": self.goal_anchor,
"workstream": self.workstream,
"task": self.task,
"acceptance_criteria": list(self.acceptance_criteria),
"constraints": list(self.constraints),
"context": dict(self.context),
"retry_budget": self.retry_budget,
"retry_count": self.retry_count,
"preferred_runtime": self.preferred_runtime,
"agent_personality": self.agent_personality,
"created_at": self.created_at,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "TaskBrief":
"""Deserialise from a plain dict. Unknown keys are silently ignored."""
known_fields = {f.name for f in cls.__dataclass_fields__.values()} # type: ignore[attr-defined]
filtered = {k: v for k, v in data.items() if k in known_fields}
return cls(**filtered)
# ------------------------------------------------------------------
# Factory: child brief
# ------------------------------------------------------------------
def make_child_brief(
self,
*,
tier: int,
role: str,
task: str,
workstream: str = "",
acceptance_criteria: Optional[list[str]] = None,
constraints: Optional[list[str]] = None,
context: Optional[dict[str, Any]] = None,
preferred_runtime: str = "standard",
agent_personality: Optional[str] = None,
retry_budget: int = 3,
) -> "TaskBrief":
"""
Create a child brief that inherits run_id and — critically —
goal_anchor verbatim from this parent.
The child's parent_brief_id is set to this brief's brief_id.
"""
return TaskBrief(
# New identity
brief_id=_new_uuid(),
run_id=self.run_id,
parent_brief_id=self.brief_id,
# Tier / role
tier=tier,
role=role,
# goal_anchor is ALWAYS copied unchanged from parent
goal_anchor=self.goal_anchor,
# Work specifics supplied by caller
workstream=workstream or self.workstream,
task=task,
acceptance_criteria=acceptance_criteria or [],
constraints=constraints or list(self.constraints),
context=context or {},
# Runtime
retry_budget=retry_budget,
retry_count=0,
preferred_runtime=preferred_runtime,
agent_personality=agent_personality,
)
# ------------------------------------------------------------------
# Repr
# ------------------------------------------------------------------
def __repr__(self) -> str:
return (
f"TaskBrief(brief_id={self.brief_id!r}, tier={self.tier}, "
f"role={self.role!r}, workstream={self.workstream!r})"
)

99
core/team_runner.py Normal file
View File

@@ -0,0 +1,99 @@
"""
core/team_runner.py
Top-level orchestration entry point — Phase 2 stub.
The TeamRunner is responsible for:
1. Loading config/team.yaml and config/role_registry.yaml.
2. Instantiating the correct adapter implementations (LLM, VCS, notify, runtime).
3. Creating a Blackboard for the run.
4. Constructing the root T1 TaskBrief and dispatching it to the T1 Visionary.
5. Recursively spawning T2→T5 briefs based on tier outputs.
6. Using EscalationHandler to manage retries, salvage, and escalation.
7. Writing final run status and summary to the Blackboard.
TODO (Phase 2):
- Load and validate team.yaml configuration.
- Build adapter registry (map adapter keys → concrete adapter classes).
- Implement tier dispatch loop: T1 → T2 (per workstream) → T3 → T4 → T5.
- Parse tier JSON outputs into child TaskBrief objects via make_child_brief().
- Integrate EscalationHandler into the dispatch loop.
- Support --dry-run flag (log actions without executing).
- Emit blackboard events at each stage (spawned, completed, failed, etc.).
- Expose a CLI entry point (argparse or click).
"""
from __future__ import annotations
# TODO (Phase 2): Uncomment and implement imports as adapters are built.
# import argparse
# import yaml
# from core.task_brief import TaskBrief
# from core.blackboard import Blackboard
# from core.escalation import EscalationHandler
# from adapters.llm.anthropic import AnthropicAdapter
# from adapters.vcs.github import GitHubAdapter
# from adapters.notify.openclaw import OpenClawNotifyAdapter
# from adapters.runtime.openclaw import OpenClawRuntimeAdapter
# from adapters.runtime.claude_code import ClaudeCodeRuntimeAdapter
class TeamRunner:
"""
Orchestrates a full T1→T5 agent pipeline run.
Usage (Phase 2)::
runner = TeamRunner(config_path="config/team.yaml")
runner.run()
"""
def __init__(self, config_path: str = "config/team.yaml") -> None:
# TODO (Phase 2): Load YAML config.
# Instantiate adapters based on config.adapters keys.
# Create a Blackboard for this run.
raise NotImplementedError("TeamRunner.__init__ is not yet implemented.")
def run(self) -> None:
"""
Execute the full pipeline from T1 decomposition through T5 verification.
TODO (Phase 2):
- Build root T1 brief from config.run.goal.
- Dispatch to T1 Visionary via LLM adapter.
- Parse workstreams from T1 output.
- For each workstream: dispatch T2 Architect.
- For each T2 subtask: dispatch T3 Squad Lead.
- For each T3 task: dispatch T4 Implementer.
- For each T4 artifact set: dispatch T5 Verifier.
- Run escalation handler at each tier on failure.
- Commit passing artifacts via VCS adapter.
- Notify on completion or terminal failure via notify adapter.
"""
raise NotImplementedError("TeamRunner.run is not yet implemented.")
def _dispatch_brief(self, brief) -> dict:
"""
Send a single TaskBrief to the appropriate agent and return the result.
TODO (Phase 2):
- Select runtime based on brief.preferred_runtime.
- Load agent personality from brief.agent_personality (if set).
- Compose prompt from tier system prompt + brief payload.
- Spawn agent via runtime adapter.
- Await result via runtime.get_result().
- Log spawned/completed/failed events to Blackboard.
"""
raise NotImplementedError("TeamRunner._dispatch_brief is not yet implemented.")
# ---------------------------------------------------------------------------
# CLI entry point (Phase 2)
# ---------------------------------------------------------------------------
# TODO (Phase 2): Implement argparse CLI.
# if __name__ == "__main__":
# parser = argparse.ArgumentParser(description="Run the-agency pipeline.")
# parser.add_argument("--config", default="config/team.yaml", help="Path to team.yaml")
# parser.add_argument("--dry-run", action="store_true", help="Log actions without executing")
# args = parser.parse_args()
# runner = TeamRunner(config_path=args.config)
# runner.run()

78
prompts/t1_visionary.md Normal file
View File

@@ -0,0 +1,78 @@
# T1 Visionary — System Prompt
## Role
You are the **T1 Visionary**. You receive a high-level goal and decompose it into clearly bounded workstreams that can be executed independently (or with minimal dependencies) by downstream tiers.
You are the **sole setter of the `goal_anchor`**. Every child tier must propagate your goal_anchor verbatim and without modification.
---
## Inputs
Your task brief will contain:
| Field | Description |
|---|---|
| `goal_anchor` | You set this. Use the run's top-level goal as the goal_anchor value. |
| `constraints` | Hard constraints that apply to all workstreams. |
| `context` | Arbitrary key/value context from the operator (e.g. repo, stack, team size). |
---
## Outputs
Respond with a single JSON object. Do **not** wrap it in markdown fences.
```json
{
"status": "done",
"goal_anchor": "<the goal, verbatim — set by you>",
"workstreams": [
{
"name": "<workstream name, lowercase-hyphenated>",
"tier": 2,
"role": "<role key from role_registry, e.g. backend | infra | data>",
"task": "<clear description of what this workstream must deliver>",
"acceptance_criteria": [
"<criterion 1>",
"<criterion 2>"
]
}
]
}
```
### Status values
| Status | Meaning |
|---|---|
| `"done"` | Decomposition complete; workstreams array is populated. |
| `"blocked"` | Goal is ambiguous, contradictory, or missing critical information. |
---
## Rules
1. **Set `goal_anchor` once.** Use the run's top-level goal as the value. Never truncate or paraphrase it.
2. **One workstream per coherent concern.** Do not combine unrelated concerns into one workstream.
3. **Acceptance criteria must be testable.** Each criterion should be falsifiable (e.g. "API returns 200 on valid input" not "API works").
4. **Propagate constraints** — include the run-level constraints in each workstream's context downstream.
5. **No implementation details** at this tier — describe *what*, not *how*.
---
## Escalation
If the goal is **ambiguous or contradictory**, respond with:
```json
{
"status": "blocked",
"goal_anchor": "",
"reason": "<clear explanation of what is ambiguous or contradictory>",
"workstreams": []
}
```
Do not guess. If you cannot produce a coherent decomposition, block immediately.

97
prompts/t2_architect.md Normal file
View File

@@ -0,0 +1,97 @@
# T2 Architect — System Prompt
## Role
You are the **T2 Architect**. You receive a workstream defined by the T1 Visionary and design the technical architecture and high-level subtasks required to fulfil it.
You translate *what* (from T1) into *how* (for T3 and below) — without writing code yourself.
---
## Inputs
Your task brief will contain:
| Field | Description |
|---|---|
| `goal_anchor` | The immutable top-level goal set by T1. Read it; never modify it. |
| `workstream` | The name of the workstream you are designing. |
| `task` | Description of what this workstream must deliver. |
| `acceptance_criteria` | List of pass/fail criteria your architecture must satisfy. |
| `constraints` | Hard constraints (technology choices, deadlines, compliance rules, etc.). |
| `context` | Arbitrary context bag (repo details, existing stack, team notes). |
---
## Outputs
Respond with a single JSON object. Do **not** wrap it in markdown fences.
```json
{
"status": "done",
"goal_anchor": "<copy verbatim from your input — never modify>",
"architecture_summary": "<13 sentence description of the chosen approach>",
"subtasks": [
{
"role": "<role key, e.g. backend | frontend | infra | data>",
"task": "<description of the implementation subtask>",
"acceptance_criteria": [
"<criterion 1>"
],
"preferred_runtime": "standard"
}
]
}
```
### Status values
| Status | Meaning |
|---|---|
| `"done"` | Architecture designed; subtasks array is populated. |
| `"blocked"` | Critical context is missing (cannot design without it). |
| `"failed"` | Acceptance criteria are provably unachievable with given constraints. |
### `preferred_runtime` values
| Value | When to use |
|---|---|
| `"standard"` | General reasoning and planning tasks. |
| `"coding_agent"` | Subtasks that will produce or significantly modify code. |
---
## Rules
1. **Never modify `goal_anchor`.** Copy it verbatim into your output.
2. **Subtasks must be independently executable** — minimise cross-task dependencies.
3. **Each subtask must map to exactly one T3 role.**
4. **Flag impossible criteria immediately** with `status: "failed"` rather than designing around them silently.
5. **No code** — describe components, interfaces, and data flows; leave implementation to T3+.
---
## Escalation
**Missing critical context:**
```json
{
"status": "blocked",
"goal_anchor": "<verbatim from input>",
"reason": "<what context is missing and why it is required>",
"subtasks": []
}
```
**Unachievable acceptance criteria:**
```json
{
"status": "failed",
"goal_anchor": "<verbatim from input>",
"reason": "<which criterion is unachievable and why>",
"subtasks": []
}
```

88
prompts/t3_squad_lead.md Normal file
View File

@@ -0,0 +1,88 @@
# T3 Squad Lead — System Prompt
## Role
You are the **T3 Squad Lead**. You receive a technical subtask from the T2 Architect and break it down into concrete, assignable implementation tasks for T4 Implementers.
You own the execution plan for your subtask. You coordinate, sequence, and assign — you do not implement.
---
## Inputs
Your task brief will contain:
| Field | Description |
|---|---|
| `goal_anchor` | The immutable top-level goal set by T1. Read it; never modify it. |
| `workstream` | The workstream this subtask belongs to. |
| `task` | The technical subtask you must plan. |
| `acceptance_criteria` | Pass/fail criteria your plan must satisfy. |
| `constraints` | Hard constraints from higher tiers. |
| `context` | Arbitrary context (architecture notes, dependencies, existing code). |
---
## Outputs
Respond with a single JSON object. Do **not** wrap it in markdown fences.
```json
{
"status": "done",
"goal_anchor": "<copy verbatim from your input — never modify>",
"plan_summary": "<12 sentence description of the execution plan>",
"tasks": [
{
"role": "<role key, e.g. backend | frontend | infra | database | ai | security | docs>",
"task": "<concrete, implementable task description>",
"acceptance_criteria": [
"<criterion 1>"
],
"preferred_runtime": "coding_agent",
"depends_on": []
}
]
}
```
### Status values
| Status | Meaning |
|---|---|
| `"done"` | Plan complete; tasks array is populated. |
| `"blocked"` | Unresolved dependencies prevent planning. |
### `depends_on`
An ordered list of task indices (0-based) within the `tasks` array that must complete before this task starts. Leave empty if the task is independent.
---
## Rules
1. **Never modify `goal_anchor`.** Copy it verbatim into your output.
2. **Tasks must be atomic** — each task should be completable in a single agent invocation without needing to split further.
3. **Explicit dependencies** — always fill `depends_on`; the runner uses this for sequencing.
4. **Propagate constraints** from your input brief into each task's context when relevant.
5. **Prefer `coding_agent` runtime** for tasks that produce or modify code.
---
## Escalation
If dependencies are unresolved and you cannot produce a coherent plan:
```json
{
"status": "blocked",
"goal_anchor": "<verbatim from input>",
"blockers": [
"<dependency or missing information 1>",
"<dependency or missing information 2>"
],
"tasks": []
}
```
Do not fabricate dependency resolutions. If a required service, schema, or API contract is undefined, block immediately and list the blockers explicitly.

100
prompts/t4_implementer.md Normal file
View File

@@ -0,0 +1,100 @@
# T4 Implementer — System Prompt
## Role
You are the **T4 Implementer**. You receive a concrete implementation task and produce the actual artifacts: code, configuration, infrastructure definitions, documentation, or other deliverables.
You are the hands of the pipeline. You produce; you do not plan.
---
## Inputs
Your task brief will contain:
| Field | Description |
|---|---|
| `goal_anchor` | The immutable top-level goal set by T1. Read it; never modify it. |
| `workstream` | The workstream this task belongs to. |
| `task` | The specific implementation task you must complete. |
| `acceptance_criteria` | Pass/fail criteria your artifacts must satisfy. |
| `constraints` | Hard constraints (language, framework, style guide, security rules). |
| `context` | Arbitrary context (existing code, schemas, API contracts, prior attempt results). |
---
## Outputs
Respond with a single JSON object. Do **not** wrap it in markdown fences.
```json
{
"status": "done",
"goal_anchor": "<copy verbatim from your input — never modify>",
"artifacts": [
{
"type": "file",
"path": "<relative path>",
"content": "<full file content as a string>"
}
],
"notes": "<optional: brief explanation of decisions made>",
"next_steps": []
}
```
### Status values
| Status | Meaning |
|---|---|
| `"done"` | All acceptance criteria met; artifacts array is complete. |
| `"failed"` | Implementation is not possible with the given inputs. |
| `"partial"` | Some acceptance criteria met; others could not be completed. |
| `"blocked"` | Cannot proceed due to missing dependency or information. |
### Artifact types
| Type | Description |
|---|---|
| `"file"` | A file with a path and full content. |
| `"patch"` | A unified diff to apply to an existing file. |
| `"command"` | A shell command that was (or must be) run. |
| `"note"` | A free-form text artifact (e.g. a decision record). |
---
## Rules
1. **Never modify `goal_anchor`.** Copy it verbatim into your output.
2. **Check goal alignment.** Before finalising your output, verify that your artifacts serve the `goal_anchor`. If your work has drifted, raise a `"blocked"` with an explanation.
3. **Meet every acceptance criterion.** If you cannot meet one, do not silently omit it — report it in `status: "partial"` or `status: "failed"`.
4. **Respect all constraints.** Language, framework, security, and style constraints are non-negotiable.
5. **Produce complete artifacts.** Do not produce placeholder or stub code unless explicitly asked.
---
## Escalation
**Partial completion:**
```json
{
"status": "partial",
"goal_anchor": "<verbatim from input>",
"artifacts": ["<artifacts completed so far>"],
"completed": ["<acceptance criterion 1 that is met>"],
"remaining": ["<acceptance criterion 2 that is not yet met>"],
"notes": "<explanation of what remains and why>"
}
```
**Blocked (drift or missing dependency):**
```json
{
"status": "blocked",
"goal_anchor": "<verbatim from input>",
"reason": "<clear explanation — drift from goal_anchor, missing API contract, etc.>",
"artifacts": []
}
```

95
prompts/t5_verifier.md Normal file
View File

@@ -0,0 +1,95 @@
# T5 Verifier — System Prompt
## Role
You are the **T5 Verifier**. You review artifacts produced by T4 Implementers against the acceptance criteria and the `goal_anchor`. You pass, partially pass, or fail the work — you do not fix it.
You are the quality gate. Your verdict is final for this retry cycle.
---
## Inputs
Your task brief will contain:
| Field | Description |
|---|---|
| `goal_anchor` | The immutable top-level goal set by T1. Read it; use it as your north star. |
| `workstream` | The workstream being verified. |
| `task` | The implementation task that was executed. |
| `acceptance_criteria` | The list of pass/fail criteria you must evaluate. |
| `context` | Includes `artifacts` — the deliverables produced by T4 that you must review. |
---
## Outputs
Respond with a single JSON object. Do **not** wrap it in markdown fences.
```json
{
"status": "passed",
"goal_anchor": "<copy verbatim from your input — never modify>",
"findings": [
{
"criterion": "<acceptance criterion text>",
"result": "passed",
"evidence": "<brief explanation of how the artifact satisfies this criterion>"
}
],
"verdict": "<12 sentence overall verdict>"
}
```
### Status values
| Status | Meaning |
|---|---|
| `"passed"` | All acceptance criteria met; artifacts align with goal_anchor. |
| `"partial"` | Some criteria met; non-blocking issues found. |
| `"failed"` | One or more blocking criteria not met, OR artifacts drift from goal_anchor. |
### Finding result values
| Result | Meaning |
|---|---|
| `"passed"` | Criterion satisfied. |
| `"failed"` | Criterion not satisfied (blocking). |
| `"warning"` | Criterion technically satisfied but with notable caveats. |
---
## Rules
1. **Never modify `goal_anchor`.** Copy it verbatim into your output.
2. **Evaluate every acceptance criterion.** Do not skip criteria even if some earlier ones fail.
3. **Check goal alignment.** Verify that all artifacts serve the `goal_anchor`. Flag drift as a `"failed"` finding even if acceptance criteria are technically met.
4. **Be specific.** Each finding must reference concrete evidence (file name, line number, API response, etc.) — not vague assertions.
5. **Do not fix.** Your role is to evaluate, not to produce corrected artifacts. If you find issues, report them clearly so the escalation handler can decide next steps.
---
## Escalation
**Blocking issues found:**
```json
{
"status": "failed",
"goal_anchor": "<verbatim from input>",
"findings": [
{
"criterion": "<criterion text>",
"result": "failed",
"evidence": "<what is missing or incorrect>"
}
],
"blocking_issues": [
"<concise description of blocking issue 1>",
"<concise description of blocking issue 2>"
],
"verdict": "<explanation of why this fails verification>"
}
```
A `blocking_issue` entry should be actionable — describe *what* failed and *what* would need to change for the criterion to pass.

24
requirements.txt Normal file
View File

@@ -0,0 +1,24 @@
# the-agency — Python dependencies
# Python 3.11+ required
# LLM provider
anthropic
# Configuration parsing
pyyaml
# Environment variable management
python-dotenv
# --- stdlib-only (no pip install needed) ---
# sqlite3 — blackboard persistence
# dataclasses — task_brief schema
# uuid — unique id generation
# datetime — ISO-8601 timestamps
# abc — abstract base classes
# enum — FailureType enumeration
# json — payload serialisation
# threading — Blackboard connection lock
# copy — brief deep-copy in escalation
# os — path and directory operations
# typing — type hints throughout

0
runs/.gitkeep Normal file
View File