Streaming
Real-time output as the agent thinks and acts. Essential for user experience.
Evidence source: Amp Code v0.0.1769212917 (SSE parsing, delta handling, reconnection)
Why Streaming Matters
Without streaming:
User: "Add authentication"
[... 30 seconds of nothing ...]
Agent: "Done! Here's what I did..."
With streaming:
User: "Add authentication"
Agent: "I'll start by finding the auth files..."
Agent: [Reading src/auth.ts...]
Agent: "I see the current implementation. Let me add..."
Agent: [Editing src/auth.ts...]
...
Users see progress. They can interrupt. The experience feels responsive.
The Multi-Layer Pipeline
Amp's Insight: SSE parsing is a 4-layer pipeline, not a simple parser.
HTTP Response Body (bytes)
│
▼
┌─────────────────────────────────────┐
│ LineDecoder │
│ - Converts bytes to UTF-8 text │
│ - Handles \n, \r, \r\n endings │
│ - Buffers incomplete lines │
│ - Output: complete lines │
└─────────────────┬───────────────────┘
│
▼
┌─────────────────────────────────────┐
│ SSEDecoder │
│ - Parses SSE field syntax │
│ - Handles event:, data:, id: │
│ - Assembles multi-line data │
│ - Output: {event, data, id} │
└─────────────────┬───────────────────┘
│
▼
┌─────────────────────────────────────┐
│ MessageStream │
│ - Routes by event type │
│ - Handles deltas by delta type │
│ - Accumulates content blocks │
│ - Output: events + complete msg │
└─────────────────┬───────────────────┘
│
▼
┌─────────────────────────────────────┐
│ UI State Manager │
│ - Tracks streaming segments │
│ - Manages tool progress │
│ - Updates connection state │
│ - Output: React state updates │
└─────────────────────────────────────┘
Layer 1: Line Decoding
The first layer converts raw bytes into complete text lines, handling all three line endings (\n, \r, \r\n).
Binary Line Decoder
class LineDecoder:
    """Converts bytes to complete lines, handling \\n, \\r, and \\r\\n endings."""

    def __init__(self):
        self.buffer = bytearray()

    def decode(self, chunk: bytes) -> list[str]:
        if not chunk:
            return []
        self.buffer.extend(chunk)
        lines = []
        start = i = 0
        n = len(self.buffer)
        while i < n:
            byte = self.buffer[i]
            if byte == 0x0A:  # \n
                lines.append(self.buffer[start:i].decode('utf-8'))
                i += 1
                start = i
            elif byte == 0x0D:  # \r
                if i + 1 >= n:
                    break  # \r at buffer end: the next chunk may start with \n
                lines.append(self.buffer[start:i].decode('utf-8'))
                i += 2 if self.buffer[i + 1] == 0x0A else 1  # consume \r\n or bare \r
                start = i
            else:
                i += 1
        # Keep the incomplete tail (including a possible trailing \r) buffered
        self.buffer = self.buffer[start:]
        return lines

    def flush(self) -> list[str]:
        """Force flush the remaining buffer as a final line."""
        if not self.buffer:
            return []
        line = self.buffer.rstrip(b'\r').decode('utf-8')
        self.buffer = bytearray()
        return [line]
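To see why buffering matters, here is a standalone sketch (independent of the class above) of the same splitting logic as a generator: it holds back a trailing \r because the matching \n may arrive in the next chunk.

```python
def stream_lines(chunks):
    """Yield complete lines from byte chunks split at arbitrary boundaries."""
    buf = b""
    for chunk in chunks:
        buf += chunk
        # Hold back a trailing \r: it may pair with a \n in the next chunk
        hold = buf.endswith(b"\r")
        work, buf = (buf[:-1], b"\r") if hold else (buf, b"")
        # Normalize endings, then split; the final piece is an incomplete line
        pieces = work.replace(b"\r\n", b"\n").replace(b"\r", b"\n").split(b"\n")
        buf = pieces[-1] + buf
        yield from (p.decode("utf-8") for p in pieces[:-1])
    if buf.rstrip(b"\r"):
        yield buf.rstrip(b"\r").decode("utf-8")

# A \r\n pair split across two chunks still yields exactly one line break:
print(list(stream_lines([b"a\r", b"\nb\nc"])))  # → ['a', 'b', 'c']
```

A naive `chunk.split(b"\n")` per chunk would emit a spurious empty line here, because the \r and \n of one line ending land in different chunks.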
Layer 2: SSE Parsing
The second layer parses SSE protocol syntax into structured events.
SSE Field Syntax
event: message_start
data: {"type": "message_start", "message": {...}}
data: line1
data: line2
id: event-123
retry: 1000
: this is a comment
SSE Parser Implementation
from dataclasses import dataclass
from typing import Callable

@dataclass
class SSEEvent:
    id: str | None = None
    event: str | None = None
    data: str = ""

class SSEParser:
    def __init__(self, on_event: Callable, on_retry: Callable | None = None):
        self.on_event = on_event
        self.on_retry = on_retry
        self.buffer = ""
        self.is_first_chunk = True
        # Current event state
        self.event_id = None
        self.event_type = ""
        self.data = ""

    def feed(self, chunk: str):
        # Strip the BOM from the first chunk
        text = chunk.removeprefix('\ufeff') if self.is_first_chunk else chunk
        self.is_first_chunk = False
        self.buffer += text
        # Process complete lines
        while '\n' in self.buffer:
            line, self.buffer = self.buffer.split('\n', 1)
            self._process_line(line.rstrip('\r'))

    def _process_line(self, line: str):
        # An empty line dispatches the pending event
        if line == "":
            self._dispatch_event()
            return
        # Comment line (starts with :)
        if line.startswith(':'):
            return
        # field:value parsing
        if ':' in line:
            colon = line.index(':')
            field = line[:colon]
            # Skip a single optional space after the colon
            offset = 2 if line[colon + 1:colon + 2] == ' ' else 1
            value = line[colon + offset:]
            self._process_field(field, value)
        else:
            # A line with no colon is a field with an empty value
            self._process_field(line, "")

    def _process_field(self, field: str, value: str):
        if field == "event":
            self.event_type = value
        elif field == "data":
            # Multiple data lines concatenate with newlines
            self.data += value + "\n"
        elif field == "id":
            # An ID containing a null character is ignored per spec
            if '\0' not in value:
                self.event_id = value
        elif field == "retry":
            # The value must be digits only
            if value.isdigit() and self.on_retry:
                self.on_retry(int(value))

    def _dispatch_event(self):
        if self.data:
            self.on_event(SSEEvent(
                id=self.event_id,
                event=self.event_type or None,
                data=self.data[:-1]  # strip the single trailing newline
            ))
        # Reset data and event type; the last event ID persists per spec
        self.data = ""
        self.event_type = ""
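As a compact cross-check of the dispatch rules (a blank line dispatches, multiple `data:` lines join with newlines, comment lines are skipped), here is a deliberately minimal parser, separate from the class above:

```python
def parse_sse(text: str) -> list[tuple]:
    """Minimal SSE parse over complete text: returns (event_type, data) pairs."""
    events, etype, data = [], None, []
    for line in text.split("\n"):
        if line == "":
            if data:  # dispatch only if a data buffer accumulated
                events.append((etype, "\n".join(data)))
            etype, data = None, []
        elif line.startswith(":"):
            continue  # comment line
        elif line.startswith("event:"):
            etype = line[len("event:"):].removeprefix(" ")
        elif line.startswith("data:"):
            data.append(line[len("data:"):].removeprefix(" "))
    return events

print(parse_sse("event: delta\ndata: a\ndata: b\n\n"))
# → [('delta', 'a\nb')]
```

Unlike the incremental `SSEParser`, this sketch assumes the whole stream is already in memory; it exists only to make the field semantics easy to verify by hand.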
Layer 3: Event Types and Delta Handling
SSE Event Types
| Event Type | Data Format | Description |
|---|---|---|
| `message_start` | `MessageStartEvent` | New message begins |
| `message_delta` | `MessageDeltaEvent` | Message metadata update |
| `message_stop` | `MessageStopEvent` | Message complete |
| `content_block_start` | `ContentBlockStartEvent` | New content block |
| `content_block_delta` | `ContentBlockDeltaEvent` | Incremental content |
| `content_block_stop` | `ContentBlockStopEvent` | Block complete |
| `ping` | Empty | Keep-alive (ignored) |
| `error` | Error object | API error |
Delta Types
| Delta Type | Block Type | Accumulation | Description |
|---|---|---|---|
| `text_delta` | `text` | Append string | Incremental text |
| `input_json_delta` | `tool_use` | Append + parse | Tool input JSON |
| `thinking_delta` | `thinking` | Append string | Extended thinking |
| `signature_delta` | `thinking` | Replace | Verification signature |
| `citations_delta` | `text` | Push to array | Citation references |
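The accumulation rules in the table can be illustrated with a few hand-written events (a standalone sketch; the event shapes follow the table, but the sequence itself is invented):

```python
blocks = []
events = [
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "lo"}},
]
for ev in events:
    if ev["type"] == "content_block_start":
        blocks.append(dict(ev["content_block"]))
    elif ev["type"] == "content_block_delta":
        delta = ev["delta"]
        if delta["type"] == "text_delta":
            # text_delta: append the fragment to the block addressed by index
            blocks[ev["index"]]["text"] += delta["text"]

print(blocks[0]["text"])  # → Hello
```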
Message Stream Implementation
import json
from dataclasses import dataclass

@dataclass
class Message:
    id: str
    role: str
    content: list
    model: str
    stop_reason: str | None
    usage: dict

class MessageStream:
    def __init__(self):
        self.current_message = None
        self.callbacks = {}

    def on(self, event: str, callback):
        self.callbacks.setdefault(event, []).append(callback)

    def emit(self, event: str, *args):
        for cb in self.callbacks.get(event, []):
            cb(*args)

    def process_sse_event(self, event: SSEEvent):
        # Ignore keep-alive pings
        if event.event == "ping":
            return
        # Surface API errors to listeners
        if event.event == "error":
            self.emit("error", json.loads(event.data))
            return
        data = json.loads(event.data)
        self._accumulate_event(data)

    def _accumulate_event(self, event: dict):
        event_type = event["type"]
        if event_type == "message_start":
            self.current_message = Message(**event["message"])
            self.emit("messageStart", self.current_message)
        elif event_type == "content_block_start":
            self.current_message.content.append(event["content_block"])
        elif event_type == "content_block_delta":
            self._handle_delta(event)
        elif event_type == "content_block_stop":
            block = self.current_message.content[event["index"]]
            self.emit("contentBlock", block)
        elif event_type == "message_delta":
            self.current_message.stop_reason = event["delta"].get("stop_reason")
            self.current_message.usage.update(event.get("usage", {}))
        elif event_type == "message_stop":
            self.emit("message", self.current_message)
            self.emit("end")

    def _handle_delta(self, event: dict):
        block = self.current_message.content[event["index"]]
        delta = event["delta"]
        delta_type = delta["type"]
        if delta_type == "text_delta":
            if block["type"] == "text":
                old_text = block.get("text", "")
                block["text"] = old_text + delta["text"]
                self.emit("text", delta["text"], old_text)
        elif delta_type == "input_json_delta":
            if block["type"] in ("tool_use", "server_tool_use", "mcp_tool_use"):
                # Accumulate partial JSON in a side-channel key on the block
                buffer = block.get("_json_buffer", "") + delta["partial_json"]
                block["_json_buffer"] = buffer
                # Parse whatever prefix is currently recoverable
                try:
                    block["input"] = parse_partial_json(buffer)
                except ValueError:
                    pass  # not yet recoverable; keep accumulating
                self.emit("inputJson", delta["partial_json"], block.get("input"))
        elif delta_type == "thinking_delta":
            if block["type"] == "thinking":
                block["thinking"] = block.get("thinking", "") + delta["thinking"]
                self.emit("thinking", delta["thinking"], block["thinking"])
        elif delta_type == "signature_delta":
            if block["type"] == "thinking":
                block["signature"] = delta["signature"]
                self.emit("signature", block["signature"])
        elif delta_type == "citations_delta":
            if block["type"] == "text":
                block.setdefault("citations", []).append(delta["citation"])
                self.emit("citation", delta["citation"])
Partial JSON Parser
Tool call arguments stream as incomplete JSON. A custom parser handles this.
Why Needed
- Tool input arrives incrementally (e.g., `{"path": "/sr`)
- Standard `json.loads` throws on incomplete input
- The UI needs to show arguments as they stream
- The parser must be resilient to cut-offs mid-string, mid-number, etc.
4-Stage Pipeline
Input: '{"name": "test", "value": 42'
│
▼
┌─────────────────────────────────┐
│ Stage 1: Tokenize │
│ → Tokens with type/value │
│ → Handle incomplete strings │
└─────────────────┬───────────────┘
│
▼
┌─────────────────────────────────┐
│ Stage 2: Cleanup │
│ → Remove trailing separators │
│ → Remove incomplete values │
└─────────────────┬───────────────┘
│
▼
┌─────────────────────────────────┐
│ Stage 3: Balance Brackets │
│ → Add missing } and ] │
│ → Track open/close count │
└─────────────────┬───────────────┘
│
▼
┌─────────────────────────────────┐
│ Stage 4: Serialize & Parse │
│ → Convert tokens to string │
│ → JSON.parse the result │
└─────────────────────────────────┘
│
▼
Output: { name: "test" }
Implementation
import json
from dataclasses import dataclass

@dataclass
class Token:
    type: str   # brace, paren, separator, delimiter, string, number, name
    value: str

def parse_partial_json(json_str: str):
    """Parse potentially incomplete JSON."""
    tokens = tokenize(json_str)
    tokens = cleanup_tokens(tokens)
    tokens = balance_brackets(tokens)
    return json.loads(serialize_tokens(tokens))

def tokenize(json_str: str) -> list[Token]:
    """Stage 1: Convert a JSON string to tokens."""
    tokens = []
    pos = 0
    while pos < len(json_str):
        char = json_str[pos]
        # Braces
        if char == '{':
            tokens.append(Token('brace', '{'))
            pos += 1
        elif char == '}':
            tokens.append(Token('brace', '}'))
            pos += 1
        # Brackets
        elif char == '[':
            tokens.append(Token('paren', '['))
            pos += 1
        elif char == ']':
            tokens.append(Token('paren', ']'))
            pos += 1
        # Separator and delimiter
        elif char == ':':
            tokens.append(Token('separator', ':'))
            pos += 1
        elif char == ',':
            tokens.append(Token('delimiter', ','))
            pos += 1
        # String
        elif char == '"':
            value, pos, complete = parse_string(json_str, pos)
            if complete:
                tokens.append(Token('string', value))
            # Incomplete strings are dropped
        # Whitespace
        elif char.isspace():
            pos += 1
        # Number
        elif char.isdigit() or char in '-.':
            value, pos = parse_number(json_str, pos)
            tokens.append(Token('number', value))
        # Keywords (true, false, null); partial keywords are dropped
        elif char.isalpha():
            value, pos = parse_keyword(json_str, pos)
            if value in ('true', 'false', 'null'):
                tokens.append(Token('name', value))
        else:
            pos += 1
    return tokens

def parse_string(s: str, pos: int) -> tuple[str, int, bool]:
    """Scan a string literal from its opening quote; escapes are kept verbatim."""
    i = pos + 1
    while i < len(s):
        if s[i] == '\\':
            i += 2  # skip the escaped character
        elif s[i] == '"':
            return s[pos + 1:i], i + 1, True
        else:
            i += 1
    return s[pos + 1:], len(s), False  # unterminated string

def parse_number(s: str, pos: int) -> tuple[str, int]:
    i = pos
    while i < len(s) and (s[i].isdigit() or s[i] in '-+.eE'):
        i += 1
    return s[pos:i], i

def parse_keyword(s: str, pos: int) -> tuple[str, int]:
    i = pos
    while i < len(s) and s[i].isalpha():
        i += 1
    return s[pos:i], i

def _enclosing_is_object(tokens: list[Token]) -> bool:
    """True if the innermost unclosed container is an object."""
    stack = []
    for t in tokens:
        if t.type in ('brace', 'paren'):
            if t.value in ('{', '['):
                stack.append(t.value)
            elif stack:
                stack.pop()
    return bool(stack) and stack[-1] == '{'

def cleanup_tokens(tokens: list[Token]) -> list[Token]:
    """Stage 2: Remove trailing incomplete tokens."""
    if not tokens:
        return tokens
    last = tokens[-1]
    # Remove trailing separators and delimiters
    if last.type in ('separator', 'delimiter'):
        return cleanup_tokens(tokens[:-1])
    # Remove numbers cut off mid-fraction, mid-exponent, or mid-sign
    if last.type == 'number' and last.value[-1] in '.eE+-':
        return cleanup_tokens(tokens[:-1])
    # Remove orphaned keys: a value dangling after "{" or after "," in an object
    if last.type in ('string', 'number', 'name') and len(tokens) >= 2:
        prev = tokens[-2]
        if prev.type == 'brace' and prev.value == '{':
            return cleanup_tokens(tokens[:-1])
        if prev.type == 'delimiter' and _enclosing_is_object(tokens[:-1]):
            return cleanup_tokens(tokens[:-1])
    return tokens

def balance_brackets(tokens: list[Token]) -> list[Token]:
    """Stage 3: Close unclosed brackets."""
    closers = []  # stack of pending closers, innermost last
    for token in tokens:
        if token.type == 'brace':
            if token.value == '{':
                closers.append('}')
            elif closers and closers[-1] == '}':
                closers.pop()
        elif token.type == 'paren':
            if token.value == '[':
                closers.append(']')
            elif closers and closers[-1] == ']':
                closers.pop()
    # Append the missing closers, innermost first
    result = tokens.copy()
    for closer in reversed(closers):
        result.append(Token('brace' if closer == '}' else 'paren', closer))
    return result

def serialize_tokens(tokens: list[Token]) -> str:
    """Stage 4: Convert tokens back to a JSON string."""
    parts = []
    for token in tokens:
        if token.type == 'string':
            parts.append(f'"{token.value}"')
        else:
            parts.append(token.value)
    return ''.join(parts)
Example Transformations
| Input (Partial) | After Cleanup | After Balance | Parsed Output |
|---|---|---|---|
| `{"a": "te` | `{` | `{}` | `{}` |
| `{"a": "test"` | `{"a": "test"` | `{"a": "test"}` | `{"a": "test"}` |
| `{"a": 123,` | `{"a": 123` | `{"a": 123}` | `{"a": 123}` |
| `[1, 2,` | `[1, 2` | `[1, 2]` | `[1, 2]` |
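For contrast with the 4-stage pipeline, a far cruder fallback just closes an open string, strips a trailing comma, and appends the missing closing brackets (a standalone sketch; unlike the pipeline, it keeps a truncated string value instead of dropping it, and it fails on some edge cases such as a dangling `"key":`):

```python
import json

def close_partial_json(s: str):
    """Crude partial-JSON recovery via bracket-stack tracking."""
    stack, in_string, escaped = [], False, False
    for ch in s:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]" and stack:
            stack.pop()
    if in_string:
        s += '"'  # close an unterminated string
    s = s.rstrip().rstrip(",")  # drop a trailing comma
    return json.loads(s + "".join(reversed(stack)))

print(close_partial_json('{"a": 123, "tags": ["x", "y'))
# → {'a': 123, 'tags': ['x', 'y']}
```

The token pipeline exists precisely because of the cases this shortcut mishandles; the shortcut is only useful to see the bracket-balancing idea in isolation.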
Reconnection
Configuration
DEFAULT_RECONNECTION_OPTIONS = {
    "initial_delay": 1000,   # 1 second
    "max_delay": 30000,      # 30 seconds
    "growth_factor": 1.5,
    "max_retries": 2,
}
When to Reconnect
Reconnect on:
- SSE stream ends unexpectedly (no `message_stop`)
- Network error / connection dropped
- Server closes the connection without completion

Don't reconnect on:
- User abort (AbortController signaled)
- Max retries exceeded
- Normal completion (`message_stop` received)
- Server error event (API returned an error)
Backoff Calculation
def get_next_delay(attempt: int, server_retry_ms: int | None, options: dict) -> int:
    """Calculate the next reconnection delay with exponential backoff."""
    # A server-specified retry interval takes precedence
    if server_retry_ms is not None:
        return server_retry_ms
    # Exponential backoff: initial * factor^attempt
    delay = options["initial_delay"] * (options["growth_factor"] ** attempt)
    # Cap at the maximum
    return min(int(delay), options["max_delay"])

# Delay sequence: 1000ms, 1500ms, 2250ms, 3375ms, ... capped at 30000ms
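Plugging the defaults into the formula confirms the sequence (a standalone arithmetic check):

```python
initial, factor, cap = 1000, 1.5, 30000
delays = [min(int(initial * factor ** attempt), cap) for attempt in range(5)]
print(delays)  # → [1000, 1500, 2250, 3375, 5062]
```

With a growth factor of 1.5, the cap of 30000ms is reached after nine attempts; the default `max_retries` of 2 stops well before that, so the cap mainly matters when a caller raises the retry limit.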
Reconnection with Resumption
import asyncio
import json

class SSETransport:
    def __init__(self, options: dict | None = None):
        self.options = {**DEFAULT_RECONNECTION_OPTIONS, **(options or {})}
        self.last_event_id = None
        self.server_retry_ms = None
        self.abort_controller = None

    async def connect(self, url: str, body: dict):
        """Connect with automatic reconnection, yielding SSEEvents.

        `fetch`, `AbortError`, and `StreamIncomplete` stand in for your HTTP
        client's request function and error types.
        """
        attempts = 0
        received_complete = False
        while attempts <= self.options["max_retries"]:
            try:
                headers = {"Accept": "text/event-stream"}
                # Resume from the last event ID
                if self.last_event_id:
                    headers["Last-Event-ID"] = self.last_event_id
                response = await fetch(url, method="POST", headers=headers, body=body)
                async for event in self._read_stream(response):
                    # Track the event ID for resumption
                    if event.id:
                        self.last_event_id = event.id
                    # Check for completion
                    data = json.loads(event.data) if event.data else {}
                    if data.get("type") == "message_stop":
                        received_complete = True
                    yield event
                # Stream ended
                if received_complete:
                    return  # success
                # Incomplete: raise to fall through to the retry path
                raise StreamIncomplete()
            except AbortError:
                raise  # never retry a user abort
            except Exception:
                attempts += 1
                if attempts > self.options["max_retries"]:
                    raise
                # Wait with backoff before reconnecting
                delay = get_next_delay(attempts - 1, self.server_retry_ms, self.options)
                await asyncio.sleep(delay / 1000)
Cancellation
Double-Escape Pattern
The CLI uses a confirmation pattern to prevent accidental cancellation:
import asyncio

class InputHandler:
    def __init__(self):
        self.is_processing = False  # set while the agent is running
        self.confirming_cancel = False
        self.cancel_timeout = None

    def handle_escape(self) -> bool:
        """Handle an escape key press. Returns True if the key was consumed."""
        # Second escape within 1 second: actually cancel
        if self.confirming_cancel:
            self._do_cancel()
            self.confirming_cancel = False
            if self.cancel_timeout:
                self.cancel_timeout.cancel()
            return True
        # First escape: show the confirmation prompt
        if self.is_processing:
            self.confirming_cancel = True
            # Auto-clear after 1 second
            self.cancel_timeout = asyncio.get_event_loop().call_later(
                1.0, self._clear_confirmation
            )
            return True
        return False

    def _clear_confirmation(self):
        self.confirming_cancel = False
        self.cancel_timeout = None

    def _do_cancel(self):
        ...  # abort the in-flight request, terminate child processes, etc.
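The same confirmation window can be modeled without an event loop by comparing timestamps (a standalone sketch; the 1-second `WINDOW` matches the auto-clear above, and the injectable clock exists only to make the logic testable):

```python
import time

WINDOW = 1.0  # seconds within which a second Esc confirms the cancel

class DoubleEscape:
    def __init__(self, clock=time.monotonic):
        self.clock = clock      # injectable for testing
        self.armed_at = None    # time of the first, unconfirmed Esc

    def press(self) -> bool:
        """Returns True when this press should actually cancel."""
        now = self.clock()
        if self.armed_at is not None and now - self.armed_at <= WINDOW:
            self.armed_at = None
            return True
        self.armed_at = now  # first press, or expired window: (re)arm
        return False

# Simulated clock: two presses 0.5s apart cancel; a late press only re-arms
t = [0.0]
esc = DoubleEscape(clock=lambda: t[0])
print(esc.press())  # → False  (armed)
t[0] = 0.5
print(esc.press())  # → True   (confirmed within the window)
t[0] = 5.0
print(esc.press())  # → False  (window long expired; re-armed)
```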
What Gets Cancelled
| Component | Method | Effect |
|---|---|---|
| LLM inference | `abort_controller.abort()` | Stops generation |
| Bash commands | `process.terminate()` | SIGTERM to process |
| SSE stream | `response.close()` | Closes connection |
| Subagent tasks | `task.cancel()` | Stops subagent |
UI State Management
Streaming State Structure
from dataclasses import dataclass

@dataclass
class StreamingUIState:
    # Text streaming with animation timestamps
    streaming_segments: list[dict]           # [{text, birth_time}]
    # Tool calls in progress
    streaming_tool_calls: list[dict]         # [{id, name, arguments}]
    # Tool execution progress by ID
    streaming_tool_progress: dict[str, str]
    # Complete content blocks
    streaming_content_blocks: list
    # Connection state
    connection_state: str                    # disconnected, connecting, connected, reconnecting
    # Token usage
    token_usage: dict | None = None
Progress Spinner
class BrailleSpinner:
    FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']

    def __init__(self):
        self.frame = 0

    def advance(self):
        self.frame = (self.frame + 1) % len(self.FRAMES)

    def current(self) -> str:
        return self.FRAMES[self.frame]
Complete Streaming Example
async def stream_agent_response(messages: list, tools: list):
    """Stream a complete agent response, wiring all four layers together.

    `event_queue` and the `yield_*` callbacks are placeholder hooks for the UI.
    """
    # Create the stream components. In a real transport the LineDecoder and
    # SSEParser live inside _read_stream; they are constructed here to make
    # the full pipeline explicit.
    transport = SSETransport()
    line_decoder = LineDecoder()
    sse_parser = SSEParser(
        on_event=lambda e: event_queue.put_nowait(e),
        on_retry=lambda ms: setattr(transport, 'server_retry_ms', ms)
    )
    message_stream = MessageStream()

    # Register event handlers
    message_stream.on("text", lambda text, _: yield_text(text))
    message_stream.on("inputJson", lambda json_str, _: yield_tool_input(json_str))
    message_stream.on("contentBlock", lambda block: yield_block(block))
    message_stream.on("end", lambda: yield_complete())

    # Connect and stream
    async for event in transport.connect(API_URL, {
        "model": "claude-sonnet-4-20250514",
        "messages": messages,
        "tools": tools,
        "stream": True,
    }):
        message_stream.process_sse_event(event)
Implementation Checklist
Building streaming? Ensure:
Line Decoding
- Handle \n, \r, \r\n line endings
- Buffer incomplete lines
- Handle binary data correctly
SSE Parsing
- BOM stripping on first chunk
- Multi-line data concatenation
- Event ID tracking for resumption
- Retry directive handling
Delta Handling
- text_delta accumulation
- input_json_delta with partial parsing
- thinking_delta for extended thinking
- signature_delta for verification
Reconnection
- Exponential backoff
- Last-Event-ID resumption
- Max retry limit (2)
Cancellation
- AbortController integration
- Double-escape confirmation
- Cleanup on abort
What's Next
We're streaming output. But how do we know if the agent actually succeeded?
→ 08-verification.md - Course correction and meta-agent monitoring