Streaming

Real-time output as the agent thinks and acts. Essential for user experience.

Evidence source: Amp Code v0.0.1769212917 (SSE parsing, delta handling, reconnection)


Why Streaming Matters

Without streaming:

User: "Add authentication"
[... 30 seconds of nothing ...]
Agent: "Done! Here's what I did..."

With streaming:

User: "Add authentication"
Agent: "I'll start by finding the auth files..."
Agent: [Reading src/auth.ts...]
Agent: "I see the current implementation. Let me add..."
Agent: [Editing src/auth.ts...]
...

Users see progress. They can interrupt. The experience feels responsive.


The Multi-Layer Pipeline

Amp's insight: streaming is a four-layer pipeline, not a single parser.

HTTP Response Body (bytes)
          │
          ▼
┌─────────────────────────────────────┐
│         LineDecoder                  │
│  - Converts bytes to UTF-8 text     │
│  - Handles \n, \r, \r\n endings     │
│  - Buffers incomplete lines         │
│  - Output: complete lines           │
└─────────────────┬───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│         SSEDecoder                   │
│  - Parses SSE field syntax          │
│  - Handles event:, data:, id:       │
│  - Assembles multi-line data        │
│  - Output: {event, data, id}        │
└─────────────────┬───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│       MessageStream                  │
│  - Routes by event type             │
│  - Handles deltas by delta type     │
│  - Accumulates content blocks       │
│  - Output: events + complete msg    │
└─────────────────┬───────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│         UI State Manager            │
│  - Tracks streaming segments        │
│  - Manages tool progress            │
│  - Updates connection state         │
│  - Output: React state updates      │
└─────────────────────────────────────┘

Layer 1: Line Decoding

The first layer converts raw bytes into complete text lines. It must handle all three line endings (\n, \r, \r\n) and buffer partial lines across chunk boundaries.

Binary Line Decoder

class LineDecoder:
    r"""Converts bytes to lines, handling \n, \r, and \r\n endings."""

    def __init__(self):
        self.buffer = bytearray()
        self.pending_carriage = None

    def decode(self, chunk: bytes) -> list[str]:
        if not chunk:
            return []

        self.buffer.extend(chunk)
        lines = []

        while True:
            line_info = self._find_newline()
            if not line_info:
                break

            preceding, index, is_carriage = line_info

            # Handle \r that might be followed by \n
            if is_carriage and self.pending_carriage is None:
                self.pending_carriage = index
                continue

            # Check if pending \r was standalone
            if self.pending_carriage is not None:
                if index != self.pending_carriage + 1 or is_carriage:
                    # \r was standalone line ending
                    line = self.buffer[:self.pending_carriage - 1].decode('utf-8')
                    lines.append(line)
                    self.buffer = self.buffer[self.pending_carriage:]
                    self.pending_carriage = None
                    continue

            # Calculate line length (exclude the \r of a \r\n pair)
            line_length = preceding - 1 if self.pending_carriage is not None else preceding
            line = self.buffer[:line_length].decode('utf-8')
            lines.append(line)
            self.buffer = self.buffer[index:]
            self.pending_carriage = None

        return lines

    def flush(self) -> list[str]:
        """Force flush remaining buffer."""
        if not self.buffer:
            return []
        return self.decode(b'\n')  # Force line break

    def _find_newline(self) -> tuple[int, int, bool] | None:
        start = self.pending_carriage if self.pending_carriage is not None else 0
        for i in range(start, len(self.buffer)):
            if self.buffer[i] == 10:  # \n
                return (i, i + 1, False)
            if self.buffer[i] == 13:  # \r
                return (i, i + 1, True)
        return None
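
For intuition, the same contract can be expressed more compactly. This is a simplified sketch, not Amp's implementation: the key trick is that a trailing \r is held back, because the next chunk may begin with the \n that completes a \r\n pair.

```python
class SimpleLineDecoder:
    """Compact illustration of the line-decoder contract: bytes in, lines out."""

    def __init__(self):
        self.buf = b""

    def decode(self, chunk: bytes) -> list[str]:
        self.buf += chunk
        # Hold back a trailing '\r': it may be half of a '\r\n' pair
        hold = b""
        if self.buf.endswith(b"\r"):
            self.buf, hold = self.buf[:-1], b"\r"
        # Normalize all three endings to '\n', then split off complete lines
        text = self.buf.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        *lines, rest = text.split(b"\n")
        self.buf = rest + hold
        return [line.decode("utf-8") for line in lines]

    def flush(self) -> list[str]:
        """Force out whatever remains as a final line."""
        out, self.buf = self.buf, b""
        return [out.rstrip(b"\r").decode("utf-8")] if out else []

d = SimpleLineDecoder()
print(d.decode(b"hello\r"))        # []  ('\r' held: a '\n' may follow)
print(d.decode(b"\nworld\npar"))   # ['hello', 'world']
print(d.decode(b"tial\rnext"))     # ['partial']
print(d.flush())                   # ['next']
```

The hold-back step is what a naive split-on-newline implementation gets wrong: without it, a \r\n pair split across two chunks produces a spurious empty line.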

Layer 2: SSE Parsing

The second layer parses SSE protocol syntax into structured events.

SSE Field Syntax

event: message_start
data: {"type": "message_start", "message": {...}}

data: line1
data: line2

id: event-123
retry: 1000
: this is a comment
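
The field/value split follows the SSE spec exactly: split at the first colon, then strip at most one leading space from the value. A standalone sketch:

```python
# Field parsing per the SSE spec: split at the first ':', then drop
# exactly one leading space from the value (and no more).
def parse_sse_field(line: str) -> tuple[str, str]:
    if ':' not in line:
        return line, ""                 # field with no value
    field, _, value = line.partition(':')
    if value.startswith(' '):
        value = value[1:]
    return field, value

print(parse_sse_field("data: hello"))   # ('data', 'hello')
print(parse_sse_field("data:hello"))    # ('data', 'hello')
print(parse_sse_field("data:  x"))      # ('data', ' x')  only one space stripped
```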

SSE Parser Implementation

from dataclasses import dataclass

@dataclass
class SSEEvent:
    id: str | None = None
    event: str | None = None
    data: str = ""

class SSEParser:
    def __init__(self, on_event: callable, on_retry: callable = None):
        self.on_event = on_event
        self.on_retry = on_retry
        self.buffer = ""
        self.is_first_chunk = True

        # Current event state
        self.event_id = None
        self.event_type = ""
        self.data = ""

    def feed(self, chunk: str):
        # Strip BOM from first chunk
        text = chunk.lstrip('\ufeff') if self.is_first_chunk else chunk
        self.is_first_chunk = False

        self.buffer += text

        # Process complete lines
        while '\n' in self.buffer:
            line, self.buffer = self.buffer.split('\n', 1)
            self._process_line(line.rstrip('\r'))

    def _process_line(self, line: str):
        # Empty line dispatches the event
        if line == "":
            self._dispatch_event()
            return

        # Comment line (starts with :)
        if line.startswith(':'):
            return

        # Field:value parsing
        if ':' in line:
            colon = line.index(':')
            field = line[:colon]
            # Skip optional space after colon
            offset = 2 if line[colon + 1:colon + 2] == ' ' else 1
            value = line[colon + offset:]
            self._process_field(field, value)
        else:
            # Field with no value
            self._process_field(line, "")

    def _process_field(self, field: str, value: str):
        if field == "event":
            self.event_type = value
        elif field == "data":
            # Multiple data lines concatenate with newlines
            self.data += value + "\n"
        elif field == "id":
            # ID cannot contain null character
            if '\0' not in value:
                self.event_id = value
        elif field == "retry":
            # Must be digits only
            if value.isdigit() and self.on_retry:
                self.on_retry(int(value))

    def _dispatch_event(self):
        if self.data:
            self.on_event(SSEEvent(
                id=self.event_id,
                event=self.event_type or None,
                data=self.data.rstrip('\n')  # Remove trailing newline
            ))
        # Reset data and event type; the last event ID persists per spec
        self.data = ""
        self.event_type = ""

Layer 3: Event Types and Delta Handling

SSE Event Types

Event Type             Data Format               Description
message_start          MessageStartEvent         New message begins
message_delta          MessageDeltaEvent         Message metadata update
message_stop           MessageStopEvent          Message complete
content_block_start    ContentBlockStartEvent    New content block
content_block_delta    ContentBlockDeltaEvent    Incremental content
content_block_stop     ContentBlockStopEvent     Block complete
ping                   (empty)                   Keep-alive (ignored)
error                  Error object              API error

Delta Types

Delta Type          Block Type    Accumulation      Description
text_delta          text          Append string     Incremental text
input_json_delta    tool_use      Append + parse    Tool input JSON
thinking_delta      thinking      Append string     Extended thinking
signature_delta     thinking      Replace           Verification signature
citations_delta     text          Push to array     Citation references
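
The accumulation rules can be illustrated directly; the delta dicts below mimic content_block_delta payloads:

```python
# Direct illustration of the accumulation rules: append for text,
# push for citations. Delta shapes mirror content_block_delta events.
block = {"type": "text", "text": "", "citations": []}

deltas = [
    {"type": "text_delta", "text": "See "},
    {"type": "text_delta", "text": "the docs."},
    {"type": "citations_delta", "citation": {"source": "doc-1"}},
]

for delta in deltas:
    if delta["type"] == "text_delta":
        block["text"] += delta["text"]                    # append string
    elif delta["type"] == "citations_delta":
        block["citations"].append(delta["citation"])      # push to array

print(block["text"])            # See the docs.
print(len(block["citations"]))  # 1
```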

Message Stream Implementation

import json
from dataclasses import dataclass

@dataclass
class Message:
    id: str
    role: str
    content: list
    model: str
    stop_reason: str | None
    usage: dict

class MessageStream:
    def __init__(self):
        self.current_message = None
        self.callbacks = {}

    def on(self, event: str, callback: callable):
        self.callbacks.setdefault(event, []).append(callback)

    def emit(self, event: str, *args):
        for cb in self.callbacks.get(event, []):
            cb(*args)

    def process_sse_event(self, event: SSEEvent):
        # Ignore pings
        if event.event == "ping":
            return

        # Handle errors
        if event.event == "error":
            self.emit("error", json.loads(event.data))
            return

        data = json.loads(event.data)
        self._accumulate_event(data)

    def _accumulate_event(self, event: dict):
        event_type = event["type"]

        if event_type == "message_start":
            self.current_message = Message(**event["message"])
            self.emit("messageStart", self.current_message)

        elif event_type == "content_block_start":
            self.current_message.content.append(event["content_block"])

        elif event_type == "content_block_delta":
            self._handle_delta(event)

        elif event_type == "content_block_stop":
            block = self.current_message.content[event["index"]]
            self.emit("contentBlock", block)

        elif event_type == "message_delta":
            self.current_message.stop_reason = event["delta"].get("stop_reason")
            self.current_message.usage.update(event.get("usage", {}))

        elif event_type == "message_stop":
            self.emit("message", self.current_message)
            self.emit("end")

    def _handle_delta(self, event: dict):
        block = self.current_message.content[event["index"]]
        delta = event["delta"]
        delta_type = delta["type"]

        if delta_type == "text_delta":
            if block["type"] == "text":
                old_text = block.get("text", "")
                block["text"] = old_text + delta["text"]
                self.emit("text", delta["text"], old_text)

        elif delta_type == "input_json_delta":
            if block["type"] in ("tool_use", "server_tool_use", "mcp_tool_use"):
                # Accumulate partial JSON (blocks are plain dicts)
                buffer = block.get("_json_buffer", "") + delta["partial_json"]
                block["_json_buffer"] = buffer
                # Try to parse what we have so far
                try:
                    block["input"] = parse_partial_json(buffer)
                except ValueError:
                    pass  # not parseable yet; keep accumulating
                self.emit("inputJson", delta["partial_json"], block.get("input"))

        elif delta_type == "thinking_delta":
            if block["type"] == "thinking":
                block["thinking"] = block.get("thinking", "") + delta["thinking"]
                self.emit("thinking", delta["thinking"], block["thinking"])

        elif delta_type == "signature_delta":
            if block["type"] == "thinking":
                block["signature"] = delta["signature"]
                self.emit("signature", block["signature"])

        elif delta_type == "citations_delta":
            if block["type"] == "text":
                block.setdefault("citations", []).append(delta["citation"])
                self.emit("citation", delta["citation"])

Partial JSON Parser

Tool call arguments stream as incomplete JSON. A custom parser handles this.

Why Needed

  1. Tool input arrives incrementally (e.g., {"path": "/sr)
  2. Standard json.loads throws on incomplete input
  3. UI needs to show arguments as they stream
  4. Parser must be resilient to mid-string, mid-number, etc.
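
The failure mode is easy to demonstrate:

```python
import json

partial = '{"name": "test", "value": 42'   # tool input cut off mid-stream

try:
    json.loads(partial)
    parsed = True
except json.JSONDecodeError:
    parsed = False

print(parsed)   # False -- the standard parser rejects any incomplete prefix
```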

4-Stage Pipeline

Input: '{"name": "test", "value": 42'
           │
           ▼
┌─────────────────────────────────┐
│  Stage 1: Tokenize              │
│  → Tokens with type/value       │
│  → Handle incomplete strings    │
└─────────────────┬───────────────┘
                  │
                  ▼
┌─────────────────────────────────┐
│  Stage 2: Cleanup               │
│  → Remove trailing separators   │
│  → Remove incomplete values     │
└─────────────────┬───────────────┘
                  │
                  ▼
┌─────────────────────────────────┐
│  Stage 3: Balance Brackets      │
│  → Add missing } and ]          │
│  → Track open/close count       │
└─────────────────┬───────────────┘
                  │
                  ▼
┌─────────────────────────────────┐
│  Stage 4: Serialize & Parse     │
│  → Convert tokens to string     │
│  → JSON.parse the result        │
└─────────────────────────────────┘
                  │
                  ▼
Output: {"name": "test", "value": 42}

Implementation

import json
from dataclasses import dataclass

@dataclass
class Token:
    type: str  # brace, paren, separator, delimiter, string, number, name
    value: str

def parse_partial_json(json_str: str):
    """Parse potentially incomplete JSON."""
    tokens = tokenize(json_str)
    tokens = cleanup_tokens(tokens)
    tokens = balance_brackets(tokens)
    json_str = serialize_tokens(tokens)
    return json.loads(json_str)

def tokenize(json_str: str) -> list[Token]:
    """Stage 1: Convert JSON string to tokens."""
    tokens = []
    pos = 0

    while pos < len(json_str):
        char = json_str[pos]

        # Braces
        if char == '{':
            tokens.append(Token('brace', '{'))
            pos += 1
        elif char == '}':
            tokens.append(Token('brace', '}'))
            pos += 1

        # Brackets
        elif char == '[':
            tokens.append(Token('paren', '['))
            pos += 1
        elif char == ']':
            tokens.append(Token('paren', ']'))
            pos += 1

        # Separator and delimiter
        elif char == ':':
            tokens.append(Token('separator', ':'))
            pos += 1
        elif char == ',':
            tokens.append(Token('delimiter', ','))
            pos += 1

        # String
        elif char == '"':
            value, pos, complete = parse_string(json_str, pos)
            if complete:
                tokens.append(Token('string', value))
            # Incomplete strings are dropped

        # Whitespace
        elif char.isspace():
            pos += 1

        # Number
        elif char.isdigit() or char in '-.' :
            value, pos = parse_number(json_str, pos)
            tokens.append(Token('number', value))

        # Keywords (true, false, null)
        elif char.isalpha():
            value, pos = parse_keyword(json_str, pos)
            if value in ('true', 'false', 'null'):
                tokens.append(Token('name', value))

        else:
            pos += 1

    return tokens
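
The tokenizer above calls three helpers that are not shown. Minimal sketches follow, with names and return shapes inferred from the call sites — treat the details as assumptions:

```python
def parse_string(s: str, pos: int) -> tuple[str, int, bool]:
    """Scan a JSON string starting at s[pos] == '"'.
    Returns (value, next_pos, complete)."""
    end = pos + 1
    value = ""
    while end < len(s):
        c = s[end]
        if c == '\\' and end + 1 < len(s):
            value += s[end:end + 2]   # keep escape sequences verbatim
            end += 2
            continue
        if c == '"':
            return value, end + 1, True
        value += c
        end += 1
    return value, end, False          # ran out of input: incomplete

def parse_number(s: str, pos: int) -> tuple[str, int]:
    """Consume a run of number characters (digits, sign, '.', exponent)."""
    end = pos
    while end < len(s) and (s[end].isdigit() or s[end] in '-+.eE'):
        end += 1
    return s[pos:end], end

def parse_keyword(s: str, pos: int) -> tuple[str, int]:
    """Consume a run of letters (candidate true/false/null)."""
    end = pos
    while end < len(s) and s[end].isalpha():
        end += 1
    return s[pos:end], end

print(parse_string('"ab"', 0))    # ('ab', 4, True)
print(parse_string('"ab', 0))     # ('ab', 3, False)
print(parse_number('123, ', 0))   # ('123', 3)
```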

def cleanup_tokens(tokens: list[Token]) -> list[Token]:
    """Stage 2: Remove trailing incomplete tokens."""
    if not tokens:
        return tokens

    last = tokens[-1]

    # Remove trailing separators and delimiters
    if last.type in ('separator', 'delimiter'):
        return cleanup_tokens(tokens[:-1])

    # Remove incomplete numbers (trailing '.', sign, or exponent)
    if last.type == 'number' and last.value[-1] in '.eE+-':
        return cleanup_tokens(tokens[:-1])

    # Inside an object, a value right after ',' or '{' is a key that
    # has no ':' yet -- drop it. (Array elements after ',' are complete.)
    if last.type in ('string', 'number', 'name') and len(tokens) >= 2:
        prev = tokens[-2]
        after_comma_or_open = (prev.type == 'delimiter' or
                               (prev.type == 'brace' and prev.value == '{'))
        if after_comma_or_open and _innermost_is_object(tokens[:-1]):
            return cleanup_tokens(tokens[:-1])

    return tokens

def _innermost_is_object(tokens: list[Token]) -> bool:
    """True if the deepest unclosed bracket is '{'."""
    stack = []
    for t in tokens:
        if t.type in ('brace', 'paren'):
            if t.value in '{[':
                stack.append(t.value)
            elif stack:
                stack.pop()
    return bool(stack) and stack[-1] == '{'

def balance_brackets(tokens: list[Token]) -> list[Token]:
    """Stage 3: Close unclosed brackets."""
    # Stack of closers we still owe, innermost last
    closers = []

    for token in tokens:
        if token.type in ('brace', 'paren'):
            if token.value in '{[':
                closers.append('}' if token.value == '{' else ']')
            elif closers and closers[-1] == token.value:
                closers.pop()  # closes the innermost open bracket

    # Append the missing closers, innermost first
    result = tokens.copy()
    for closer in reversed(closers):
        kind = 'brace' if closer == '}' else 'paren'
        result.append(Token(kind, closer))

    return result

def serialize_tokens(tokens: list[Token]) -> str:
    """Stage 4: Convert tokens back to JSON string."""
    result = ""
    for token in tokens:
        if token.type == 'string':
            result += f'"{token.value}"'
        else:
            result += token.value
    return result

Example Transformations

Input (Partial)    After Cleanup    After Balance    Parsed Output
{"a": "te         {                {}               {}
{"a": "test"      {"a": "test"     {"a": "test"}    {'a': 'test'}
{"a": 123,        {"a": 123        {"a": 123}       {'a': 123}
[1, 2,            [1, 2            [1, 2]           [1, 2]

Reconnection

Configuration

DEFAULT_RECONNECTION_OPTIONS = {
    "initial_delay": 1000,      # 1 second
    "max_delay": 30000,         # 30 seconds
    "growth_factor": 1.5,
    "max_retries": 2
}

When to Reconnect

Reconnect on:

  • SSE stream ends unexpectedly (no message_stop)
  • Network error / connection dropped
  • Server closes connection without completion

Don't reconnect on:

  • User abort (AbortController signaled)
  • Max retries exceeded
  • Normal completion (message_stop received)
  • Server error event (API returned error)
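
These rules collapse into a small predicate. The parameter names here are illustrative, not Amp's API:

```python
# Sketch of the reconnect decision; flag names are assumptions.
def should_reconnect(*, user_aborted: bool, received_message_stop: bool,
                     server_error: bool, attempts: int,
                     max_retries: int = 2) -> bool:
    # Never reconnect after a user abort, normal completion, or API error
    if user_aborted or received_message_stop or server_error:
        return False
    # Otherwise retry until the budget is exhausted
    return attempts < max_retries

print(should_reconnect(user_aborted=False, received_message_stop=False,
                       server_error=False, attempts=1))   # True
print(should_reconnect(user_aborted=False, received_message_stop=True,
                       server_error=False, attempts=0))   # False
```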

Backoff Calculation

def get_next_delay(attempt: int, server_retry_ms: int | None, options: dict) -> int:
    """Calculate next reconnection delay with exponential backoff."""
    # Server-specified retry takes precedence
    if server_retry_ms is not None:
        return server_retry_ms

    # Exponential backoff: initial * factor^attempt
    delay = options["initial_delay"] * (options["growth_factor"] ** attempt)

    # Cap at maximum
    return min(int(delay), options["max_delay"])

# Delay sequence: 1000ms, 1500ms, 2250ms, 3375ms, ... max 30000ms
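
The sequence in the comment above can be recomputed directly from the defaults (a quick check, independent of the transport):

```python
# Exponential backoff: initial * factor**attempt, capped at max_delay,
# using the defaults from DEFAULT_RECONNECTION_OPTIONS above.
initial, factor, cap = 1000, 1.5, 30000
delays = [min(int(initial * factor ** attempt), cap) for attempt in range(10)]
print(delays[:4])   # [1000, 1500, 2250, 3375]
print(delays[-1])   # 30000 (capped)
```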

Reconnection with Resumption

import asyncio
import json
from collections.abc import AsyncIterator

class SSETransport:
    def __init__(self, options: dict = None):
        self.options = {**DEFAULT_RECONNECTION_OPTIONS, **(options or {})}
        self.last_event_id = None
        self.server_retry_ms = None
        self.abort_controller = None

    async def connect(self, url: str, body: dict) -> AsyncIterator[SSEEvent]:
        """Connect with automatic reconnection."""
        attempts = 0
        received_complete = False

        while attempts <= self.options["max_retries"]:
            try:
                headers = {"Accept": "text/event-stream"}

                # Resume from last event ID
                if self.last_event_id:
                    headers["Last-Event-ID"] = self.last_event_id

                # `fetch` stands in for your HTTP client (e.g. aiohttp, httpx)
                response = await fetch(url, method="POST", headers=headers, body=body)

                async for event in self._read_stream(response):
                    # Track event ID for resumption
                    if event.id:
                        self.last_event_id = event.id

                    # Check for completion
                    data = json.loads(event.data) if event.data else {}
                    if data.get("type") == "message_stop":
                        received_complete = True

                    yield event

                # Stream ended
                if received_complete:
                    return  # Success

                # Incomplete - fall through to retry
                raise StreamIncomplete()  # custom "stream ended early" exception

            except AbortError:  # i.e. the user's abort signal fired
                raise  # Don't retry user aborts

            except Exception as e:
                attempts += 1
                if attempts > self.options["max_retries"]:
                    raise

                # Wait with backoff
                delay = get_next_delay(attempts - 1, self.server_retry_ms, self.options)
                await asyncio.sleep(delay / 1000)

Cancellation

Double-Escape Pattern

The CLI uses a confirmation pattern to prevent accidental cancellation:

class InputHandler:
    def __init__(self):
        self.confirming_cancel = False
        self.cancel_timeout = None
        self.is_processing = False  # set True while the agent is running

    def handle_escape(self) -> bool:
        """Handle escape key press."""
        # Second escape within 1 second: actually cancel
        if self.confirming_cancel:
            self._do_cancel()
            self.confirming_cancel = False
            if self.cancel_timeout:
                self.cancel_timeout.cancel()
            return True

        # First escape: show confirmation
        if self.is_processing:
            self.confirming_cancel = True
            # Auto-clear after 1 second
            self.cancel_timeout = asyncio.get_event_loop().call_later(
                1.0,
                self._clear_confirmation
            )
            return True

        return False

    def _clear_confirmation(self):
        self.confirming_cancel = False
        self.cancel_timeout = None

What Gets Cancelled

Component        Method                      Effect
LLM inference    abort_controller.abort()    Stops generation
Bash commands    process.terminate()         SIGTERM to process
SSE stream       response.close()            Closes connection
Subagent tasks   task.cancel()               Stops subagent
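
In a Python implementation, the closest analogue of abort_controller.abort() is asyncio task cancellation. A minimal sketch:

```python
import asyncio

async def long_running():
    # Stands in for an in-flight LLM request or tool call
    await asyncio.sleep(60)

async def main() -> str:
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0)        # let the task start
    task.cancel()                 # analogous to abort_controller.abort()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "completed"

print(asyncio.run(main()))        # cancelled
```

Cancellation raises CancelledError inside the awaited call, which is how cleanup handlers (closing the SSE connection, terminating child processes) get a chance to run.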

UI State Management

Streaming State Structure

@dataclass
class StreamingUIState:
    # Text streaming with animation timestamps
    streaming_segments: list[dict]  # [{text, birth_time}]

    # Tool calls in progress
    streaming_tool_calls: list[dict]  # [{id, name, arguments}]

    # Tool execution progress by ID
    streaming_tool_progress: dict[str, str]

    # Complete content blocks
    streaming_content_blocks: list

    # Connection state
    connection_state: str  # disconnected, connecting, connected, reconnecting

    # Token usage
    token_usage: dict | None = None

Progress Spinner

class BrailleSpinner:
    FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']

    def __init__(self):
        self.frame = 0

    def advance(self):
        self.frame = (self.frame + 1) % len(self.FRAMES)

    def current(self) -> str:
        return self.FRAMES[self.frame]

Complete Streaming Example

async def stream_agent_response(messages: list, tools: list):
    """Stream a complete agent response, wiring the layers together."""

    # LineDecoder and SSEParser run inside the transport's read loop;
    # here we only wire the transport to the message stream.
    transport = SSETransport()
    message_stream = MessageStream()

    # Register event handlers (render_text, render_tool_input, and
    # render_block are application-supplied callbacks, not defined here)
    message_stream.on("text", lambda delta, _full: render_text(delta))
    message_stream.on("inputJson", lambda _partial, parsed: render_tool_input(parsed))
    message_stream.on("contentBlock", lambda block: render_block(block))
    message_stream.on("end", lambda: None)

    # Connect and stream
    async for event in transport.connect(API_URL, {
        "model": "claude-sonnet-4-20250514",
        "messages": messages,
        "tools": tools,
        "stream": True
    }):
        message_stream.process_sse_event(event)

Implementation Checklist

Building streaming? Ensure:

  • Line Decoding

    • Handle \n, \r, \r\n line endings
    • Buffer incomplete lines
    • Handle binary data correctly
  • SSE Parsing

    • BOM stripping on first chunk
    • Multi-line data concatenation
    • Event ID tracking for resumption
    • Retry directive handling
  • Delta Handling

    • text_delta accumulation
    • input_json_delta with partial parsing
    • thinking_delta for extended thinking
    • signature_delta for verification
  • Reconnection

    • Exponential backoff
    • Last-Event-ID resumption
    • Max retry limit (2)
  • Cancellation

    • AbortController integration
    • Double-escape confirmation
    • Cleanup on abort

What's Next

We're streaming output. But how do we know if the agent actually succeeded?

08-verification.md - Course correction and meta-agent monitoring