""" adapters/runtime/openclaw.py OpenClaw agent runtime adapter — Phase 2 implementation. Spawns sub-agents by shelling out to the ``openclaw`` CLI:: openclaw session spawn --task "" --mode run openclaw session get openclaw session kill If the ``openclaw`` binary is unavailable, all methods raise ``NotImplementedError`` with a helpful message rather than crashing with a raw ``FileNotFoundError``. """ from __future__ import annotations import json import logging import re import subprocess import time from adapters.base.runtime import RuntimeAdapter logger = logging.getLogger(__name__) # Status strings from the openclaw CLI that indicate a session has finished. _TERMINAL_STATUSES = frozenset( {"done", "completed", "failed", "partial", "blocked", "error"} ) class OpenClawRuntimeAdapter(RuntimeAdapter): """ Runtime adapter that dispatches agent tasks to OpenClaw worker sessions. All interactions use the ``openclaw`` CLI. No additional credentials are required beyond what OpenClaw manages in the local environment. """ def __init__(self, config: dict) -> None: """ Initialise the OpenClaw runtime adapter. Parameters ---------- config : Loaded team.yaml config dict (reserved for future options). """ self._config = config # ------------------------------------------------------------------ # RuntimeAdapter interface # ------------------------------------------------------------------ def spawn(self, task: str, capability: str, context: dict) -> str: """ Spawn an OpenClaw agent session for the given task. Parameters ---------- task : Natural-language task description. capability : Capability hint ("reasoning-heavy" | "capable" | "fast-cheap"). Passed informally; actual routing is handled by OpenClaw. context : Arbitrary context bag (currently unused by this adapter). Returns ------- session_id string parsed from the CLI output. Raises ------ NotImplementedError If the ``openclaw`` CLI is not available on PATH. RuntimeError If the session_id cannot be parsed from the CLI output. """ # TODO: map capability to an openclaw worker tier / model hint if the # openclaw CLI gains that flag in a future release. cmd = ["openclaw", "session", "spawn", "--task", task, "--mode", "run"] try: result = subprocess.run( cmd, capture_output=True, text=True, check=True, ) except FileNotFoundError: raise NotImplementedError( "openclaw CLI not found on PATH. " "Install OpenClaw or configure a different runtime adapter " "(e.g. adapters.runtime.claude_code.ClaudeCodeRuntimeAdapter)." ) except subprocess.CalledProcessError as exc: raise RuntimeError( f"openclaw session spawn failed (exit {exc.returncode}): " f"{exc.stderr.strip()}" ) from exc return self._parse_session_id(result.stdout) def get_result(self, agent_id: str, timeout_s: int) -> dict: """ Poll ``openclaw session get`` until the session reaches a terminal state or *timeout_s* seconds elapse. Parameters ---------- agent_id : Session ID returned by spawn(). timeout_s : Maximum seconds to wait before raising TimeoutError. Returns ------- dict with keys: ``status``, ``output``, ``artifacts``. Raises ------ TimeoutError If the session does not finish within timeout_s seconds. NotImplementedError If the ``openclaw`` CLI is not available on PATH. """ deadline = time.monotonic() + timeout_s poll_interval = 2.0 while time.monotonic() < deadline: try: result = subprocess.run( ["openclaw", "session", "get", agent_id], capture_output=True, text=True, timeout=15, ) except FileNotFoundError: raise NotImplementedError( "openclaw CLI not found on PATH. " "Install OpenClaw or switch to a different runtime adapter." ) except subprocess.TimeoutExpired: logger.debug("openclaw session get timed out; will retry") time.sleep(poll_interval) continue if result.returncode == 0 and result.stdout.strip(): parsed = self._parse_get_output(result.stdout) if parsed.get("status", "").lower() in _TERMINAL_STATUSES: return parsed else: logger.debug( "openclaw session get returned exit=%d; retrying. stderr=%s", result.returncode, result.stderr.strip(), ) time.sleep(poll_interval) raise TimeoutError( f"Agent {agent_id!r} did not complete within {timeout_s}s." ) def kill(self, agent_id: str) -> None: """ Terminate an OpenClaw session unconditionally. Silently succeeds if the session has already finished. Parameters ---------- agent_id : Session ID returned by spawn(). Raises ------ NotImplementedError If the ``openclaw`` CLI is not available on PATH. """ try: subprocess.run( ["openclaw", "session", "kill", agent_id], capture_output=True, text=True, timeout=15, ) except FileNotFoundError: raise NotImplementedError( "openclaw CLI not found on PATH. " "Install OpenClaw or switch to a different runtime adapter." ) except subprocess.TimeoutExpired: logger.warning("openclaw session kill timed out for agent %s", agent_id) # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _parse_session_id(self, output: str) -> str: """Extract a session_id from the raw stdout of ``openclaw session spawn``.""" output = output.strip() # Prefer structured JSON output. try: data = json.loads(output) for key in ("session_id", "sessionId", "id"): if key in data: return str(data[key]) except (json.JSONDecodeError, TypeError): pass # Regex: look for "session_id: " or similar. m = re.search( r"(?:session[_\s]?id|sessionId)[:\s]+([a-zA-Z0-9_\-]+)", output, re.IGNORECASE, ) if m: return m.group(1) # Last resort: return the first non-empty line. lines = [ln.strip() for ln in output.splitlines() if ln.strip()] if lines: return lines[0] raise RuntimeError( f"Could not parse session_id from openclaw output: {output!r}" ) def _parse_get_output(self, output: str) -> dict: """Parse the stdout of ``openclaw session get`` into a result dict.""" output = output.strip() try: data = json.loads(output) return { "status": data.get("status", "done"), "output": data.get("output", output), "artifacts": data.get("artifacts", []), } except (json.JSONDecodeError, TypeError): # Non-JSON output — treat as completed with raw text output. return { "status": "done", "output": output, "artifacts": [], }