246 lines
8.8 KiB
Python
246 lines
8.8 KiB
Python
"""
|
|
core/escalation.py
|
|
Failure classification and escalation logic for the-agency pipeline.
|
|
|
|
When an agent returns a result the EscalationHandler decides what to do next:
|
|
retry, escalate, salvage-and-retry, or mark complete.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
from core.task_brief import TaskBrief
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FailureType
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class FailureType(str, Enum):
    """Outcome categories assigned to an agent result.

    Members mix in ``str``, so each member compares equal to its raw
    string value (e.g. ``FailureType.PARTIAL == "partial"``).
    """

    # The agent finished the task and its output looks valid.
    SUCCESS = "success"

    # The output is malformed or fails the acceptance criteria.
    BAD_OUTPUT = "bad_output"

    # Only part of the required work was completed.
    PARTIAL = "partial"

    # The agent could not proceed because of a dependency or missing
    # information.
    BLOCKED = "blocked"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# EscalationResult
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class EscalationResult:
    """
    The decision produced by EscalationHandler.handle().

    Attributes
    ----------
    action          : What the runner should do next.
                      "retry"             — resubmit amended_brief as-is.
                      "escalate"          — pass the failure up to the parent tier.
                      "salvage_and_retry" — use salvaged_result as partial context
                                            and resubmit amended_brief.
                      "complete"          — accept the result and move on.
    amended_brief   : A (possibly modified) TaskBrief to re-submit.
                      None when action == "complete" or "escalate".
    salvaged_result : Partial output worth keeping from a PARTIAL result.
                      None unless action == "salvage_and_retry".
    reason          : Human-readable explanation of the decision.

    Raises
    ------
    ValueError
        On construction, if ``action`` is not one of the four values above.
    """

    # Deliberately un-annotated so @dataclass does not treat it as a field.
    # Built once at class-creation time instead of on every instantiation.
    _VALID_ACTIONS = frozenset({"retry", "escalate", "salvage_and_retry", "complete"})

    action: str  # "retry" | "escalate" | "salvage_and_retry" | "complete"
    amended_brief: Optional[TaskBrief] = None
    salvaged_result: Optional[dict[str, Any]] = None
    reason: str = ""

    def __post_init__(self) -> None:
        """Reject any ``action`` outside the supported set."""
        if self.action not in self._VALID_ACTIONS:
            # sorted() keeps the message stable across runs; a raw set repr
            # changes order with string-hash randomisation.
            raise ValueError(
                f"Invalid action {self.action!r}. "
                f"Must be one of {sorted(self._VALID_ACTIONS)}."
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# EscalationHandler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class EscalationHandler:
    """
    Stateless handler that classifies agent results and produces escalation
    decisions.

    Usage
    -----
        handler = EscalationHandler()
        failure_type = handler.classify_result(result)
        decision = handler.handle(brief, result)
    """

    # ------------------------------------------------------------------
    # Classification
    # ------------------------------------------------------------------

    def classify_result(self, result: dict[str, Any]) -> FailureType:
        """
        Inspect the result returned by an agent and return a FailureType.

        The result is expected to be a mapping containing at minimum a
        "status" key. The status is compared case-insensitively after
        stripping surrounding whitespace. Recognised status values:
            "blocked"          → BLOCKED
            "done" / "passed"  → SUCCESS
            "partial"          → PARTIAL
            "failed"           → BAD_OUTPUT  (default for unknown statuses too)

        The handler also inspects the optional "blocking_issues" and
        "blockers" keys: if either is present and non-empty the result is
        treated as BLOCKED regardless of status, because this check runs
        before the success and partial checks.

        A result that is not a mapping at all (for example ``None``) is
        malformed output and is classified as BAD_OUTPUT.
        """
        from collections.abc import Mapping

        # Without this guard a non-mapping result fails on ``.get`` with
        # AttributeError instead of being classified.
        if not isinstance(result, Mapping):
            return FailureType.BAD_OUTPUT

        status = str(result.get("status", "")).lower().strip()

        # Explicit blocked signal.
        if status == "blocked":
            return FailureType.BLOCKED

        # Non-empty blocker lists take precedence over every remaining status.
        if result.get("blocking_issues") or result.get("blockers"):
            return FailureType.BLOCKED

        if status in ("done", "passed"):
            return FailureType.SUCCESS

        if status == "partial":
            return FailureType.PARTIAL

        # "failed" or anything unrecognised.
        return FailureType.BAD_OUTPUT

    # ------------------------------------------------------------------
    # Decision
    # ------------------------------------------------------------------

    def handle(self, brief: TaskBrief, result: dict[str, Any]) -> EscalationResult:
        """
        Decide what to do with an agent result.

        Rules
        -----
        SUCCESS    → complete
        BLOCKED    → escalate immediately (no retries for blocked states)
        PARTIAL    → salvage the good parts and retry with amended brief
        BAD_OUTPUT + retries remaining → retry with amended brief
                     (retry_count incremented)
        BAD_OUTPUT + retries exhausted → escalate

        Parameters
        ----------
        brief  : The TaskBrief that was executed. This method reads its
                 ``retry_budget`` and ``retry_count`` attributes; building
                 the amended brief also deep-copies it and writes to its
                 ``context`` mapping.
        result : The dict returned by the agent runtime.

        Returns
        -------
        An EscalationResult with the recommended action. ``brief`` itself
        is never modified; amended briefs are deep copies.
        """
        failure_type = self.classify_result(result)

        # ---- SUCCESS ---------------------------------------------------
        if failure_type == FailureType.SUCCESS:
            return EscalationResult(
                action="complete",
                reason="Agent reported success; acceptance criteria satisfied.",
            )

        # ---- BLOCKED ---------------------------------------------------
        if failure_type == FailureType.BLOCKED:
            # Prefer the explicit blocker lists; fall back to a free-text
            # "reason" when the agent only set status == "blocked".
            blocking_issues = (
                result.get("blocking_issues")
                or result.get("blockers")
                or [result.get("reason", "unspecified blocker")]
            )
            return EscalationResult(
                action="escalate",
                reason=(
                    "Agent is blocked and cannot proceed. "
                    f"Blockers: {blocking_issues}"
                ),
            )

        # ---- PARTIAL ---------------------------------------------------
        if failure_type == FailureType.PARTIAL:
            # Keep only the keys known to carry finished work, when present.
            salvaged: dict[str, Any] = {
                key: result[key]
                for key in ("completed", "artifacts")
                if key in result
            }

            # NOTE(review): this branch never consults retry_budget, so a
            # run of partial results keeps incrementing retry_count without
            # escalating — confirm the runner bounds this loop.
            amended = self._amend_brief_for_retry(brief, result, failure_type)
            return EscalationResult(
                action="salvage_and_retry",
                amended_brief=amended,
                # With neither key present, hand back the whole result.
                salvaged_result=salvaged or result,
                reason=(
                    "Agent returned partial output. "
                    "Salvaging completed artifacts and retrying remaining work."
                ),
            )

        # ---- BAD_OUTPUT ------------------------------------------------
        retries_remaining = brief.retry_budget - brief.retry_count

        if retries_remaining > 0:
            amended = self._amend_brief_for_retry(brief, result, failure_type)
            return EscalationResult(
                action="retry",
                amended_brief=amended,
                reason=(
                    "Agent returned bad output. "
                    f"Retrying (attempt {amended.retry_count}/{amended.retry_budget})."
                ),
            )

        return EscalationResult(
            action="escalate",
            reason=(
                "Agent returned bad output and retry budget "
                f"({brief.retry_budget}) is exhausted. Escalating."
            ),
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _amend_brief_for_retry(
        self,
        brief: TaskBrief,
        result: dict[str, Any],
        failure_type: FailureType,
    ) -> TaskBrief:
        """
        Return a deep copy of the brief with retry_count incremented and
        previous-failure context injected into the context dict.

        The original ``brief`` is left untouched. Any existing
        "_previous_failure" entry in the copied context is overwritten.
        """
        import copy

        amended = copy.deepcopy(brief)
        amended.retry_count += 1

        # Inject failure context so the agent knows what went wrong.
        # NOTE: ``result`` is stored by reference, not copied, so later
        # mutation of the caller's dict is visible through the amended brief.
        amended.context["_previous_failure"] = {
            "failure_type": failure_type.value,
            "result": result,
            "attempt": amended.retry_count,
        }

        return amended
|