""" adapters/runtime/claude_code.py Claude Code sub-agent runtime adapter — Phase 2 implementation. Spawns the ``claude`` CLI as a non-interactive subprocess for T4/T5 implementation tasks:: claude --permission-mode bypassPermissions --print "" Each spawned process is tracked by a UUID job_id so callers can later poll for the result or terminate the job. Stdout is captured and returned as the agent output; stderr is included for debugging. """ from __future__ import annotations import logging import subprocess import tempfile import threading import uuid from adapters.base.runtime import RuntimeAdapter logger = logging.getLogger(__name__) class ClaudeCodeRuntimeAdapter(RuntimeAdapter): """ Runtime adapter that spawns ``claude`` CLI sub-agents for coding tasks. Credentials are inherited from the environment (``ANTHROPIC_API_KEY``). The ``claude`` CLI must be installed and reachable on PATH. Used when a TaskBrief has ``preferred_runtime == "coding_agent"``. """ def __init__(self, config: dict) -> None: """ Initialise the Claude Code runtime adapter. Parameters ---------- config : Loaded team.yaml config dict (reserved for future options). """ self._config = config # Maps job_id → running Popen instance. self._jobs: dict[str, subprocess.Popen] = {} self._lock = threading.Lock() # ------------------------------------------------------------------ # RuntimeAdapter interface # ------------------------------------------------------------------ def spawn(self, task: str, capability: str, context: dict) -> str: """ Launch ``claude --permission-mode bypassPermissions --print ""`` as a non-interactive subprocess. Parameters ---------- task : Full task description (typically a JSON-serialised brief). capability : Capability hint (not forwarded; Claude Code resolves its own model from the local environment). context : Optional keys: workdir (str) — cwd for the subprocess. A fresh temporary directory is created if omitted. Returns ------- A UUID job_id string that uniquely identifies this subprocess. """ workdir: str = context.get("workdir") or tempfile.mkdtemp( prefix="agency-claude-" ) job_id = str(uuid.uuid4()) logger.info("Spawning Claude Code job %s in %s", job_id, workdir) proc = subprocess.Popen( ["claude", "--permission-mode", "bypassPermissions", "--print", task], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, cwd=workdir, ) with self._lock: self._jobs[job_id] = proc return job_id def get_result(self, agent_id: str, timeout_s: int) -> dict: """ Wait for the Claude Code subprocess to complete and return its output. Parameters ---------- agent_id : Job id returned by spawn(). timeout_s : Maximum seconds to wait before raising TimeoutError. Returns ------- dict with keys: status ("completed" | "failed") output (str — full stdout) artifacts (list — always empty; callers must parse output) stderr (str — full stderr) Raises ------ KeyError If agent_id does not correspond to a known job. TimeoutError If the subprocess does not finish within timeout_s seconds. """ with self._lock: proc = self._jobs.get(agent_id) if proc is None: raise KeyError(f"No Claude Code job found for agent_id={agent_id!r}") try: stdout, stderr = proc.communicate(timeout=timeout_s) except subprocess.TimeoutExpired: proc.kill() stdout, stderr = proc.communicate() raise TimeoutError( f"Claude Code job {agent_id!r} did not complete within {timeout_s}s." ) status = "completed" if proc.returncode == 0 else "failed" logger.info( "Claude Code job %s finished: status=%s returncode=%d", agent_id, status, proc.returncode, ) return { "status": status, "output": stdout, "artifacts": [], "stderr": stderr, } def kill(self, agent_id: str) -> None: """ Terminate a running Claude Code subprocess. Silently succeeds if the job has already finished or the id is unknown. Parameters ---------- agent_id : Job id returned by spawn(). """ with self._lock: proc = self._jobs.get(agent_id) if proc is not None: try: proc.terminate() logger.info("Terminated Claude Code job %s", agent_id) except OSError: pass # Process already gone — that is fine.