OpenClawRuntimeAdapter: - spawn() shells out to `openclaw session spawn --task <t> --mode run` - get_result() polls `openclaw session get <id>` until terminal status or timeout - kill() calls `openclaw session kill <id>`, silently succeeds if finished - Parses JSON or raw-text session IDs; raises NotImplementedError with helpful message when openclaw CLI is absent from PATH ClaudeCodeRuntimeAdapter: - spawn() launches `claude --permission-mode bypassPermissions --print <task>` in a temp dir (or context["workdir"]), returns a UUID job_id - Tracks all Popen instances in a thread-safe dict - get_result() calls communicate(timeout=...), raises TimeoutError on timeout - kill() terminates the Popen; silently ignores already-finished processes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
164 lines
5.1 KiB
Python
164 lines
5.1 KiB
Python
"""
|
|
adapters/runtime/claude_code.py
|
|
Claude Code sub-agent runtime adapter — Phase 2 implementation.
|
|
|
|
Spawns the ``claude`` CLI as a non-interactive subprocess for T4/T5
|
|
implementation tasks::
|
|
|
|
claude --permission-mode bypassPermissions --print "<task>"
|
|
|
|
Each spawned process is tracked by a UUID job_id so callers can later poll
|
|
for the result or terminate the job. Stdout is captured and returned as the
|
|
agent output; stderr is included for debugging.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import uuid
|
|
|
|
from adapters.base.runtime import RuntimeAdapter
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ClaudeCodeRuntimeAdapter(RuntimeAdapter):
|
|
"""
|
|
Runtime adapter that spawns ``claude`` CLI sub-agents for coding tasks.
|
|
|
|
Credentials are inherited from the environment (``ANTHROPIC_API_KEY``).
|
|
The ``claude`` CLI must be installed and reachable on PATH.
|
|
|
|
Used when a TaskBrief has ``preferred_runtime == "coding_agent"``.
|
|
"""
|
|
|
|
def __init__(self, config: dict) -> None:
|
|
"""
|
|
Initialise the Claude Code runtime adapter.
|
|
|
|
Parameters
|
|
----------
|
|
config : Loaded team.yaml config dict (reserved for future options).
|
|
"""
|
|
self._config = config
|
|
# Maps job_id → running Popen instance.
|
|
self._jobs: dict[str, subprocess.Popen] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
# ------------------------------------------------------------------
|
|
# RuntimeAdapter interface
|
|
# ------------------------------------------------------------------
|
|
|
|
def spawn(self, task: str, capability: str, context: dict) -> str:
|
|
"""
|
|
Launch ``claude --permission-mode bypassPermissions --print "<task>"``
|
|
as a non-interactive subprocess.
|
|
|
|
Parameters
|
|
----------
|
|
task : Full task description (typically a JSON-serialised brief).
|
|
capability : Capability hint (not forwarded; Claude Code resolves its
|
|
own model from the local environment).
|
|
context : Optional keys:
|
|
workdir (str) — cwd for the subprocess. A fresh
|
|
temporary directory is created if omitted.
|
|
|
|
Returns
|
|
-------
|
|
A UUID job_id string that uniquely identifies this subprocess.
|
|
"""
|
|
workdir: str = context.get("workdir") or tempfile.mkdtemp(
|
|
prefix="agency-claude-"
|
|
)
|
|
job_id = str(uuid.uuid4())
|
|
logger.info("Spawning Claude Code job %s in %s", job_id, workdir)
|
|
|
|
proc = subprocess.Popen(
|
|
["claude", "--permission-mode", "bypassPermissions", "--print", task],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
cwd=workdir,
|
|
)
|
|
|
|
with self._lock:
|
|
self._jobs[job_id] = proc
|
|
|
|
return job_id
|
|
|
|
def get_result(self, agent_id: str, timeout_s: int) -> dict:
|
|
"""
|
|
Wait for the Claude Code subprocess to complete and return its output.
|
|
|
|
Parameters
|
|
----------
|
|
agent_id : Job id returned by spawn().
|
|
timeout_s : Maximum seconds to wait before raising TimeoutError.
|
|
|
|
Returns
|
|
-------
|
|
dict with keys:
|
|
status ("completed" | "failed")
|
|
output (str — full stdout)
|
|
artifacts (list — always empty; callers must parse output)
|
|
stderr (str — full stderr)
|
|
|
|
Raises
|
|
------
|
|
KeyError
|
|
If agent_id does not correspond to a known job.
|
|
TimeoutError
|
|
If the subprocess does not finish within timeout_s seconds.
|
|
"""
|
|
with self._lock:
|
|
proc = self._jobs.get(agent_id)
|
|
|
|
if proc is None:
|
|
raise KeyError(f"No Claude Code job found for agent_id={agent_id!r}")
|
|
|
|
try:
|
|
stdout, stderr = proc.communicate(timeout=timeout_s)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
stdout, stderr = proc.communicate()
|
|
raise TimeoutError(
|
|
f"Claude Code job {agent_id!r} did not complete within {timeout_s}s."
|
|
)
|
|
|
|
status = "completed" if proc.returncode == 0 else "failed"
|
|
logger.info(
|
|
"Claude Code job %s finished: status=%s returncode=%d",
|
|
agent_id,
|
|
status,
|
|
proc.returncode,
|
|
)
|
|
|
|
return {
|
|
"status": status,
|
|
"output": stdout,
|
|
"artifacts": [],
|
|
"stderr": stderr,
|
|
}
|
|
|
|
def kill(self, agent_id: str) -> None:
|
|
"""
|
|
Terminate a running Claude Code subprocess.
|
|
|
|
Silently succeeds if the job has already finished or the id is unknown.
|
|
|
|
Parameters
|
|
----------
|
|
agent_id : Job id returned by spawn().
|
|
"""
|
|
with self._lock:
|
|
proc = self._jobs.get(agent_id)
|
|
|
|
if proc is not None:
|
|
try:
|
|
proc.terminate()
|
|
logger.info("Terminated Claude Code job %s", agent_id)
|
|
except OSError:
|
|
pass # Process already gone — that is fine.
|