Threads - Persistent Conversation State

The first-class primitive for conversation persistence. Threads store messages, relationships, and can be forked, shared, and resumed.

Evidence source: Amp Code v0.0.1769212917 (thread schema, persistence, forking, relationships)


Why Threads Matter

Unlike simple chat history, threads are:

  • Persistent - Survive app restarts, sync across devices
  • Forkable - Branch at any point for exploration
  • Relatable - Connect via fork, handoff, or mention
  • Observable - Real-time updates via RxJS streams

Thread ID Format

// Format: T-{uuid}
function generateThreadId(): string {
  return `T-${crypto.randomUUID()}`;
}

// Example: T-5928a90d-d53b-488f-a829-4e36442142ee

function isValidThreadId(id: string): boolean {
  if (!id.startsWith("T-")) return false;
  const uuid = id.slice(2);
  return /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(uuid);
}

Thread Schema

Core Structure

interface Thread {
  // Required fields
  v: number;              // Version for optimistic concurrency
  id: string;             // Thread ID (T-{uuid})
  created: number;        // Creation timestamp (ms since epoch)
  messages: Message[];    // Conversation messages

  // Optional metadata
  title?: string;                     // Generated or user-set
  archived?: boolean;                 // Hidden from main list
  draft?: string;                     // Unsent draft message
  autoSubmitDraft?: boolean;          // Draft should auto-submit
  agentMode?: string;                 // Mode (smart/rush/free/etc.)
  maxTokens?: number;                 // Per-thread output cap
  nextMessageId?: number;             // Monotonic message ID counter
  activatedSkills?: SkillActivation[]; // Skills loaded via skill tool

  // Relationships
  relationships?: Relationship[];     // Fork, handoff, mention links
  originThreadID?: string;            // Parent thread for forks
  forkPointIndex?: number;            // Message index of fork point
  mainThreadID?: string;              // Main thread for subagent threads

  // Environment
  env?: {
    initial?: {
      trees?: TreeSpec[];             // Repository/workspace info
      tags?: string[];                // Inference tags (e.g., "model:gpt-5")
    };
  };

  // Queue (for messages while agent is running)
  queuedMessages?: QueuedMessage[];   // Max 5 queued messages
}

Creating a Thread

function createThread(id: string): Thread {
  return {
    v: 0,
    id,
    created: Date.now(),
    messages: []
  };
}

Message Roles

Role Purpose
user User input messages
assistant AI responses with tool_use blocks
info System information (summaries, course corrections)

Thread Relationships

Threads can relate to each other in three ways:

Relationship Types

Type Purpose Data Shared
fork Branch at message Full history up to fork point
handoff Continue in new context LLM-extracted summary
mention Reference another thread Thread link only

Relationship Structure

interface Relationship {
  threadID: string;                        // Related thread ID
  type: "fork" | "handoff" | "mention";   // Relationship type
  role: "parent" | "child";               // Direction
  messageIndex?: number;                  // Where relationship was created
  createdAt?: number;                     // Timestamp
  comment?: string;                       // Optional description (for handoffs)
}

Bidirectional Relationships

Relationships are stored on both threads:

Thread A (parent) forks to Thread B (child) at message 5:

Thread A.relationships = [
  { threadID: "B", type: "fork", role: "parent", messageIndex: 5 }
]

Thread B.relationships = [
  { threadID: "A", type: "fork", role: "child", messageIndex: 5 }
]

Thread B.originThreadID = "A"
Thread B.forkPointIndex = 5

Thread Forking

Fork creates a new thread branching from a specific message.

Fork Flow

User requests fork at message N
        │
        ▼
Generate new thread ID
        │
        ▼
Copy messages 0..N to new thread
        │
        ▼
Copy environment (deep clone)
        │
        ▼
Generate fork title ("Forked: {original}")
        │
        ▼
Create bidirectional relationships
        │
        ▼
Persist both threads

Fork Implementation

async function forkThread(
  parentId: string,
  messageIndex: number,
  threadService: ThreadService
): Promise<string> {
  const parent = await threadService.get(parentId);
  if (!parent) throw new Error(`Thread ${parentId} not found`);

  // Create new thread
  const childId = generateThreadId();
  const child = createThread(childId);

  // Copy data
  child.messages = parent.messages.slice(0, messageIndex + 1).map(deepClone);
  child.env = parent.env ? deepClone(parent.env) : undefined;
  child.title = generateForkTitle(parent.title);
  child.originThreadID = parentId;
  child.forkPointIndex = messageIndex;

  // Create child's relationship to parent
  child.relationships = [{
    threadID: parentId,
    type: "fork",
    role: "child",
    messageIndex,
    createdAt: Date.now()
  }];

  // Save child
  await threadService.save(child);

  // Update parent with relationship to child
  parent.relationships = parent.relationships || [];
  parent.relationships.push({
    threadID: childId,
    type: "fork",
    role: "parent",
    messageIndex,
    createdAt: Date.now()
  });
  parent.v++;
  await threadService.save(parent);

  return childId;
}

function generateForkTitle(original: string | undefined): string {
  const title = original || "Untitled";
  const match = title.match(/^Forked(?:\((\d+)\))?: (.+)$/);

  if (!match) {
    return `Forked: ${title}`;
  }

  const count = match[1] ? parseInt(match[1], 10) : 1;
  return `Forked(${count + 1}): ${match[2]}`;
}

Thread Persistence

Dirty Thread Tracking

Threads use a flush-based persistence model with batching:

class ThreadService {
  private exclusiveThreads = new Map<string, BehaviorSubject<Thread>>();
  private dirtyThreads = new Set<string>();
  private flushDebounce = 100; // ms

  async update(threadId: string, recipe: (thread: Thread) => void): Promise<Thread> {
    const subject = this.exclusiveThreads.get(threadId);
    if (!subject) throw new Error("Thread not loaded");

    // Use Immer for immutable updates
    const [newThread, patches] = produce(subject.getValue(), recipe);
    subject.next(newThread);

    // Mark dirty and trigger flush
    this.dirtyThreads.add(threadId);
    this.scheduleFlush();

    return newThread;
  }

  private scheduleFlush = debounce(async () => {
    const toFlush = Array.from(this.dirtyThreads).slice(0, 3); // Max 3 per batch
    this.dirtyThreads = new Set(
      Array.from(this.dirtyThreads).slice(3) // Queue remainder
    );

    await Promise.all(toFlush.map(id => this.flushThread(id)));

    // Continue if more remain
    if (this.dirtyThreads.size > 0) {
      this.scheduleFlush();
    }
  }, this.flushDebounce);

  private async flushThread(id: string): Promise<void> {
    const thread = this.exclusiveThreads.get(id)?.getValue();
    if (thread) {
      await this.storage.set(id, thread);
    }
  }
}

Version-Based Consistency

async flushVersion(threadId: string, targetVersion: number, timeout = 5000): Promise<void> {
  this.dirtyThreads.add(threadId);
  await this.flushThread(threadId);

  const start = Date.now();
  while (true) {
    if (Date.now() - start > timeout) {
      throw new Error(`Timeout waiting for thread ${threadId} to reach version ${targetVersion}`);
    }

    const persisted = await this.storage.get(threadId);
    if (persisted && persisted.v >= targetVersion) {
      return;
    }

    await sleep(10); // Poll every 10ms
  }
}

Thread Observation

Threads are observable via RxJS for real-time UI updates:

class ThreadService {
  observe(threadId: string): Observable<Thread> {
    return this.observeRaw(threadId).pipe(
      map(([thread]) => thread),
      throttleTime(100, { leading: true, trailing: true })
    );
  }

  private observeRaw(threadId: string): Observable<[Thread, Patch[]]> {
    return this.exclusiveThreads.observable.pipe(
      map(threads => threads.get(threadId)),
      distinctUntilChanged(),
      switchMap(subject => {
        if (subject) return subject;
        // Load from storage if not in cache
        return new Observable(subscriber => {
          this.storage.get(threadId).then(thread => {
            if (thread) subscriber.next([thread, []]);
          });
        });
      })
    );
  }
}

Exclusive Write Access

Only one writer per thread at a time:

interface ExclusiveReadWriter {
  read(): Thread;
  write(thread: Thread): void;
  update(recipe: (thread: Thread) => void): Thread;
  asyncDispose(): Promise<void>;
}

async function exclusiveSyncReadWriter(threadId: string): Promise<ExclusiveReadWriter> {
  if (this.exclusiveThreads.has(threadId)) {
    throw new Error(`Thread ${threadId} already has an exclusive reader-writer`);
  }

  // Load or create thread...
  const subject = new BehaviorSubject<Thread>(thread);
  this.exclusiveThreads.set(threadId, subject);

  let disposed = false;

  return {
    read: () => {
      if (disposed) throw new Error("Disposed");
      return subject.getValue();
    },

    update: (recipe) => {
      if (disposed) throw new Error("Disposed");
      const [newThread] = produce(subject.getValue(), recipe);
      subject.next(newThread);
      this.dirtyThreads.add(threadId);
      this.scheduleFlush();
      return newThread;
    },

    asyncDispose: async () => {
      if (disposed) return;
      disposed = true;
      await this.flushThread(threadId);
      this.exclusiveThreads.delete(threadId);
    }
  };
}

Message Queue

Messages can be queued while the agent is running:

const MAX_QUEUED_MESSAGES = 5;

interface QueuedMessage {
  id: string;             // "queued-{uuid}"
  queuedMessage: Message; // The message waiting to be sent
}

function enqueueMessage(thread: Thread, message: Message): void {
  if (!thread.queuedMessages) {
    thread.queuedMessages = [];
  }

  if (thread.queuedMessages.length >= MAX_QUEUED_MESSAGES) {
    return; // Queue full
  }

  thread.queuedMessages.push({
    id: `queued-${crypto.randomUUID()}`,
    queuedMessage: {
      ...message,
      messageId: thread.nextMessageId++
    }
  });
}

Thread Deletion

Deletion cascades to subagent threads:

async function deleteThread(threadId: string): Promise<void> {
  // Find subagent threads
  const allIds = await this.storage.keys();
  const subagentIds: string[] = [];

  for (const id of allIds) {
    const thread = await this.storage.get(id);
    if (thread?.mainThreadID === threadId) {
      subagentIds.push(id);
    }
  }

  // Delete subagent threads
  for (const id of subagentIds) {
    await this.storage.delete(id);
  }

  // Delete main thread
  await this.storage.delete(threadId);

  // Clean up in-memory state
  this.exclusiveThreads.delete(threadId);
  this.dirtyThreads.delete(threadId);

  // Delete from server (if synced)
  await api.deleteThread({ thread: threadId });
}

Skill Activation Tracking

When skills are loaded, they're recorded on the thread:

interface SkillActivation {
  name: string;
  arguments?: Record<string, unknown>;
}

function onSkillToolComplete(thread: Thread, skillName: string, args?: unknown): void {
  if (!thread.activatedSkills) {
    thread.activatedSkills = [];
  }

  if (!thread.activatedSkills.some(s => s.name === skillName)) {
    thread.activatedSkills.push({
      name: skillName,
      arguments: args
    });
  }
}

This enables skill-gated tools (tools that require a skill to be loaded first).


Thread History Summary

For list views, create lightweight summaries:

interface ThreadHistoryItem {
  id: string;
  v: number;
  created: number;
  title: string | null;
  userLastInteractedAt: number;
  env?: ThreadEnv;
  originThreadID?: string;
  mainThreadID?: string;
  parentRelationships: Relationship[];
  summaryStats: {
    diffStats: DiffStats;
    messageCount: number;
  };
  agentMode?: string;
  archived?: boolean;
}

function createHistoryItem(thread: Thread): ThreadHistoryItem {
  return {
    id: thread.id,
    v: thread.v,
    created: thread.created,
    title: thread.title ?? null,
    userLastInteractedAt: getLastUserInteraction(thread),
    env: thread.env,
    originThreadID: thread.originThreadID,
    mainThreadID: thread.mainThreadID,
    parentRelationships: getParentRelationships(thread),
    summaryStats: {
      diffStats: getDiffStats(thread),
      messageCount: thread.messages.length
    },
    agentMode: thread.agentMode,
    archived: thread.archived
  };
}

Implementation Checklist

Building thread support? Ensure:

  • Core Schema

    • Thread ID generation (T-{uuid})
    • Version tracking for concurrency
    • Message storage (user, assistant, info)
  • Persistence

    • Dirty thread tracking
    • Debounced flush (100ms, max 3 per batch)
    • Version-based consistency
    • Exclusive write access
  • Relationships

    • Fork support (bidirectional)
    • Handoff support (with comment)
    • Mention support (from read_thread)
    • Relationship cleanup on truncate
  • Observation

    • RxJS observable streams
    • Throttled updates (100ms)
  • Operations

    • Fork with title generation
    • Archive/unarchive
    • Delete with cascade

Constants

Constant Value Purpose
Flush debounce 100 ms Batch writes
Max batch size 3 Threads per flush cycle
Version poll interval 10 ms Consistency check
Version timeout 5,000 ms Max wait for persistence
Max queued messages 5 Queue capacity

Thread system based on Amp Code v0.0.1769212917 patterns