# The Agent Loop

The heart of any coding agent: a state machine that thinks, acts, observes, and repeats.

*Evidence source: Amp Code v0.0.1769212917 (`ThreadWorker` class, 1,600+ lines of implementation)*
## The Core Insight

Every coding agent is fundamentally a loop:

```
User task → Think → Act → Observe → More work? → (loop or done)
```
But the naive version fails in production. Real agents need:

- **State management** - know where you are in the cycle
- **Serialized mutations** - don't corrupt state with race conditions
- **Tool batching** - run independent tools in parallel
- **Error recovery** - retry on failure instead of crashing
- **Cancellation** - let the user stop at any point

Amp solves these with a multi-level state machine and mutex-protected message handling.
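Before adding those layers, the bare loop itself fits in a few lines. A minimal sketch for orientation only; the `think`/`act` callables and the decision dict shape are illustrative placeholders, not Amp's API:

```python
# Minimal agent loop sketch. The function names and decision shape are
# illustrative placeholders, not Amp's actual interfaces.
def agent_loop(task, think, act, max_steps=10):
    """Think -> act -> observe until the model signals it is done."""
    observations = [task]
    for _ in range(max_steps):
        decision = think(observations)     # LLM call: plan the next step
        if decision["type"] == "done":
            return decision["answer"]
        observation = act(decision)        # run the chosen tool
        observations.append(observation)   # feed the result back in
    raise RuntimeError("loop did not terminate")

# Toy run: "think" finishes after a single tool call.
def think(obs):
    if len(obs) < 2:
        return {"type": "tool", "name": "echo", "input": obs[-1]}
    return {"type": "done", "answer": obs[-1]}

def act(decision):
    return f"ran {decision['name']}"

print(agent_loop("fix the bug", think, act))  # → "ran echo"
```

Everything below is what it takes to make this toy loop survive production: state machines, serialization, batching, retry, and cancellation.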
## State Machine Architecture

### Why Multiple State Machines?

A single state variable isn't enough. Amp uses three coordinated state machines:
```
┌─────────────────────────────────────────────────────────────┐
│                        ThreadWorker                         │
│                                                             │
│  ┌───────────────┐  ┌─────────────────┐  ┌──────────────┐   │
│  │ Worker State  │  │ Inference State │  │ Interaction  │   │
│  │               │  │                 │  │    State     │   │
│  │ initial       │  │ idle ◄────────┐ │  │ (computed)   │   │
│  │    │          │  │   │           │ │  │              │   │
│  │    ▼          │  │   ▼           │ │  │ Based on:    │   │
│  │ active        │  │ running ──────┘ │  │ - thread     │   │
│  │               │  │   │             │  │ - inference  │   │
│  │               │  │   ▼             │  │ - tools      │   │
│  │               │  │ cancelled       │  │              │   │
│  └───────────────┘  └─────────────────┘  └──────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```
### 1. Worker State (Lifecycle)

Controls whether the worker is ready to operate.

| State | Description | Transition |
|---|---|---|
| `initial` | Worker created, not yet ready | Constructor |
| `active` | Thread acquired, operational | After `acquireThread()` |

Simple, but crucial: you can't process messages until the thread is acquired.
### 2. Inference State (LLM Calls)

Controls the LLM inference operation.

| State | Description | Transition |
|---|---|---|
| `idle` | No inference running | Initial / after completion |
| `running` | LLM call in progress | During streaming |
| `cancelled` | User cancelled | After abort |

```
idle ──[runInference()]──► running ──[completion]──► idle
                              │
                              └──[cancel()]──► cancelled ──[runInference()]──► idle
```

Key insight: `cancelled` is a recoverable state. You can start a new inference from `cancelled`.
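One way to make that recoverability explicit is an allowed-transition table. A minimal sketch under the transitions described above; this is not Amp's actual code, just a model of the diagram:

```python
# Allowed inference-state transitions from the diagram above, including
# recovery from "cancelled". Illustrative model, not Amp's implementation.
TRANSITIONS = {
    ("idle", "runInference"): "running",
    ("running", "completion"): "idle",
    ("running", "cancel"): "cancelled",
    ("cancelled", "runInference"): "idle",  # recoverable: reset, then start fresh
}

def step(state: str, event: str) -> str:
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"illegal transition: {event} while {state}")

state = "idle"
for event in ["runInference", "cancel", "runInference"]:
    state = step(state, event)
print(state)  # → "idle" (recovered from cancelled)
```

Encoding transitions as data makes illegal moves (e.g. `cancel` while `idle`) fail loudly instead of silently corrupting state.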
### 3. Interaction State (What the User Can Do)

Computed from the other states; it tells the UI which actions are valid.

| State | Condition | User Can |
|---|---|---|
| `false` | Busy or invalid | Nothing (wait) |
| `handoff` | Handoff in progress | Wait |
| `user-message-initial` | Empty thread | Send first message |
| `user-message-reply` | After `end_turn` | Send reply |
| `user-tool-approval` | Tool blocked | Approve/reject |
| `tool-running` | Tools executing | Cancel |

Why compute it? Because it depends on multiple factors:
```python
def compute_interaction_state(thread, inference_state, handoff_state):
    # Handoff takes priority
    if handoff_state and not handoff_state.result:
        return "handoff"

    # Busy states block input
    if inference_state in ["running", "cancelled"]:
        return False

    last_message = thread.messages[-1] if thread.messages else None
    if not last_message:
        return "user-message-initial"

    if last_message.role == "assistant":
        if last_message.state.type == "complete":
            if last_message.state.stop_reason == "end_turn":
                return "user-message-reply"
        return False

    # Check tool states
    if last_message.role == "user":
        for content in last_message.content:
            if content.type == "tool_result":
                if content.run.status == "blocked-on-user":
                    return "user-tool-approval"
                if content.run.status == "in-progress":
                    return "tool-running"

    return False
```
## Message Flow

### The Delta Pattern

Amp doesn't mutate state directly. All changes flow through deltas: atomic state transitions.

```python
# Instead of: thread.messages.append(message)
# Do:
handle({"type": "user:message", "message": message})
```

Delta types:
| Delta Type | Purpose | Triggers |
|---|---|---|
| `user:message` | New user message | Inference |
| `user:tool-input` | Tool approval | Tool execution |
| `tool:data` | Tool result | Completion check |
| `assistant:message` | Complete response | Tool execution |
| `assistant:message-update` | Streaming update | UI update |
| `inference:completed` | Inference done | Queue check, hooks |
| `cancelled` | User cancelled | Reset state |
| `course-correction` | Inject correction | Resume inference |
### Mutex-Protected Handling

All deltas go through a single entry point with mutex protection:

```python
async def handle(delta, signal=None):
    """Main entry point for all thread mutations."""
    is_edit = delta.type == "user:message" and delta.index is not None

    await handle_mutex.acquire()
    try:
        await inner_handle(delta, signal)
    finally:
        handle_mutex.release()

    if is_edit:
        await cleanup_after_edit()
```
Why a mutex? Even in async code, you need serialization. Consider:

- User sends a message → starts inference
- A tool result arrives → needs to update the thread
- If both run concurrently, the thread state corrupts

The mutex ensures deltas apply one at a time, in order.
### The Delta Handler

After applying a delta, trigger reactions:

```python
def on_thread_delta(delta):
    match delta.type:
        case "user:message":
            reset_retry_attempts()
            turn_start_time = now()
            track_files(delta.message.file_mentions)
            run_inference(skip_course_correction=True)
        case "tool:data":
            handle_tool_data(delta)
        case "assistant:message":
            if delta.message.state.stop_reason == "tool_use":
                tool_orchestrator.on_assistant_complete(delta.message)
        case "inference:completed":
            handle_inference_completed()
        case "cancelled":
            reset_retry_attempts()
```
## The Inference Cycle

### Running Inference

```python
async def run_inference(skip_course_correction=False):
    # 1. Abort any existing inference
    if ops.inference:
        ops.inference.abort()
        ops.inference = None

    # 2. Reset cancelled state
    if inference_state == "cancelled":
        inference_state = "idle"

    # 3. Create abort controller
    abort_controller = AbortController()
    ops.inference = abort_controller

    # 4. Check course correction (unless skipped)
    if not skip_course_correction:
        if await check_pending_course_correction():
            return  # Course correction will handle it

    # 5. Resolve model from mode
    config = await get_config()
    model, mode = resolve_model_and_mode(config, thread)
    provider = get_provider(model)

    try:
        inference_state = "running"

        # 6. Stream the response
        async for message in provider.stream(model=model, thread=thread):
            await handle({
                "type": "assistant:message-update",
                "message": message,
            })

        # 7. Signal completion
        await handle({"type": "inference:completed"})
    except RetryableError as e:
        delay = get_retry_delay(retry_attempt)
        if delay:
            start_retry_countdown(delay)
        ephemeral_error = e
    except Exception as e:
        ephemeral_error = e
    finally:
        inference_state = "idle"
```
### Stop Reasons

When inference completes, check why:

| Stop Reason | Meaning | Next Action |
|---|---|---|
| `end_turn` | Agent finished | Check queue, run course correction |
| `tool_use` | Tool calls needed | Execute tools |
| `max_tokens` | Output limit hit | Error (or continue) |
| `refusal` | Model refused | Error to user |
```python
def handle_inference_completed():
    reset_retry_attempts()
    last = get_last_assistant_message()
    if not last:
        return

    stop_reason = last.state.stop_reason
    if stop_reason == "tool_use":
        tool_orchestrator.on_assistant_complete(last)
    elif stop_reason == "end_turn":
        if thread.queued_messages:
            handle({"type": "user:message-queue:dequeue"})
        else:
            run_course_correction()
        fire_hooks()
    elif stop_reason == "refusal":
        ephemeral_error = Error("Model refused request")
```
## Tool Execution

### The Batching Algorithm

The naive approach runs tools sequentially. Better: run independent tools in parallel.

Amp uses conflict-based batching:

```python
def batch_tools_by_conflict(tool_uses, tool_service):
    """
    Group tools so conflicting tools land in different batches.
    Tools in the same batch run in parallel.
    """
    if not tool_uses:
        return []

    batches = []
    current_batch = []
    for tool in tool_uses:
        if has_conflict_with_batch(tool, current_batch, tool_service):
            if current_batch:
                batches.append(current_batch)
                current_batch = []
        current_batch.append(tool)

    if current_batch:
        batches.append(current_batch)
    return batches
```
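The greedy grouping above can be exercised end-to-end with a toy conflict rule. A self-contained sketch; the dict-shaped tools and the `conflicts` function are simplified stand-ins for Amp's execution profiles:

```python
# Self-contained sketch of conflict-based batching. The dict-shaped tools and
# the conflict rule are simplified stand-ins for Amp's execution profiles.
def batch_by_conflict(tools, conflicts):
    """Greedy batching: start a new batch when a tool conflicts with the current one."""
    batches, current = [], []
    for tool in tools:
        if any(conflicts(tool, other) for other in current):
            batches.append(current)
            current = []
        current.append(tool)
    if current:
        batches.append(current)
    return batches

# Two reads of the same file can share a batch; a write forces a new batch.
def conflicts(a, b):
    same_file = a["path"] == b["path"]
    writing = "write" in (a["mode"], b["mode"])
    return same_file and writing

tools = [
    {"name": "Read", "path": "app.ts", "mode": "read"},
    {"name": "Read", "path": "app.ts", "mode": "read"},   # parallel with the first
    {"name": "Edit", "path": "app.ts", "mode": "write"},  # conflicts -> new batch
]
batches = batch_by_conflict(tools, conflicts)
print([len(b) for b in batches])  # → [2, 1]
```

Note the greediness: batching quality depends on tool order, but correctness doesn't, since conflicting tools never share a batch.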
### Conflict Detection

Two tools conflict if they can't safely run in parallel:

```python
def has_conflict(tool_a, tool_b, tool_service):
    profile_a = tool_service.get_execution_profile(tool_a.name)
    profile_b = tool_service.get_execution_profile(tool_b.name)

    # No profile = assume conflict (conservative)
    if not profile_a or not profile_b:
        return True

    # Serial tools always conflict
    if profile_a.serial or profile_b.serial:
        return True

    # Check resource key conflicts
    keys_a = profile_a.resource_keys(tool_a.input or {})
    keys_b = profile_b.resource_keys(tool_b.input or {})
    for key_a in keys_a:
        for key_b in keys_b:
            if key_a.key == key_b.key:
                # Same resource: conflict if either is writing
                if key_a.mode == "write" or key_b.mode == "write":
                    return True

    return False  # No conflict - can run in parallel
```
Resource keys example:

- `Read("/src/app.ts")` → `[{key: "/src/app.ts", mode: "read"}]`
- `Edit("/src/app.ts", ...)` → `[{key: "/src/app.ts", mode: "write"}]`
- Two Reads on the same file: no conflict (parallel OK)
- Read + Edit on the same file: conflict (sequential)
### Batch Execution

```python
async def execute_batches(batches):
    """Execute batches sequentially; tools within a batch run in parallel."""
    for batch in batches:
        results = await asyncio.gather(
            *[invoke_tool(tool) for tool in batch],
            return_exceptions=True,
        )

        # Log failures but continue
        failures = [r for r in results if isinstance(r, Exception)]
        if failures:
            log.warning(f"{len(failures)} tools failed in batch")
```
### Tool Run States

Each tool has its own state:

| Status | Description | Terminal? |
|---|---|---|
| `in-progress` | Executing | No |
| `blocked-on-user` | Awaiting approval | No |
| `done` | Completed successfully | Yes |
| `cancelled` | Cancelled | Yes |
| `rejected-by-user` | User rejected | Yes |
| `error` | Failed | Yes |
After each tool completes, check whether all tools are done:

```python
def should_run_inference_after_tool(thread, tool_use_id):
    """Check if all tool results are terminal."""
    last_user = find_last_message(thread, "user")
    if not last_user:
        return False

    for content in last_user.content:
        if content.type == "tool_result":
            if not is_terminal_status(content.run):
                return False  # Still have non-terminal tools

    return True  # All done - run inference
```
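The terminal check itself follows directly from the state table. A minimal sketch of what `is_terminal_status` could look like; the dict-shaped `run` is an assumption for illustration:

```python
# Sketch of the terminal-status check implied by the tool-run-state table.
# The dict-shaped "run" record is an illustrative assumption.
TERMINAL_STATUSES = {"done", "cancelled", "rejected-by-user", "error"}

def is_terminal_status(run) -> bool:
    return run["status"] in TERMINAL_STATUSES

print(is_terminal_status({"status": "done"}))             # → True
print(is_terminal_status({"status": "blocked-on-user"}))  # → False
```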
## Error Handling & Retry

### Retryable Errors

Not all errors should crash the agent. Amp retries automatically with exponential backoff:

```python
BASE_RETRY_SECONDS = 5
MAX_RETRY_SECONDS = 60
MAX_AUTO_RETRIES = 5

def get_retry_delay(attempt):
    if attempt >= MAX_AUTO_RETRIES:
        return None  # Give up
    delay = BASE_RETRY_SECONDS * (2 ** attempt)
    return min(delay, MAX_RETRY_SECONDS)

# Attempt 0: 5s
# Attempt 1: 10s
# Attempt 2: 20s
# Attempt 3: 40s
# Attempt 4: 60s (capped: 5 * 2**4 = 80 → 60)
# Attempt 5+: None (max retries)
```
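Wired into a loop, that schedule drives an auto-retry countdown. A runnable sketch under the same constants; `run_with_retries`, `RetryableError`, and the injectable `sleep` are illustrative names, not Amp's API:

```python
import asyncio

# Retry loop sketch using the backoff schedule above. The run_with_retries /
# RetryableError names and injectable sleep are illustrative, not Amp's API.
BASE_RETRY_SECONDS, MAX_RETRY_SECONDS, MAX_AUTO_RETRIES = 5, 60, 5

class RetryableError(Exception):
    pass

def get_retry_delay(attempt):
    if attempt >= MAX_AUTO_RETRIES:
        return None
    return min(BASE_RETRY_SECONDS * (2 ** attempt), MAX_RETRY_SECONDS)

async def run_with_retries(run_once, sleep=asyncio.sleep):
    attempt = 0
    while True:
        try:
            return await run_once()
        except RetryableError:
            delay = get_retry_delay(attempt)
            if delay is None:
                raise  # out of retries: surface the error
            await sleep(delay)  # countdown before the next attempt
            attempt += 1

async def demo():
    calls = {"n": 0}
    delays = []

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise RetryableError("transient")
        return "ok"

    async def fake_sleep(d):
        delays.append(d)  # record instead of actually waiting

    result = await run_with_retries(flaky, sleep=fake_sleep)
    return result, delays

outcome = asyncio.run(demo())
print(outcome)  # → ('ok', [5, 10])
```

Injecting `sleep` keeps the countdown testable without real waiting, and re-raising after the final attempt hands the error back to the ephemeral-error path below.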
### Ephemeral Errors

Retryable errors are stored as "ephemeral": shown to the user, but clearable:

```python
async def run_inference(...):
    try:
        ...  # inference logic
    except RetryableError as e:
        delay = get_retry_delay(retry_attempt)
        if delay:
            start_retry_countdown(delay)
        ephemeral_error = e  # Shown to user, but retryable
    except Exception as e:
        ephemeral_error = e  # Shown to user
```
### The Retry Flow

```python
async def retry():
    """User clicks retry, or auto-retry fires."""
    clear_retry_countdown()
    if ephemeral_error:
        retry_attempt += 1
    ephemeral_error = None

    abort_inference()
    truncate_incomplete_message()
    inference_state = "idle"
    await run_inference()
```
## Cancellation

### Cancel Reasons

| Reason | Trigger |
|---|---|
| `user:cancelled` | User clicks cancel |
| `user:interrupted` | User sends a new message |
| `system:edited` | User edited an earlier message |
| `system:disposed` | Worker cleanup |
| `system:safety` | Dangerous tool on resume |
### Cancellation Flow

```python
async def cancel():
    """Cancel all operations."""
    # 1. Cancel inference
    if ops.inference:
        ops.inference.abort()
        ops.inference = None
    inference_state = "cancelled"

    # 2. Cancel all tools
    await tool_orchestrator.cancel_all("user:cancelled")

    # 3. Notify
    await handle({"type": "cancelled"})
```
### Dangerous Tools

Some tools shouldn't auto-resume after an app restart:

```python
DANGEROUS_TOOLS = ["Bash", "shell_command", "repl", "Task"]  # + all MCP tools

def should_cancel_on_resume(tool_name):
    return tool_name in DANGEROUS_TOOLS or tool_name.startswith("mcp__")
```
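The check covers both the explicit list and the prefix match for MCP tools; a runnable illustration (the `mcp__github` name is a hypothetical example):

```python
# Runnable illustration of the resume-safety check above.
# "mcp__github" is a hypothetical MCP tool name used only as an example.
DANGEROUS_TOOLS = ["Bash", "shell_command", "repl", "Task"]

def should_cancel_on_resume(tool_name):
    return tool_name in DANGEROUS_TOOLS or tool_name.startswith("mcp__")

print(should_cancel_on_resume("Bash"))         # → True
print(should_cancel_on_resume("mcp__github"))  # → True  (MCP tools match the prefix)
print(should_cancel_on_resume("Read"))         # → False (safe to resume)
```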
## Message Queuing

Users can send messages while the agent is busy:

```python
def handle_enqueue(delta):
    """User sent a message while the agent was busy."""
    interaction = compute_interaction_state(thread, inference_state)

    # If we can process now, do it
    if interaction != "tool-running":
        if inference_state == "cancelled":
            handle({"type": "user:message-queue:dequeue"})
            return

        if inference_state == "idle":
            last = thread.messages[-1]
            if last and last.role == "assistant":
                if last.state.type in ["cancelled", "error"]:
                    handle({"type": "user:message-queue:dequeue"})
                    return
                if last.state.stop_reason != "tool_use":
                    handle({"type": "user:message-queue:dequeue"})
                    return

    # Otherwise, keep it queued - it will dequeue on end_turn
```
## Constants Reference

| Constant | Value | Purpose |
|---|---|---|
| `BASE_RETRY_SECONDS` | 5 | Initial retry delay |
| `MAX_RETRY_SECONDS` | 60 | Retry delay cap |
| `MAX_AUTO_RETRIES` | 5 | Retries before giving up |
| `MAX_OUTPUT_TOKENS` | 32,000 | Max LLM output |
| `SUBAGENT_MAX_TOKENS` | 8,000 | Subagent output limit |
## Implementation Checklist

Building your own agent loop? Ensure:

**State Machine**
- [ ] Worker state (`initial` → `active`)
- [ ] Inference state (`idle` ↔ `running` ↔ `cancelled`)
- [ ] Interaction state computation

**Message Handling**
- [ ] Mutex serialization
- [ ] Delta pattern for all mutations
- [ ] Delta type dispatch

**Inference Cycle**
- [ ] Model resolution from mode
- [ ] Stream processing
- [ ] Stop reason handling

**Tool Execution**
- [ ] Conflict-based batching
- [ ] Parallel execution within a batch
- [ ] Completion tracking

**Error Handling**
- [ ] Retryable error detection
- [ ] Exponential backoff
- [ ] Ephemeral error display

**Cancellation**
- [ ] Inference abort
- [ ] Tool cancellation
- [ ] Dangerous tool handling
## What's Next

The loop calls tools, but what are tools, and how do they work?

→ `04-tool-system.md` - Tool definitions, registration, execution

For reconstruction-grade detail with complete pseudocode and data structures:

→ `03-agent-loop.spec.md` - Full implementation specification