Skip to content

[FEATURE] Surface agent interrupts as a dedicated StreamEvent in EventPublisher #43

@galuszkm

Description

@galuszkm

Problem Statement

When a strands agent hook raises InterruptException (human-in-the-loop pause), the exception propagates out of invoke_async and is swallowed by downstream error handlers — for example, in strands-compose-agentcore's stream_invocation, it falls into a bare except Exception and emits a generic error event:

except Exception:
    events.put_event(StreamEvent(type="error", ..., data={"message": "internal error"}))

EventPublisher has no INTERRUPT event type and its _on_complete handler emits only token usage — stop_reason and interrupts are never forwarded to the event stream. This makes it impossible for SSE-based clients consuming EventQueue / StreamEvent objects to:

  1. Detect that the agent paused waiting for human input
  2. Know which interrupt IDs and reasons are pending
  3. Re-invoke the agent with InterruptResponseContent blocks

Currently EventType has no interrupt variant. The interrupt data (Interrupt.id, Interrupt.name, Interrupt.reason) is only accessible via AgentResult.interrupts, which is not reachable through the streaming event path.

Proposed Solution

Add an INTERRUPT variant to EventType and emit it from EventPublisher when the invocation result carries stop_reason == "interrupt".

# EventType addition
INTERRUPT = "interrupt"

Emit it in _on_complete (or a dedicated AfterInvocationEvent handler) when the result indicates a pause:

def _on_complete(self, event: AfterInvocationEvent) -> None:
    if self._errored:
        return
    result = event.result
    if result is not None and result.stop_reason == "interrupt":
        for interrupt in result.interrupts:
            self._callback(
                StreamEvent(
                    type=EventType.INTERRUPT,
                    agent_name=self._agent_name,
                    data={
                        "interrupt_id": interrupt.id,
                        "name": interrupt.name,
                        "reason": interrupt.reason,
                    },
                )
            )
        return  # do not emit COMPLETE — the session is paused, not done
    # ... existing COMPLETE logic

Client code consuming the stream would then do:

pending = []
for event in client.invoke("delete the config file"):
    if event.type == "interrupt":
        pending.append(event.data)
    elif event.type == "complete":
        break

if pending:
    answer = input(f"Approve '{pending[0]['reason']}'? (yes/no): ")
    for p in pending:
        reply_blocks.append(respond(p["interrupt_id"], answer))
    for event in client.invoke(compose(*reply_blocks)):
        ...

Use Case

Any deployment where a strands agent uses BeforeToolCallEvent hooks with event.interrupt() for human-in-the-loop tool approval, e.g.:

  • An agent that calls destructive tools (delete, deploy, send_email) requires explicit user approval before execution
  • A compliance workflow where certain tool calls must be logged and acknowledged by a human operator before proceeding
  • A multi-turn REPL client (like the one in strands-compose-agentcore) that needs to detect the pause, prompt the user, and resume — all through the same SSE stream

Without this, interrupt-based HITL flows are completely unusable through the EventQueue / SSE streaming path.

Alternatives Considered

  • Catch InterruptException at the call site (e.g. in stream_invocation of downstream adapters) — works as a workaround but requires every adapter to reimplement the same interrupt-detection logic. The fix belongs in EventPublisher, which already owns the streaming contract.
  • Poll AgentResult.interrupts after invoke_async returns — only works in non-streaming usage; not possible when consuming events through EventQueue asynchronously.

Additional Context

Relevant strands types already in place — Interrupt, InterruptException, InterruptResponse, InterruptResponseContent (all in strands.types.interrupt / strands.interrupt). The wire format for client replies is {"interruptResponse": {"interruptId": ..., "response": ...}}. The missing piece is only the EventPublisherStreamEvent bridge.

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request
    No fields configured for Feature.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions