Files
the-agency/adapters/runtime/openclaw.py
hansheinemann 084cfb0bb2 feat: implement all adapter layers (#2)
Adapters implemented:
- adapters/llm/anthropic.py — Anthropic Claude SDK, capability-based model selection,
  max_tokens + temperature configurable via team.yaml, lazy SDK import
- adapters/vcs/github.py — GitHub PR/branch operations via gh CLI
- adapters/notify/openclaw.py — OpenClaw system event notifications
- adapters/runtime/openclaw.py — OpenClaw sessions_spawn for agent execution
- adapters/runtime/claude_code.py — Claude Code CLI for T4/T5 coding tasks

All adapters follow the abstract base interfaces from Phase 1.
Config-driven model selection via capability_map in team.yaml.
2026-03-16 11:45:11 -04:00

242 lines
8.0 KiB
Python

"""
adapters/runtime/openclaw.py

OpenClaw agent runtime adapter — Phase 2 implementation.

Spawns sub-agents by shelling out to the ``openclaw`` CLI::

    openclaw session spawn --task "<task>" --mode run
    openclaw session get <session_id>
    openclaw session kill <session_id>

If the ``openclaw`` binary is unavailable, the methods that invoke the CLI
(``spawn``, ``get_result`` and ``kill``) raise ``NotImplementedError`` with a
helpful message rather than crashing with a raw ``FileNotFoundError``.
"""
from __future__ import annotations
import json
import logging
import re
import subprocess
import time
from adapters.base.runtime import RuntimeAdapter
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Lower-case status strings that mark a session as finished.  ``get_result``
# lower-cases the status reported by ``openclaw session get`` and stops
# polling as soon as it is a member of this set; any other status keeps the
# poll loop running.
_TERMINAL_STATUSES = frozenset(
{"done", "completed", "failed", "partial", "blocked", "error"}
)
class OpenClawRuntimeAdapter(RuntimeAdapter):
    """
    Runtime adapter that dispatches agent tasks to OpenClaw worker sessions.

    All interactions use the ``openclaw`` CLI.  No additional credentials are
    required beyond what OpenClaw manages in the local environment.
    """

    # Seconds to wait between two ``openclaw session get`` polls.
    _POLL_INTERVAL_S = 2.0
    # Upper bound, in seconds, for one ``session get`` / ``session kill`` call.
    _CLI_TIMEOUT_S = 15
    # JSON keys probed, in this order, for the session identifier.
    _SESSION_ID_KEYS = ("session_id", "sessionId", "id")
    # Plain-text fallback for the session identifier.  Accepts
    # ``session_id: x``, ``Session ID x``, ``session_id=x`` and a JSON
    # fragment such as ``"session_id": "x"`` embedded in other output.
    _SESSION_ID_RE = re.compile(
        r"""(?:session[_\s]?id|sessionId)["']?[\s:=]+["']?([a-zA-Z0-9_\-]+)""",
        re.IGNORECASE,
    )

    def __init__(self, config: dict) -> None:
        """
        Initialise the OpenClaw runtime adapter.

        Parameters
        ----------
        config : Loaded team.yaml config dict (reserved for future options).
        """
        self._config = config

    # ------------------------------------------------------------------
    # RuntimeAdapter interface
    # ------------------------------------------------------------------
    def spawn(self, task: str, capability: str, context: dict) -> str:
        """
        Spawn an OpenClaw agent session for the given task.

        Parameters
        ----------
        task       : Natural-language task description.
        capability : Capability hint ("reasoning-heavy" | "capable" | "fast-cheap").
                     Currently not forwarded to the CLI.
        context    : Arbitrary context bag (currently unused by this adapter).

        Returns
        -------
        session_id string parsed from the CLI output.

        Raises
        ------
        NotImplementedError
            If the ``openclaw`` CLI is not available on PATH.
        RuntimeError
            If the CLI exits with a non-zero status, or if the session_id
            cannot be parsed from the CLI output.
        """
        # TODO: map capability to an openclaw worker tier / model hint if the
        # openclaw CLI gains that flag in a future release.
        cmd = ["openclaw", "session", "spawn", "--task", task, "--mode", "run"]
        try:
            # No timeout here on purpose: how long ``--mode run`` blocks is
            # decided by the CLI, not by this adapter.
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
            )
        except FileNotFoundError as exc:
            raise NotImplementedError(
                "openclaw CLI not found on PATH. "
                "Install OpenClaw or configure a different runtime adapter "
                "(e.g. adapters.runtime.claude_code.ClaudeCodeRuntimeAdapter)."
            ) from exc
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(
                f"openclaw session spawn failed (exit {exc.returncode}): "
                f"{(exc.stderr or '').strip()}"
            ) from exc
        return self._parse_session_id(result.stdout)

    def get_result(self, agent_id: str, timeout_s: int) -> dict:
        """
        Poll ``openclaw session get`` until the session reaches a terminal
        state or *timeout_s* seconds elapse.

        Neither a single CLI call nor the pause between polls is allowed to
        run past the overall deadline.  A non-positive *timeout_s* raises
        ``TimeoutError`` without polling.

        Parameters
        ----------
        agent_id  : Session ID returned by spawn().
        timeout_s : Maximum seconds to wait before raising TimeoutError.

        Returns
        -------
        dict with keys: ``status``, ``output``, ``artifacts``.

        Raises
        ------
        TimeoutError
            If the session does not finish within timeout_s seconds.
        NotImplementedError
            If the ``openclaw`` CLI is not available on PATH.
        """
        deadline = time.monotonic() + timeout_s
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                result = subprocess.run(
                    ["openclaw", "session", "get", agent_id],
                    capture_output=True,
                    text=True,
                    # Never let one CLI call outlive the caller's deadline.
                    timeout=min(self._CLI_TIMEOUT_S, remaining),
                )
            except FileNotFoundError as exc:
                raise NotImplementedError(
                    "openclaw CLI not found on PATH. "
                    "Install OpenClaw or switch to a different runtime adapter."
                ) from exc
            except subprocess.TimeoutExpired:
                logger.debug("openclaw session get timed out; will retry")
            else:
                if result.returncode == 0 and result.stdout.strip():
                    parsed = self._parse_get_output(result.stdout)
                    # The status may be null or non-string in the CLI's JSON;
                    # anything that is not a known terminal string keeps the
                    # poll loop going instead of crashing on ``.lower()``.
                    status = str(parsed.get("status") or "").strip().lower()
                    if status in _TERMINAL_STATUSES:
                        return parsed
                else:
                    logger.debug(
                        "openclaw session get returned exit=%d; retrying. stderr=%s",
                        result.returncode,
                        result.stderr.strip(),
                    )
            # Sleep for the poll interval, but not beyond the deadline.
            pause = min(self._POLL_INTERVAL_S, deadline - time.monotonic())
            if pause > 0:
                time.sleep(pause)
        raise TimeoutError(
            f"Agent {agent_id!r} did not complete within {timeout_s}s."
        )

    def kill(self, agent_id: str) -> None:
        """
        Terminate an OpenClaw session unconditionally.

        Best effort: a non-zero exit status (for example because the session
        has already finished) and a CLI timeout are logged, never raised.

        Parameters
        ----------
        agent_id : Session ID returned by spawn().

        Raises
        ------
        NotImplementedError
            If the ``openclaw`` CLI is not available on PATH.
        """
        try:
            result = subprocess.run(
                ["openclaw", "session", "kill", agent_id],
                capture_output=True,
                text=True,
                timeout=self._CLI_TIMEOUT_S,
            )
        except FileNotFoundError as exc:
            raise NotImplementedError(
                "openclaw CLI not found on PATH. "
                "Install OpenClaw or switch to a different runtime adapter."
            ) from exc
        except subprocess.TimeoutExpired:
            logger.warning("openclaw session kill timed out for agent %s", agent_id)
            return
        if result.returncode != 0:
            # Still a silent success for the caller; keep a trace for debugging.
            logger.debug(
                "openclaw session kill exited with %d for agent %s (ignored). stderr=%s",
                result.returncode,
                agent_id,
                (result.stderr or "").strip(),
            )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _parse_session_id(self, output: str) -> str:
        """
        Extract a session_id from the raw stdout of ``openclaw session spawn``.

        Strategies, in order: a JSON object carrying one of the known id keys
        with a non-empty value; a ``session_id``-style label found anywhere in
        the text; the first non-empty line.

        Raises
        ------
        RuntimeError
            If the output is empty or whitespace only.
        """
        text = output.strip()
        # Prefer structured JSON output.
        try:
            data = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            data = None
        # Only a JSON object can carry the id keys; scalars and arrays fall
        # through to the text strategies.
        if isinstance(data, dict):
            for key in self._SESSION_ID_KEYS:
                value = data.get(key)
                # Skip null/empty values so that ``"session_id": null`` does
                # not come back as the literal string "None".
                if value not in (None, ""):
                    return str(value)
        # Labelled id somewhere in free-form text.
        match = self._SESSION_ID_RE.search(text)
        if match:
            return match.group(1)
        # Last resort: return the first non-empty line.
        for line in text.splitlines():
            stripped = line.strip()
            if stripped:
                return stripped
        raise RuntimeError(
            f"Could not parse session_id from openclaw output: {text!r}"
        )

    def _parse_get_output(self, output: str) -> dict:
        """
        Parse the stdout of ``openclaw session get`` into a result dict.

        A JSON object supplies ``status``, ``output`` and ``artifacts``, with
        defaults of ``"done"``, the raw text and ``[]`` for missing keys.
        Anything else (plain text, or JSON that is not an object) is treated
        as a completed session whose output is the raw text.
        """
        text = output.strip()
        try:
            data = json.loads(text)
        except (json.JSONDecodeError, TypeError):
            data = None
        if not isinstance(data, dict):
            # Non-JSON or non-object output: treat as completed with raw text.
            return {
                "status": "done",
                "output": text,
                "artifacts": [],
            }
        return {
            "status": data.get("status", "done"),
            "output": data.get("output", text),
            "artifacts": data.get("artifacts", []),
        }