""" core/escalation.py Failure classification and escalation logic for the-agency pipeline. When an agent returns a result the EscalationHandler decides what to do next: retry, escalate, salvage-and-retry, or mark complete. """ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum from typing import Any, Optional from core.task_brief import TaskBrief # --------------------------------------------------------------------------- # FailureType # --------------------------------------------------------------------------- class FailureType(str, Enum): """Classification of an agent result.""" SUCCESS = "success" """Agent completed the task and the output looks valid.""" BAD_OUTPUT = "bad_output" """Agent returned output that fails acceptance criteria or is malformed.""" PARTIAL = "partial" """Agent completed some but not all of the required work.""" BLOCKED = "blocked" """Agent could not proceed due to a dependency or missing information.""" # --------------------------------------------------------------------------- # EscalationResult # --------------------------------------------------------------------------- @dataclass class EscalationResult: """ The decision produced by EscalationHandler.handle(). Attributes ---------- action : What the runner should do next. "retry" — resubmit amended_brief as-is. "escalate" — pass the failure up to the parent tier. "salvage_and_retry" — use salvaged_result as partial context and resubmit amended_brief. "complete" — accept the result and move on. amended_brief : A (possibly modified) TaskBrief to re-submit. None when action == "complete" or "escalate". salvaged_result : Partial output worth keeping from a PARTIAL result. None unless action == "salvage_and_retry". reason : Human-readable explanation of the decision. """ action: str # "retry" | "escalate" | "salvage_and_retry" | "complete" amended_brief: Optional[TaskBrief] = None salvaged_result: Optional[dict[str, Any]] = None reason: str = "" def __post_init__(self) -> None: valid_actions = {"retry", "escalate", "salvage_and_retry", "complete"} if self.action not in valid_actions: raise ValueError(f"Invalid action {self.action!r}. Must be one of {valid_actions}.") # --------------------------------------------------------------------------- # EscalationHandler # --------------------------------------------------------------------------- class EscalationHandler: """ Stateless handler that classifies agent results and produces escalation decisions. Usage ----- handler = EscalationHandler() failure_type = handler.classify_result(result) decision = handler.handle(brief, result) """ # ------------------------------------------------------------------ # Classification # ------------------------------------------------------------------ def classify_result(self, result: dict[str, Any]) -> FailureType: """ Inspect the result dict returned by an agent and return a FailureType. The result dict is expected to contain at minimum a "status" key. Recognised status values: "done" / "passed" → SUCCESS "partial" → PARTIAL "blocked" → BLOCKED "failed" → BAD_OUTPUT (default for unknown statuses too) The handler also inspects an optional "blocking_issues" or "blockers" key: if present and non-empty the result is treated as BLOCKED even if status says "failed". """ status = str(result.get("status", "")).lower().strip() # Explicit blocked signals if status == "blocked": return FailureType.BLOCKED # Check for blocking_issues / blockers fields blocking_issues: list = result.get("blocking_issues") or result.get("blockers") or [] if blocking_issues: return FailureType.BLOCKED if status in ("done", "passed"): return FailureType.SUCCESS if status == "partial": return FailureType.PARTIAL # "failed" or anything unrecognised return FailureType.BAD_OUTPUT # ------------------------------------------------------------------ # Decision # ------------------------------------------------------------------ def handle(self, brief: TaskBrief, result: dict[str, Any]) -> EscalationResult: """ Decide what to do with an agent result. Rules ----- BLOCKED → escalate immediately (no retries for blocked states) BAD_OUTPUT + retries remaining → retry with amended brief (retry_count incremented) BAD_OUTPUT + retries exhausted → escalate PARTIAL → salvage the good parts and retry with amended brief SUCCESS → complete Parameters ---------- brief : The TaskBrief that was executed. result : The dict returned by the agent runtime. Returns ------- An EscalationResult with the recommended action. """ failure_type = self.classify_result(result) # ---- SUCCESS --------------------------------------------------- if failure_type == FailureType.SUCCESS: return EscalationResult( action="complete", reason="Agent reported success; acceptance criteria satisfied.", ) # ---- BLOCKED --------------------------------------------------- if failure_type == FailureType.BLOCKED: blocking_issues = ( result.get("blocking_issues") or result.get("blockers") or [result.get("reason", "unspecified blocker")] ) return EscalationResult( action="escalate", reason=( f"Agent is blocked and cannot proceed. " f"Blockers: {blocking_issues}" ), ) # ---- PARTIAL --------------------------------------------------- if failure_type == FailureType.PARTIAL: salvaged: dict[str, Any] = {} if "completed" in result: salvaged["completed"] = result["completed"] if "artifacts" in result: salvaged["artifacts"] = result["artifacts"] amended = self._amend_brief_for_retry(brief, result, failure_type) return EscalationResult( action="salvage_and_retry", amended_brief=amended, salvaged_result=salvaged or result, reason=( "Agent returned partial output. " "Salvaging completed artifacts and retrying remaining work." ), ) # ---- BAD_OUTPUT ------------------------------------------------ # failure_type == FailureType.BAD_OUTPUT retries_remaining = brief.retry_budget - brief.retry_count if retries_remaining > 0: amended = self._amend_brief_for_retry(brief, result, failure_type) return EscalationResult( action="retry", amended_brief=amended, reason=( f"Agent returned bad output. " f"Retrying (attempt {amended.retry_count}/{amended.retry_budget})." ), ) else: return EscalationResult( action="escalate", reason=( f"Agent returned bad output and retry budget " f"({brief.retry_budget}) is exhausted. Escalating." ), ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _amend_brief_for_retry( self, brief: TaskBrief, result: dict[str, Any], failure_type: FailureType, ) -> TaskBrief: """ Return a copy of the brief with retry_count incremented and previous-failure context injected into the context dict. """ import copy amended = copy.deepcopy(brief) amended.retry_count += 1 # Inject failure context so the agent knows what went wrong. amended.context["_previous_failure"] = { "failure_type": failure_type.value, "result": result, "attempt": amended.retry_count, } return amended