The Agent Loop

The heart of any coding agent. A state machine that thinks, acts, observes, and repeats.

Evidence source: Amp Code v0.0.1769212917 (ThreadWorker class, 1600+ lines of implementation)


The Core Insight

Every coding agent is fundamentally a loop:

User task → Think → Act → Observe → More work? → (loop or done)

But the naive version fails in production. Real agents need:

  • State management - Know where you are in the cycle
  • Serialized mutations - Don't corrupt state with race conditions
  • Tool batching - Run independent tools in parallel
  • Error recovery - Retry on failure, not crash
  • Cancellation - User can stop at any point

Amp solves these with a multi-level state machine and mutex-protected message handling.
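Before any of those production concerns, the bare loop fits in a dozen lines. A minimal sketch in Python, where `llm_call` and `run_tool` are hypothetical stand-ins rather than Amp's API:

```python
# Minimal think -> act -> observe loop. `llm_call` and `run_tool` are
# hypothetical stand-ins, not Amp's actual API.
def naive_agent_loop(task, llm_call, run_tool):
    """Loop until the model signals end_turn; run tool calls in between."""
    messages = [{"role": "user", "content": task}]
    while True:
        reply = llm_call(messages)                  # Think
        messages.append(reply)
        if reply["stop_reason"] == "end_turn":      # More work?
            return messages                         # Done
        for call in reply["tool_calls"]:            # Act
            result = run_tool(call)                 # Observe
            messages.append({
                "role": "user",
                "content": [{"type": "tool_result",
                             "tool_use_id": call["id"],
                             "result": result}],
            })
```

Everything that follows in this chapter is about what this sketch leaves out: serialization, batching, retry, and cancellation.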


State Machine Architecture

Why Multiple State Machines?

A single state variable isn't enough. Amp uses three coordinated state machines:

┌─────────────────────────────────────────────────────────────────┐
│                      ThreadWorker                                │
│                                                                 │
│   ┌─────────────────┐   ┌─────────────────┐   ┌─────────────┐  │
│   │  Worker State   │   │ Inference State │   │ Interaction │  │
│   │                 │   │                 │   │   State     │  │
│   │  initial ─────► │   │  idle ◄──────►  │   │  (computed) │  │
│   │      │          │   │   │       │     │   │             │  │
│   │      ▼          │   │   ▼       │     │   │  Based on:  │  │
│   │   active        │   │ running ──┘     │   │  - thread   │  │
│   │                 │   │   │             │   │  - inference│  │
│   │                 │   │   ▼             │   │  - tools    │  │
│   │                 │   │ cancelled       │   │             │  │
│   └─────────────────┘   └─────────────────┘   └─────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1. Worker State (Lifecycle)

Controls whether the worker is ready to operate.

State     Description                    Transition
initial   Worker created, not yet ready  Constructor
active    Thread acquired, operational   After acquireThread()

Simple, but crucial - you can't process messages until the thread is acquired.

2. Inference State (LLM Calls)

Controls the LLM inference operation.

State      Description           Transition
idle       No inference running  Initial / after completion
running    LLM call in progress  During streaming
cancelled  User cancelled        After abort

idle ──[runInference()]──► running ──[completion]──► idle
                              │
                              └──[cancel()]──► cancelled ──[runInference()]──► idle

Key insight: cancelled is a recoverable state. You can start a new inference from cancelled.

3. Interaction State (What User Can Do)

Computed from the other states - tells the UI what actions are valid.

State                 Condition            User Can
false                 Busy or invalid      Nothing (wait)
handoff               Handoff in progress  Wait
user-message-initial  Empty thread         Send first message
user-message-reply    After end_turn       Send reply
user-tool-approval    Tool blocked         Approve/reject
tool-running          Tools executing      Cancel

Why compute it? Because it depends on multiple factors:

def compute_interaction_state(thread, inference_state, handoff_state):
    # Handoff takes priority
    if handoff_state and not handoff_state.result:
        return "handoff"

    # Busy states block input
    if inference_state in ["running", "cancelled"]:
        return False

    last_message = thread.messages[-1] if thread.messages else None

    if not last_message:
        return "user-message-initial"

    if last_message.role == "assistant":
        if last_message.state.type == "complete":
            if last_message.state.stop_reason == "end_turn":
                return "user-message-reply"
        return False

    # Check tool states
    if last_message.role == "user":
        for content in last_message.content:
            if content.type == "tool_result":
                if content.run.status == "blocked-on-user":
                    return "user-tool-approval"
                if content.run.status == "in-progress":
                    return "tool-running"

    return False

Message Flow

The Delta Pattern

Amp doesn't mutate state directly. All changes flow through deltas - atomic state transitions.

# Instead of: thread.messages.append(message)
# Do: handle({"type": "user:message", "message": message})

Delta types:

Delta Type                Purpose            Triggers
user:message              New user message   Inference
user:tool-input           Tool approval      Tool execution
tool:data                 Tool result        Completion check
assistant:message         Complete response  Tool execution
assistant:message-update  Streaming update   UI update
inference:completed       Inference done     Queue check, hooks
cancelled                 User cancelled     Reset state
course-correction         Inject correction  Resume inference

Mutex-Protected Handling

All deltas go through a single entry point with mutex protection:

async def handle(delta, signal=None):
    """Main entry point for all thread mutations."""
    is_edit = delta.type == "user:message" and delta.index is not None

    # Serialize all mutations through one lock
    async with handle_mutex:
        await inner_handle(delta, signal)

    if is_edit:
        await cleanup_after_edit()

Why mutex? Even in async code, you need serialization. Consider:

  1. User sends message → starts inference
  2. Tool result arrives → needs to update thread
  3. If both run concurrently, thread state corrupts

The mutex ensures deltas apply one at a time, in order.

The Delta Handler

After applying the delta, trigger reactions:

def on_thread_delta(delta):
    match delta.type:
        case "user:message":
            reset_retry_attempts()
            turn_start_time = now()
            track_files(delta.message.file_mentions)
            run_inference(skip_course_correction=True)

        case "tool:data":
            handle_tool_data(delta)

        case "assistant:message":
            if delta.message.state.stop_reason == "tool_use":
                tool_orchestrator.on_assistant_complete(delta.message)

        case "inference:completed":
            handle_inference_completed()

        case "cancelled":
            reset_retry_attempts()

The Inference Cycle

Running Inference

async def run_inference(skip_course_correction=False):
    # 1. Abort existing inference
    if ops.inference:
        ops.inference.abort()
        ops.inference = None

    # 2. Reset cancelled state
    if inference_state == "cancelled":
        inference_state = "idle"

    # 3. Create abort controller
    abort_controller = AbortController()
    ops.inference = abort_controller

    # 4. Check course correction (unless skipped)
    if not skip_course_correction:
        if await check_pending_course_correction():
            return  # Course correction will handle it

    # 5. Resolve model from mode
    config = await get_config()
    model, mode = resolve_model_and_mode(config, thread)
    provider = get_provider(model)

    try:
        inference_state = "running"

        # 6. Stream response
        async for message in provider.stream(model=model, thread=thread):
            await handle({
                "type": "assistant:message-update",
                "message": message
            })

        # 7. Signal completion
        await handle({"type": "inference:completed"})

    except RetryableError as e:
        delay = get_retry_delay(retry_attempt)
        if delay:
            start_retry_countdown(delay)
        ephemeral_error = e

    except Exception as e:
        ephemeral_error = e

    finally:
        inference_state = "idle"

Stop Reasons

When inference completes, check why:

Stop Reason  Meaning            Next Action
end_turn     Agent finished     Check queue, run course correction
tool_use     Tool calls needed  Execute tools
max_tokens   Output limit hit   Error (or continue)
refusal      Model refused      Error to user

def handle_inference_completed():
    reset_retry_attempts()

    last = get_last_assistant_message()
    if not last:
        return

    stop_reason = last.state.stop_reason

    if stop_reason == "tool_use":
        tool_orchestrator.on_assistant_complete(last)

    elif stop_reason == "end_turn":
        if thread.queued_messages:
            handle({"type": "user:message-queue:dequeue"})
        else:
            run_course_correction()
            fire_hooks()

    elif stop_reason == "refusal":
        ephemeral_error = Error("Model refused request")

Tool Execution

The Batching Algorithm

Naive approach: run tools sequentially. Better: run independent tools in parallel.

Amp uses conflict-based batching:

def batch_tools_by_conflict(tool_uses, tool_service):
    """
    Group tools so conflicting tools are in different batches.
    Tools in the same batch run in parallel.
    """
    if not tool_uses:
        return []

    batches = []
    current_batch = []

    for tool in tool_uses:
        if has_conflict_with_batch(tool, current_batch, tool_service):
            if current_batch:
                batches.append(current_batch)
                current_batch = []
        current_batch.append(tool)

    if current_batch:
        batches.append(current_batch)

    return batches

Conflict Detection

Two tools conflict if they can't safely run in parallel:

def has_conflict(tool_a, tool_b, tool_service):
    profile_a = tool_service.get_execution_profile(tool_a.name)
    profile_b = tool_service.get_execution_profile(tool_b.name)

    # No profile = assume conflict (conservative)
    if not profile_a or not profile_b:
        return True

    # Serial tools always conflict
    if profile_a.serial or profile_b.serial:
        return True

    # Check resource key conflicts
    keys_a = profile_a.resource_keys(tool_a.input or {})
    keys_b = profile_b.resource_keys(tool_b.input or {})

    for key_a in keys_a:
        for key_b in keys_b:
            if key_a.key == key_b.key:
                # Same resource - conflict if either is writing
                if key_a.mode == "write" or key_b.mode == "write":
                    return True

    return False  # No conflict - can run in parallel

Resource keys example:

  • Read("/src/app.ts") → [{key: "/src/app.ts", mode: "read"}]
  • Edit("/src/app.ts", ...) → [{key: "/src/app.ts", mode: "write"}]
  • Two Reads on same file: no conflict (parallel OK)
  • Read + Edit on same file: conflict (sequential)
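Putting batching and conflict detection together, a toy run might look like this. The `PROFILES` registry and the `Read`/`Edit` profiles are illustrative assumptions, not Amp's actual tool registry:

```python
# Toy end-to-end demo of conflict-based batching. Profiles and tool
# names are illustrative assumptions, not Amp's registry.
from dataclasses import dataclass

@dataclass
class Key:
    key: str
    mode: str  # "read" or "write"

@dataclass
class Profile:
    serial: bool = False
    keys: callable = None  # input dict -> list[Key]

PROFILES = {
    "Read": Profile(keys=lambda i: [Key(i["path"], "read")]),
    "Edit": Profile(keys=lambda i: [Key(i["path"], "write")]),
}

@dataclass
class ToolUse:
    name: str
    input: dict

def conflicts(a, b):
    pa, pb = PROFILES.get(a.name), PROFILES.get(b.name)
    if not pa or not pb:
        return True                     # unknown tool: be conservative
    if pa.serial or pb.serial:
        return True
    for ka in pa.keys(a.input):
        for kb in pb.keys(b.input):
            if ka.key == kb.key and "write" in (ka.mode, kb.mode):
                return True             # same resource, at least one writer
    return False

def batch(tool_uses):
    batches, current = [], []
    for t in tool_uses:
        if any(conflicts(t, other) for other in current):
            batches.append(current)     # conflict: close the batch
            current = []
        current.append(t)
    if current:
        batches.append(current)
    return batches

uses = [ToolUse("Read", {"path": "/src/app.ts"}),
        ToolUse("Read", {"path": "/src/util.ts"}),
        ToolUse("Edit", {"path": "/src/app.ts"})]
print([[t.name for t in b] for b in batch(uses)])  # → [['Read', 'Read'], ['Edit']]
```

The two Reads land in one parallel batch; the Edit conflicts with the Read of the same file, so it starts a second batch that runs after the first.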

Batch Execution

async def execute_batches(batches):
    """Execute batches sequentially, tools within batch in parallel."""
    for batch in batches:
        results = await asyncio.gather(
            *[invoke_tool(tool) for tool in batch],
            return_exceptions=True
        )

        # Log failures but continue
        failures = [r for r in results if isinstance(r, Exception)]
        if failures:
            log.warning(f"{len(failures)} tools failed in batch")

Tool Run States

Each tool has its own state:

Status            Description             Terminal?
in-progress       Executing               No
blocked-on-user   Awaiting approval       No
done              Completed successfully  Yes
cancelled         Cancelled               Yes
rejected-by-user  User rejected           Yes
error             Failed                  Yes
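The terminal-status check follows directly from this table; a minimal sketch:

```python
# Sketch of a terminal-status predicate derived from the table above.
TERMINAL_STATUSES = {"done", "cancelled", "rejected-by-user", "error"}

def is_terminal_status(run):
    """True once a tool run can no longer change state."""
    return run.status in TERMINAL_STATUSES
```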

After each tool completes, check if all tools are done:

def should_run_inference_after_tool(thread, tool_use_id):
    """Check if all tool results are terminal."""
    last_user = find_last_message(thread, "user")
    if not last_user:
        return False

    for content in last_user.content:
        if content.type == "tool_result":
            if not is_terminal_status(content.run):
                return False  # Still have non-terminal tools

    return True  # All done - run inference

Error Handling & Retry

Retryable Errors

Not all errors should crash. Amp has automatic retry with exponential backoff:

BASE_RETRY_SECONDS = 5
MAX_RETRY_SECONDS = 60
MAX_AUTO_RETRIES = 5

def get_retry_delay(attempt):
    if attempt >= MAX_AUTO_RETRIES:
        return None  # Give up

    delay = BASE_RETRY_SECONDS * (2 ** attempt)
    return min(delay, MAX_RETRY_SECONDS)

# Attempt 0: 5s
# Attempt 1: 10s
# Attempt 2: 20s
# Attempt 3: 40s
# Attempt 4: 60s (capped)
# Attempt 5+: None (max retries)

Ephemeral Errors

Errors that can be retried are stored as "ephemeral" - shown to user but clearable:

async def run_inference(...):
    try:
        # ... inference logic
    except RetryableError as e:
        delay = get_retry_delay(retry_attempt)
        if delay:
            start_retry_countdown(delay)
        ephemeral_error = e  # Show to user, but can retry
    except Exception as e:
        ephemeral_error = e  # Show to user

The Retry Flow

async def retry():
    """User clicks retry, or auto-retry fires."""
    clear_retry_countdown()

    if ephemeral_error:
        retry_attempt += 1
        ephemeral_error = None

    abort_inference()
    truncate_incomplete_message()
    inference_state = "idle"

    await run_inference()

Cancellation

Cancel Reasons

Reason            Trigger
user:cancelled    User clicks cancel
user:interrupted  User sends new message
system:edited     User edited earlier message
system:disposed   Worker cleanup
system:safety     Dangerous tool on resume

Cancellation Flow

async def cancel():
    """Cancel all operations."""
    # 1. Cancel inference
    if ops.inference:
        ops.inference.abort()
        ops.inference = None
    inference_state = "cancelled"

    # 2. Cancel all tools
    await tool_orchestrator.cancel_all("user:cancelled")

    # 3. Notify
    await handle({"type": "cancelled"})

Dangerous Tools

Some tools shouldn't auto-resume after app restart:

DANGEROUS_TOOLS = ["Bash", "shell_command", "repl", "Task"]  # + all MCP tools

def should_cancel_on_resume(tool_name):
    return tool_name in DANGEROUS_TOOLS or tool_name.startswith("mcp__")
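Combined with the system:safety cancel reason above, a resume-time sweep might look like this. This is a hypothetical sketch: `sweep_on_resume` and the run dicts are illustrative, not Amp's code.

```python
# Hypothetical resume-time sweep: cancel dangerous in-flight tools
# instead of letting them auto-resume. Not Amp's actual code.
DANGEROUS_TOOLS = {"Bash", "shell_command", "repl", "Task"}

def should_cancel_on_resume(tool_name):
    return tool_name in DANGEROUS_TOOLS or tool_name.startswith("mcp__")

def sweep_on_resume(runs):
    """Mark non-terminal dangerous runs as cancelled ('system:safety')."""
    for run in runs:
        if run["status"] == "in-progress" and should_cancel_on_resume(run["tool"]):
            run["status"] = "cancelled"
            run["cancel_reason"] = "system:safety"
    return runs

runs = sweep_on_resume([
    {"tool": "Bash", "status": "in-progress"},
    {"tool": "Read", "status": "in-progress"},
    {"tool": "mcp__github__create_issue", "status": "in-progress"},
])
```

Here the Bash run and the MCP tool run are cancelled, while the Read is safe to leave in progress.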

Message Queuing

Users can send messages while the agent is busy:

def handle_enqueue(delta):
    """User sent message while busy."""
    interaction = compute_interaction_state(thread, inference_state, handoff_state)

    # If we can process now, do it
    if interaction != "tool-running":
        if inference_state == "cancelled":
            handle({"type": "user:message-queue:dequeue"})
            return
        if inference_state == "idle":
            last = thread.messages[-1]
            if last and last.role == "assistant":
                if last.state.type in ["cancelled", "error"]:
                    handle({"type": "user:message-queue:dequeue"})
                    return
                if last.state.stop_reason != "tool_use":
                    handle({"type": "user:message-queue:dequeue"})
                    return

    # Otherwise, keep in queue - will dequeue on end_turn

Constants Reference

Constant             Value   Purpose
BASE_RETRY_SECONDS   5       Initial retry delay
MAX_RETRY_SECONDS    60      Retry delay cap
MAX_AUTO_RETRIES     5       Retries before giving up
MAX_OUTPUT_TOKENS    32,000  Max LLM output
SUBAGENT_MAX_TOKENS  8,000   Subagent output limit

Implementation Checklist

Building your own agent loop? Ensure:

  • State Machine
    • Worker state (initial → active)
    • Inference state (idle ↔ running ↔ cancelled)
    • Interaction state computation
  • Message Handling
    • Mutex serialization
    • Delta pattern for all mutations
    • Delta type dispatch
  • Inference Cycle
    • Model resolution from mode
    • Stream processing
    • Stop reason handling
  • Tool Execution
    • Conflict-based batching
    • Parallel within batch
    • Completion tracking
  • Error Handling
    • Retryable error detection
    • Exponential backoff
    • Ephemeral error display
  • Cancellation
    • Inference abort
    • Tool cancellation
    • Dangerous tool handling

What's Next

The loop calls tools, but what are tools and how do they work?

04-tool-system.md - Tool definitions, registration, execution

For reconstruction-grade detail with complete pseudocode and data structures:

03-agent-loop.spec.md - Full implementation specification