""" adapters/runtime/openclaw.py OpenClaw agent runtime adapter — Phase 2 stub. TODO (Phase 2): - Implement spawn() to submit a task to an OpenClaw worker pool. - Implement get_result() to poll or subscribe for agent completion. - Implement kill() to cancel a running OpenClaw agent job. - Read endpoint and credentials from environment (OPENCLAW_API_KEY, OPENCLAW_URL). - Map capability hint to an appropriate worker class/queue. """ from __future__ import annotations from adapters.base.runtime import RuntimeAdapter class OpenClawRuntimeAdapter(RuntimeAdapter): """ Runtime adapter that dispatches agent tasks to OpenClaw workers. Expects environment variables: OPENCLAW_API_KEY — authentication token OPENCLAW_URL — base URL for the OpenClaw API """ def __init__(self, config: dict) -> None: # TODO (Phase 2): Accept loaded team.yaml config dict. # Extract OPENCLAW_API_KEY and OPENCLAW_URL from environment. # Initialise HTTP client and any job-tracking state. raise NotImplementedError("OpenClawRuntimeAdapter.__init__ is not yet implemented.") def spawn(self, task: str, capability: str, context: dict) -> str: # TODO (Phase 2): Submit task to OpenClaw worker pool. # Map capability ("reasoning-heavy" | "capable" | "fast-cheap") to # an appropriate worker queue or model hint. # Return an agent_id string that can be used to poll for results. raise NotImplementedError("OpenClawRuntimeAdapter.spawn is not yet implemented.") def get_result(self, agent_id: str, timeout_s: int) -> dict: # TODO (Phase 2): Poll or long-poll the OpenClaw API for job completion. # Raise TimeoutError if timeout_s elapses before the job finishes. # Return a dict with at minimum: {"status": ..., "output": ..., "artifacts": [...]}. raise NotImplementedError("OpenClawRuntimeAdapter.get_result is not yet implemented.") def kill(self, agent_id: str) -> None: # TODO (Phase 2): Send a cancellation request to the OpenClaw API. # Silently succeed if the agent has already finished. raise NotImplementedError("OpenClawRuntimeAdapter.kill is not yet implemented.")