- core/blackboard.py: add t3_task_lists table, extend event kinds to
include full visibility vocabulary (gate_pending, gate_approved,
gate_rejected, gate_paused, gate_resumed, path_amendment, log), and
add query methods (get_events, get_latest_gate_event, get_t3_task_lists,
all_t3_committed, get_briefs, get_workstreams, etc.)
- core/team_runner.py: full run lifecycle orchestrator
- Loads team.yaml + role_registry.yaml, instantiates all four adapter types
- T1 Plan (two-phase: plan + accept), T2 Lead/Specialist/Synthesis,
T3 Squad Lead with mesh draft/commit cycle and mesh-timeout escalation,
T4 swarm+pipeline with dep-ordering, T5 fan-out + joint verdict
- Async (asyncio.gather) for parallel workstream and T4/T5 fan-out
- Gate logic: gate_pending → notify → poll blackboard → gate_approved/rejected
- Path amendment monitor (background task)
- EscalationHandler integrated into _dispatch_with_retry
- T3 mesh timeout → T2 re-scope escalation
- Terminal failure → notify + run status=failed
- VCS branch creation + PR at T1 Accept phase
- Runtime selection: coding_agent runtime for preferred_runtime="coding_agent",
tier_runtime_map overrides, fallback to default runtime
- cli/agency.py: agency CLI with subcommands
- run: start pipeline, prints run_id + watch/inspect hints
- watch: tails blackboard events live with ANSI-coloured output
- inspect: run tree, --tier filter, --brief detail view
- approve: write gate_approved directly to blackboard (universal gate path)
- reject: write gate_rejected with --reason
- pause: write gate_paused signal
- resume: write gate_resumed signal
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
577 lines
20 KiB
Python
577 lines
20 KiB
Python
"""
|
|
core/blackboard.py
|
|
SQLite-backed shared state store for a single orchestration run.
|
|
|
|
One database is created at: runs/<run_id>/blackboard.db
|
|
|
|
All methods are synchronous and thread-safe at the connection level
|
|
(SQLite WAL mode + check_same_thread=False with an explicit lock).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import threading
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import TaskBrief only for type hints to avoid circular imports at runtime.
|
|
# ---------------------------------------------------------------------------
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from core.task_brief import TaskBrief
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _new_uuid() -> str:
|
|
return str(uuid.uuid4())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL schema
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SCHEMA = """
|
|
PRAGMA journal_mode=WAL;
|
|
|
|
CREATE TABLE IF NOT EXISTS runs (
|
|
run_id TEXT PRIMARY KEY,
|
|
goal TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|review|done|failed
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS workstreams (
|
|
workstream_id TEXT PRIMARY KEY,
|
|
run_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
tier INTEGER NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|blocked|done|failed
|
|
owner_agent_id TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY (run_id) REFERENCES runs(run_id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS briefs (
|
|
brief_id TEXT PRIMARY KEY,
|
|
run_id TEXT NOT NULL,
|
|
parent_brief_id TEXT,
|
|
workstream_id TEXT,
|
|
tier INTEGER NOT NULL,
|
|
role TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending', -- pending|active|done|failed
|
|
payload TEXT, -- JSON-serialised TaskBrief.to_dict()
|
|
result TEXT, -- JSON result from the agent
|
|
retry_count INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY (run_id) REFERENCES runs(run_id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS events (
|
|
event_id TEXT PRIMARY KEY,
|
|
run_id TEXT NOT NULL,
|
|
brief_id TEXT, -- NULL for run-level events
|
|
kind TEXT NOT NULL, -- see _EVENT_KINDS
|
|
detail TEXT, -- JSON
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY (run_id) REFERENCES runs(run_id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS t3_task_lists (
|
|
entry_id TEXT PRIMARY KEY,
|
|
run_id TEXT NOT NULL,
|
|
workstream_id TEXT NOT NULL,
|
|
t3_agent_id TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'draft', -- draft|committed
|
|
tasks TEXT NOT NULL DEFAULT '[]', -- JSON array of T4 task descriptors
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY (run_id) REFERENCES runs(run_id)
|
|
);
|
|
"""
|
|
|
|
# Valid status values per table — used for input validation.
|
|
_RUN_STATUSES = {"pending", "active", "review", "done", "failed"}
|
|
_WS_STATUSES = {"pending", "active", "blocked", "done", "failed"}
|
|
_BRIEF_STATUSES = {"pending", "active", "done", "failed"}
|
|
_EVENT_KINDS = {
|
|
# Lifecycle
|
|
"spawned", "completed", "failed", "escalated", "retried",
|
|
# Visibility / gates
|
|
"gate_pending", "gate_approved", "gate_rejected", "gate_paused", "gate_resumed",
|
|
# Amendments / informational
|
|
"path_amendment", "log",
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Blackboard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Blackboard:
    """
    Shared, persistent state store for one orchestration run.

    State lives in a SQLite database at ``runs/<run_id>/blackboard.db``.
    One connection is shared by all callers; every statement is executed
    while holding ``self._lock``.

    Usage
    -----
        bb = Blackboard(run_id="abc123")
        bb.create_run(goal="Build webhook ingestion system")
        ...
        summary = bb.get_run_summary()
        bb.close()

    The instance can also be used as a context manager, which closes the
    connection on exit::

        with Blackboard(run_id="abc123") as bb:
            bb.create_run(goal="...")
    """

    def __init__(self, run_id: str) -> None:
        """Open (creating if necessary) the blackboard database for *run_id*."""
        self.run_id = run_id
        self._run_dir = os.path.join("runs", run_id)
        self._db_path = os.path.join(self._run_dir, "blackboard.db")
        self._lock = threading.Lock()

        # Ensure the run directory exists.
        os.makedirs(self._run_dir, exist_ok=True)

        # Open a persistent connection (thread-safe via explicit lock).
        self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
        self._conn.row_factory = sqlite3.Row

        # Initialise schema. If that fails, do not leave the connection open.
        try:
            with self._lock:
                self._conn.executescript(_SCHEMA)
                self._conn.commit()
        except sqlite3.Error:
            self._conn.close()
            raise

    def __enter__(self) -> "Blackboard":
        """Return self so the blackboard can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _execute(
        self,
        sql: str,
        params: tuple[Any, ...] = (),
        *,
        commit: bool = False,
    ) -> sqlite3.Cursor:
        """
        Execute one statement under the lock and return its cursor.

        Note that the lock is released before the caller fetches rows
        from the returned cursor.
        """
        with self._lock:
            cur = self._conn.execute(sql, params)
            if commit:
                self._conn.commit()
            return cur

    def _executemany(
        self,
        sql: str,
        params_seq: list[tuple[Any, ...]],
        *,
        commit: bool = False,
    ) -> None:
        """Execute *sql* once per parameter tuple under the lock."""
        with self._lock:
            self._conn.executemany(sql, params_seq)
            if commit:
                self._conn.commit()

    # ------------------------------------------------------------------
    # Run
    # ------------------------------------------------------------------

    def create_run(self, goal: str) -> None:
        """Insert a new run row. Status defaults to 'pending'.

        Uses INSERT OR IGNORE, so calling this for an existing run_id
        leaves the existing row untouched.
        """
        now = _now_iso()
        self._execute(
            "INSERT OR IGNORE INTO runs (run_id, goal, status, created_at, updated_at) "
            "VALUES (?, ?, 'pending', ?, ?)",
            (self.run_id, goal, now, now),
            commit=True,
        )

    def update_run_status(self, status: str) -> None:
        """Update run status. Must be one of: pending|active|review|done|failed.

        Raises ValueError for any other value.
        """
        if status not in _RUN_STATUSES:
            raise ValueError(f"Invalid run status {status!r}. Must be one of {_RUN_STATUSES}.")
        now = _now_iso()
        self._execute(
            "UPDATE runs SET status=?, updated_at=? WHERE run_id=?",
            (status, now, self.run_id),
            commit=True,
        )

    # ------------------------------------------------------------------
    # Workstreams
    # ------------------------------------------------------------------

    def create_workstream(
        self,
        *,
        workstream_id: Optional[str] = None,
        name: str,
        tier: int,
        owner_agent_id: Optional[str] = None,
    ) -> str:
        """Create a workstream row and return its workstream_id.

        A new UUID is generated when *workstream_id* is not supplied. An
        existing row with the same id is left untouched (INSERT OR IGNORE).
        """
        ws_id = workstream_id or _new_uuid()
        now = _now_iso()
        self._execute(
            "INSERT OR IGNORE INTO workstreams "
            "(workstream_id, run_id, name, tier, status, owner_agent_id, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, 'pending', ?, ?, ?)",
            (ws_id, self.run_id, name, tier, owner_agent_id, now, now),
            commit=True,
        )
        return ws_id

    def update_workstream_status(self, workstream_id: str, status: str) -> None:
        """Update workstream status. Must be one of: pending|active|blocked|done|failed.

        Raises ValueError for any other value.
        """
        if status not in _WS_STATUSES:
            raise ValueError(f"Invalid workstream status {status!r}. Must be one of {_WS_STATUSES}.")
        now = _now_iso()
        self._execute(
            "UPDATE workstreams SET status=?, updated_at=? WHERE workstream_id=?",
            (status, now, workstream_id),
            commit=True,
        )

    # ------------------------------------------------------------------
    # Briefs
    # ------------------------------------------------------------------

    def create_brief(self, brief: "TaskBrief", workstream_id: Optional[str] = None) -> None:
        """Persist a TaskBrief. The full brief is stored as JSON in `payload`.

        The row starts in status 'pending' with a NULL result. An existing
        row with the same brief_id is left untouched (INSERT OR IGNORE).
        """
        now = _now_iso()
        payload_json = json.dumps(brief.to_dict())
        self._execute(
            "INSERT OR IGNORE INTO briefs "
            "(brief_id, run_id, parent_brief_id, workstream_id, tier, role, "
            " status, payload, result, retry_count, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, NULL, ?, ?, ?)",
            (
                brief.brief_id,
                self.run_id,
                brief.parent_brief_id,
                workstream_id,
                brief.tier,
                brief.role,
                payload_json,
                brief.retry_count,
                now,
                now,
            ),
            commit=True,
        )

    def update_brief_status(self, brief_id: str, status: str) -> None:
        """Update brief status. Must be one of: pending|active|done|failed.

        Raises ValueError for any other value.
        """
        if status not in _BRIEF_STATUSES:
            raise ValueError(f"Invalid brief status {status!r}. Must be one of {_BRIEF_STATUSES}.")
        now = _now_iso()
        self._execute(
            "UPDATE briefs SET status=?, updated_at=? WHERE brief_id=?",
            (status, now, brief_id),
            commit=True,
        )

    def update_brief_result(self, brief_id: str, result: dict[str, Any]) -> None:
        """Store the agent result JSON for a brief and mark it done."""
        now = _now_iso()
        result_json = json.dumps(result)
        self._execute(
            "UPDATE briefs SET result=?, status='done', updated_at=? WHERE brief_id=?",
            (result_json, now, brief_id),
            commit=True,
        )

    def increment_brief_retry(self, brief_id: str) -> None:
        """Bump the retry_count column for a brief."""
        now = _now_iso()
        self._execute(
            "UPDATE briefs SET retry_count = retry_count + 1, updated_at=? WHERE brief_id=?",
            (now, brief_id),
            commit=True,
        )

    # ------------------------------------------------------------------
    # Events
    # ------------------------------------------------------------------

    def log_event(
        self,
        kind: str,
        brief_id: Optional[str] = None,
        detail: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Append an event to the events table.

        Parameters
        ----------
        kind     : Any member of _EVENT_KINDS: a lifecycle kind
                   (spawned|completed|failed|escalated|retried), a gate kind
                   (gate_pending|gate_approved|gate_rejected|gate_paused|
                   gate_resumed), or path_amendment|log.
        brief_id : Associated brief, or None for run-level events.
        detail   : Arbitrary JSON-serialisable dict; stored as ``{}`` when
                   omitted or empty.

        Returns the new event_id. Raises ValueError for an unknown kind.
        """
        if kind not in _EVENT_KINDS:
            raise ValueError(f"Invalid event kind {kind!r}. Must be one of {_EVENT_KINDS}.")
        event_id = _new_uuid()
        now = _now_iso()
        detail_json = json.dumps(detail or {})
        self._execute(
            "INSERT INTO events (event_id, run_id, brief_id, kind, detail, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (event_id, self.run_id, brief_id, kind, detail_json, now),
            commit=True,
        )
        return event_id

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------

    def get_run_summary(self) -> dict[str, Any]:
        """
        Return a snapshot of the run state including workstream and brief
        counts broken down by status, plus event counts by kind.

        Returns ``{"error": ...}`` when no run row exists for this run_id.
        """
        # Run row
        run_row = self._execute(
            "SELECT * FROM runs WHERE run_id=?", (self.run_id,)
        ).fetchone()

        if run_row is None:
            return {"error": f"No run found for run_id={self.run_id!r}"}

        run_data: dict[str, Any] = dict(run_row)

        # Workstream status counts
        ws_rows = self._execute(
            "SELECT status, COUNT(*) AS cnt FROM workstreams WHERE run_id=? GROUP BY status",
            (self.run_id,),
        ).fetchall()
        run_data["workstreams"] = {r["status"]: r["cnt"] for r in ws_rows}

        # Brief status counts
        brief_rows = self._execute(
            "SELECT status, COUNT(*) AS cnt FROM briefs WHERE run_id=? GROUP BY status",
            (self.run_id,),
        ).fetchall()
        run_data["briefs"] = {r["status"]: r["cnt"] for r in brief_rows}

        # Event kind counts
        event_rows = self._execute(
            "SELECT kind, COUNT(*) AS cnt FROM events WHERE run_id=? GROUP BY kind",
            (self.run_id,),
        ).fetchall()
        run_data["events"] = {r["kind"]: r["cnt"] for r in event_rows}

        return run_data

    # ------------------------------------------------------------------
    # Event queries
    # ------------------------------------------------------------------

    def get_events(
        self,
        kinds: Optional[list[str]] = None,
        after_iso: Optional[str] = None,
        brief_id: Optional[str] = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """
        Query events for this run.

        Parameters
        ----------
        kinds     : Filter by event kinds (OR). None = all kinds.
        after_iso : Only return events created strictly after this ISO-8601
                    timestamp (compared as text against ``created_at``).
        brief_id  : Filter by brief_id. None = all briefs.
        limit     : Maximum rows to return (most recent first).

        Events with identical timestamps are ordered by insertion, newest
        first, so the result order is deterministic.
        """
        conditions = ["run_id = ?"]
        params: list[Any] = [self.run_id]

        if kinds:
            placeholders = ",".join("?" * len(kinds))
            conditions.append(f"kind IN ({placeholders})")
            params.extend(kinds)

        if after_iso:
            conditions.append("created_at > ?")
            params.append(after_iso)

        if brief_id:
            conditions.append("brief_id = ?")
            params.append(brief_id)

        where = " AND ".join(conditions)
        # rowid breaks ties between events that share a created_at value.
        rows = self._execute(
            f"SELECT * FROM events WHERE {where} "
            "ORDER BY created_at DESC, rowid DESC LIMIT ?",
            (*params, limit),
        ).fetchall()
        return [dict(r) for r in rows]

    def get_latest_gate_event(
        self, gate_name: str, after_iso: Optional[str] = None
    ) -> Optional[dict[str, Any]]:
        """
        Return the most recent gate_approved or gate_rejected event for
        *gate_name* written after *after_iso*.

        The event detail JSON is expected to contain a ``"gate"`` field
        matching *gate_name*; the newest such event wins. If no event
        matches, falls back to the newest event that carries no gate field
        at all (for CLI-written events that omit it). Detail that is not
        valid JSON, or is not a JSON object, is treated as carrying no gate
        field. Events tagged for a different gate are never returned.

        Only the 20 most recent resolution events are inspected. Returns
        None when nothing qualifies.
        """
        events = self.get_events(
            kinds=["gate_approved", "gate_rejected"],
            after_iso=after_iso,
            limit=20,
        )
        fallback: Optional[dict[str, Any]] = None
        for ev in events:  # newest first
            try:
                detail = json.loads(ev.get("detail") or "{}")
            except (json.JSONDecodeError, TypeError):
                detail = None
            # Valid JSON is not necessarily an object (could be a list, etc.).
            gate = detail.get("gate") if isinstance(detail, dict) else None
            if gate == gate_name:
                return ev
            if not gate and fallback is None:
                fallback = ev
        return fallback

    def get_all_events(self, limit: int = 500) -> list[dict[str, Any]]:
        """Return up to *limit* events for this run, oldest first."""
        rows = self._execute(
            "SELECT * FROM events WHERE run_id=? "
            "ORDER BY created_at ASC, rowid ASC LIMIT ?",
            (self.run_id, limit),
        ).fetchall()
        return [dict(r) for r in rows]

    # ------------------------------------------------------------------
    # T3 task lists
    # ------------------------------------------------------------------

    def create_t3_draft(
        self,
        *,
        workstream_id: str,
        t3_agent_id: str,
    ) -> str:
        """Insert a draft t3_task_list entry with an empty task list. Returns entry_id."""
        entry_id = _new_uuid()
        now = _now_iso()
        self._execute(
            "INSERT OR IGNORE INTO t3_task_lists "
            "(entry_id, run_id, workstream_id, t3_agent_id, status, tasks, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, 'draft', '[]', ?, ?)",
            (entry_id, self.run_id, workstream_id, t3_agent_id, now, now),
            commit=True,
        )
        return entry_id

    def commit_t3_task_list(
        self,
        *,
        workstream_id: str,
        t3_agent_id: str,
        tasks: list[Any],
    ) -> None:
        """
        Mark the t3_task_list entries for (workstream, agent) as committed
        and store the final task list.

        If no entry exists yet for that pair (no draft was created), a
        committed entry is inserted so the commit is not silently lost.
        """
        now = _now_iso()
        tasks_json = json.dumps(tasks)
        # Hold the lock across both statements so the update-or-insert is atomic
        # with respect to other users of this Blackboard instance.
        with self._lock:
            cur = self._conn.execute(
                "UPDATE t3_task_lists SET status='committed', tasks=?, updated_at=? "
                "WHERE run_id=? AND workstream_id=? AND t3_agent_id=?",
                (tasks_json, now, self.run_id, workstream_id, t3_agent_id),
            )
            if cur.rowcount == 0:
                self._conn.execute(
                    "INSERT INTO t3_task_lists "
                    "(entry_id, run_id, workstream_id, t3_agent_id, status, tasks, "
                    " created_at, updated_at) "
                    "VALUES (?, ?, ?, ?, 'committed', ?, ?, ?)",
                    (_new_uuid(), self.run_id, workstream_id, t3_agent_id, tasks_json, now, now),
                )
            self._conn.commit()

    def get_t3_task_lists(self, workstream_id: str) -> list[dict[str, Any]]:
        """
        Return all t3_task_list entries for a workstream, oldest first.

        The ``tasks`` column is decoded from JSON; an undecodable value is
        returned as an empty list.
        """
        rows = self._execute(
            "SELECT * FROM t3_task_lists WHERE run_id=? AND workstream_id=? ORDER BY created_at ASC",
            (self.run_id, workstream_id),
        ).fetchall()
        result = []
        for r in rows:
            d = dict(r)
            try:
                d["tasks"] = json.loads(d.get("tasks") or "[]")
            except (json.JSONDecodeError, TypeError):
                d["tasks"] = []
            result.append(d)
        return result

    def all_t3_committed(self, workstream_id: str) -> bool:
        """
        Return True if all t3_task_list entries for the workstream are committed.

        Returns False when the workstream has no entries at all.
        """
        rows = self._execute(
            "SELECT status FROM t3_task_lists WHERE run_id=? AND workstream_id=?",
            (self.run_id, workstream_id),
        ).fetchall()
        if not rows:
            return False
        return all(r["status"] == "committed" for r in rows)

    # ------------------------------------------------------------------
    # Briefs query
    # ------------------------------------------------------------------

    def get_briefs(
        self,
        *,
        status: Optional[str] = None,
        tier: Optional[int] = None,
        workstream_id: Optional[str] = None,
    ) -> list[dict[str, Any]]:
        """Query briefs with optional filters (ANDed together), oldest first."""
        conditions = ["run_id = ?"]
        params: list[Any] = [self.run_id]

        if status:
            conditions.append("status = ?")
            params.append(status)
        # Tier 0 is a valid filter value, hence the explicit None check.
        if tier is not None:
            conditions.append("tier = ?")
            params.append(tier)
        if workstream_id:
            conditions.append("workstream_id = ?")
            params.append(workstream_id)

        where = " AND ".join(conditions)
        rows = self._execute(
            f"SELECT * FROM briefs WHERE {where} ORDER BY created_at ASC",
            tuple(params),
        ).fetchall()
        return [dict(r) for r in rows]

    def get_workstreams(self) -> list[dict[str, Any]]:
        """Return all workstreams for this run, oldest first."""
        rows = self._execute(
            "SELECT * FROM workstreams WHERE run_id=? ORDER BY created_at ASC",
            (self.run_id,),
        ).fetchall()
        return [dict(r) for r in rows]

    # ------------------------------------------------------------------
    # Cleanup
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the database connection gracefully."""
        with self._lock:
            self._conn.close()

    def __repr__(self) -> str:
        return f"Blackboard(run_id={self.run_id!r}, db={self._db_path!r})"
|