""" core/blackboard.py SQLite-backed shared state store for a single orchestration run. One database is created at: runs//blackboard.db All methods are synchronous and thread-safe at the connection level (SQLite WAL mode + check_same_thread=False with an explicit lock). """ from __future__ import annotations import json import os import sqlite3 import threading import uuid from datetime import datetime, timezone from typing import Any, Optional # --------------------------------------------------------------------------- # Import TaskBrief only for type hints to avoid circular imports at runtime. # --------------------------------------------------------------------------- from typing import TYPE_CHECKING if TYPE_CHECKING: from core.task_brief import TaskBrief # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _new_uuid() -> str: return str(uuid.uuid4()) # --------------------------------------------------------------------------- # SQL schema # --------------------------------------------------------------------------- _SCHEMA = """ PRAGMA journal_mode=WAL; CREATE TABLE IF NOT EXISTS runs ( run_id TEXT PRIMARY KEY, goal TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', -- pending|active|review|done|failed created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS workstreams ( workstream_id TEXT PRIMARY KEY, run_id TEXT NOT NULL, name TEXT NOT NULL, tier INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', -- pending|active|blocked|done|failed owner_agent_id TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (run_id) REFERENCES runs(run_id) ); CREATE TABLE IF NOT EXISTS briefs ( brief_id TEXT PRIMARY KEY, run_id TEXT NOT NULL, parent_brief_id TEXT, workstream_id TEXT, tier INTEGER NOT NULL, role TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', -- pending|active|done|failed payload TEXT, -- JSON-serialised TaskBrief.to_dict() result TEXT, -- JSON result from the agent retry_count INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (run_id) REFERENCES runs(run_id) ); CREATE TABLE IF NOT EXISTS events ( event_id TEXT PRIMARY KEY, run_id TEXT NOT NULL, brief_id TEXT, -- NULL for run-level events kind TEXT NOT NULL, -- spawned|completed|failed|escalated|retried detail TEXT, -- JSON created_at TEXT NOT NULL, FOREIGN KEY (run_id) REFERENCES runs(run_id) ); """ # Valid status values per table — used for input validation. _RUN_STATUSES = {"pending", "active", "review", "done", "failed"} _WS_STATUSES = {"pending", "active", "blocked", "done", "failed"} _BRIEF_STATUSES = {"pending", "active", "done", "failed"} _EVENT_KINDS = {"spawned", "completed", "failed", "escalated", "retried"} # --------------------------------------------------------------------------- # Blackboard # --------------------------------------------------------------------------- class Blackboard: """ Shared, persistent state store for one orchestration run. Usage ----- bb = Blackboard(run_id="abc123") bb.create_run(goal="Build webhook ingestion system") ... summary = bb.get_run_summary() """ def __init__(self, run_id: str) -> None: self.run_id = run_id self._run_dir = os.path.join("runs", run_id) self._db_path = os.path.join(self._run_dir, "blackboard.db") self._lock = threading.Lock() # Ensure the run directory exists. os.makedirs(self._run_dir, exist_ok=True) # Open a persistent connection (thread-safe via explicit lock). self._conn = sqlite3.connect(self._db_path, check_same_thread=False) self._conn.row_factory = sqlite3.Row # Initialise schema. with self._lock: self._conn.executescript(_SCHEMA) self._conn.commit() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _execute( self, sql: str, params: tuple[Any, ...] = (), *, commit: bool = False, ) -> sqlite3.Cursor: with self._lock: cur = self._conn.execute(sql, params) if commit: self._conn.commit() return cur def _executemany( self, sql: str, params_seq: list[tuple[Any, ...]], *, commit: bool = False, ) -> None: with self._lock: self._conn.executemany(sql, params_seq) if commit: self._conn.commit() # ------------------------------------------------------------------ # Run # ------------------------------------------------------------------ def create_run(self, goal: str) -> None: """Insert a new run row. Status defaults to 'pending'.""" now = _now_iso() self._execute( "INSERT OR IGNORE INTO runs (run_id, goal, status, created_at, updated_at) " "VALUES (?, ?, 'pending', ?, ?)", (self.run_id, goal, now, now), commit=True, ) def update_run_status(self, status: str) -> None: """Update run status. Must be one of: pending|active|review|done|failed.""" if status not in _RUN_STATUSES: raise ValueError(f"Invalid run status {status!r}. Must be one of {_RUN_STATUSES}.") now = _now_iso() self._execute( "UPDATE runs SET status=?, updated_at=? WHERE run_id=?", (status, now, self.run_id), commit=True, ) # ------------------------------------------------------------------ # Workstreams # ------------------------------------------------------------------ def create_workstream( self, *, workstream_id: Optional[str] = None, name: str, tier: int, owner_agent_id: Optional[str] = None, ) -> str: """Create a workstream row and return its workstream_id.""" ws_id = workstream_id or _new_uuid() now = _now_iso() self._execute( "INSERT OR IGNORE INTO workstreams " "(workstream_id, run_id, name, tier, status, owner_agent_id, created_at, updated_at) " "VALUES (?, ?, ?, ?, 'pending', ?, ?, ?)", (ws_id, self.run_id, name, tier, owner_agent_id, now, now), commit=True, ) return ws_id def update_workstream_status(self, workstream_id: str, status: str) -> None: """Update workstream status. Must be one of: pending|active|blocked|done|failed.""" if status not in _WS_STATUSES: raise ValueError(f"Invalid workstream status {status!r}. Must be one of {_WS_STATUSES}.") now = _now_iso() self._execute( "UPDATE workstreams SET status=?, updated_at=? WHERE workstream_id=?", (status, now, workstream_id), commit=True, ) # ------------------------------------------------------------------ # Briefs # ------------------------------------------------------------------ def create_brief(self, brief: "TaskBrief", workstream_id: Optional[str] = None) -> None: """Persist a TaskBrief. The full brief is stored as JSON in `payload`.""" now = _now_iso() payload_json = json.dumps(brief.to_dict()) self._execute( "INSERT OR IGNORE INTO briefs " "(brief_id, run_id, parent_brief_id, workstream_id, tier, role, " " status, payload, result, retry_count, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, NULL, ?, ?, ?)", ( brief.brief_id, self.run_id, brief.parent_brief_id, workstream_id, brief.tier, brief.role, payload_json, brief.retry_count, now, now, ), commit=True, ) def update_brief_status(self, brief_id: str, status: str) -> None: """Update brief status. Must be one of: pending|active|done|failed.""" if status not in _BRIEF_STATUSES: raise ValueError(f"Invalid brief status {status!r}. Must be one of {_BRIEF_STATUSES}.") now = _now_iso() self._execute( "UPDATE briefs SET status=?, updated_at=? WHERE brief_id=?", (status, now, brief_id), commit=True, ) def update_brief_result(self, brief_id: str, result: dict[str, Any]) -> None: """Store the agent result JSON for a brief and mark it done.""" now = _now_iso() result_json = json.dumps(result) self._execute( "UPDATE briefs SET result=?, status='done', updated_at=? WHERE brief_id=?", (result_json, now, brief_id), commit=True, ) def increment_brief_retry(self, brief_id: str) -> None: """Bump the retry_count column for a brief.""" now = _now_iso() self._execute( "UPDATE briefs SET retry_count = retry_count + 1, updated_at=? WHERE brief_id=?", (now, brief_id), commit=True, ) # ------------------------------------------------------------------ # Events # ------------------------------------------------------------------ def log_event( self, kind: str, brief_id: Optional[str] = None, detail: Optional[dict[str, Any]] = None, ) -> str: """ Append an event to the events table. Parameters ---------- kind : One of spawned|completed|failed|escalated|retried. brief_id : Associated brief, or None for run-level events. detail : Arbitrary JSON-serialisable dict. Returns the new event_id. """ if kind not in _EVENT_KINDS: raise ValueError(f"Invalid event kind {kind!r}. Must be one of {_EVENT_KINDS}.") event_id = _new_uuid() now = _now_iso() detail_json = json.dumps(detail or {}) self._execute( "INSERT INTO events (event_id, run_id, brief_id, kind, detail, created_at) " "VALUES (?, ?, ?, ?, ?, ?)", (event_id, self.run_id, brief_id, kind, detail_json, now), commit=True, ) return event_id # ------------------------------------------------------------------ # Summary # ------------------------------------------------------------------ def get_run_summary(self) -> dict[str, Any]: """ Return a snapshot of the run state including workstream and brief counts broken down by status. """ # Run row run_row = self._execute( "SELECT * FROM runs WHERE run_id=?", (self.run_id,) ).fetchone() if run_row is None: return {"error": f"No run found for run_id={self.run_id!r}"} run_data: dict[str, Any] = dict(run_row) # Workstream status counts ws_rows = self._execute( "SELECT status, COUNT(*) AS cnt FROM workstreams WHERE run_id=? GROUP BY status", (self.run_id,), ).fetchall() run_data["workstreams"] = {r["status"]: r["cnt"] for r in ws_rows} # Brief status counts brief_rows = self._execute( "SELECT status, COUNT(*) AS cnt FROM briefs WHERE run_id=? GROUP BY status", (self.run_id,), ).fetchall() run_data["briefs"] = {r["status"]: r["cnt"] for r in brief_rows} # Event kind counts event_rows = self._execute( "SELECT kind, COUNT(*) AS cnt FROM events WHERE run_id=? GROUP BY kind", (self.run_id,), ).fetchall() run_data["events"] = {r["kind"]: r["cnt"] for r in event_rows} return run_data # ------------------------------------------------------------------ # Cleanup # ------------------------------------------------------------------ def close(self) -> None: """Close the database connection gracefully.""" with self._lock: self._conn.close() def __repr__(self) -> str: return f"Blackboard(run_id={self.run_id!r}, db={self._db_path!r})"