Threads - Persistent Conversation State
Threads are the first-class primitive for conversation persistence. They store messages and relationships, and can be forked, shared, and resumed.
Evidence source: Amp Code v0.0.1769212917 (thread schema, persistence, forking, relationships)
Why Threads Matter
Unlike simple chat history, threads are:
- Persistent - Survive app restarts, sync across devices
- Forkable - Branch at any point for exploration
- Relatable - Connect via fork, handoff, or mention
- Observable - Real-time updates via RxJS streams
Thread ID Format
// Format: T-{uuid}
function generateThreadId(): string {
return `T-${crypto.randomUUID()}`;
}
// Example: T-5928a90d-d53b-488f-a829-4e36442142ee
function isValidThreadId(id: string): boolean {
if (!id.startsWith("T-")) return false;
const uuid = id.slice(2);
return /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(uuid);
}
Thread Schema
Core Structure
interface Thread {
// Required fields
v: number; // Version for optimistic concurrency
id: string; // Thread ID (T-{uuid})
created: number; // Creation timestamp (ms since epoch)
messages: Message[]; // Conversation messages
// Optional metadata
title?: string; // Generated or user-set
archived?: boolean; // Hidden from main list
draft?: string; // Unsent draft message
autoSubmitDraft?: boolean; // Draft should auto-submit
agentMode?: string; // Mode (smart/rush/free/etc.)
maxTokens?: number; // Per-thread output cap
nextMessageId?: number; // Monotonic message ID counter
activatedSkills?: SkillActivation[]; // Skills loaded via skill tool
// Relationships
relationships?: Relationship[]; // Fork, handoff, mention links
originThreadID?: string; // Parent thread for forks
forkPointIndex?: number; // Message index of fork point
mainThreadID?: string; // Main thread for subagent threads
// Environment
env?: {
initial?: {
trees?: TreeSpec[]; // Repository/workspace info
tags?: string[]; // Inference tags (e.g., "model:gpt-5")
};
};
// Queue (for messages while agent is running)
queuedMessages?: QueuedMessage[]; // Max 5 queued messages
}
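The `v` field is described as a version for optimistic concurrency. A minimal sketch of how such a check might work, assuming a hypothetical in-memory `VersionedStore` (the class and its `save` signature are illustrative and do not come from the source):

```typescript
// Hypothetical sketch: VersionedStore and its methods are assumptions for illustration.
interface VersionedThread {
  v: number;
  id: string;
}

class VersionedStore<T extends VersionedThread> {
  private items = new Map<string, T>();

  get(id: string): T | undefined {
    return this.items.get(id);
  }

  // Accept the write only if the caller based it on the currently stored version.
  // Returns the stored thread with its version bumped, or undefined on conflict.
  save(thread: T, expectedVersion: number): T | undefined {
    const current = this.items.get(thread.id);
    const currentVersion = current ? current.v : -1; // -1 means "not stored yet"
    if (currentVersion !== expectedVersion) return undefined;
    const next = { ...thread, v: expectedVersion + 1 };
    this.items.set(thread.id, next);
    return next;
  }
}
```

A writer that loaded version 0 and saves with `expectedVersion` 0 succeeds and produces version 1; a second writer still holding version 0 gets `undefined` back and has to reload before retrying.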
Creating a Thread
function createThread(id: string): Thread {
return {
v: 0,
id,
created: Date.now(),
messages: []
};
}
Message Roles
| Role | Purpose |
|---|---|
| user | User input messages |
| assistant | AI responses with tool_use blocks |
| info | System information (summaries, course corrections) |
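The schema references `Message` without defining it. One way the three roles could be modeled is a discriminated union on `role`. In this sketch only the role names come from the table above; `ThreadMessage`, `ContentBlock`, and every field are assumptions made for illustration:

```typescript
// Hypothetical message shapes. Only the three role names come from the table
// above; every field below is an assumption made for illustration.
type ContentBlock =
  | { type: "text"; text: string }
  | { type: "tool_use"; id: string; name: string; input: unknown };

interface UserMessage {
  role: "user";
  content: ContentBlock[];
}

interface AssistantMessage {
  role: "assistant";
  content: ContentBlock[]; // May include tool_use blocks
}

interface InfoMessage {
  role: "info";
  content: ContentBlock[]; // Summaries, course corrections
}

type ThreadMessage = UserMessage | AssistantMessage | InfoMessage;

// List the names of tools requested by assistant messages, in order.
function requestedTools(messages: ThreadMessage[]): string[] {
  const names: string[] = [];
  for (const message of messages) {
    if (message.role !== "assistant") continue;
    for (const block of message.content) {
      if (block.type === "tool_use") names.push(block.name);
    }
  }
  return names;
}
```

Checking `message.role` narrows the union, so role-specific handling stays type-checked as the shapes diverge.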
Thread Relationships
Threads can relate to each other in three ways:
Relationship Types
| Type | Purpose | Data Shared |
|---|---|---|
| fork | Branch at message | Full history up to fork point |
| handoff | Continue in new context | LLM-extracted summary |
| mention | Reference another thread | Thread link only |
Relationship Structure
interface Relationship {
threadID: string; // Related thread ID
type: "fork" | "handoff" | "mention"; // Relationship type
role: "parent" | "child"; // Direction
messageIndex?: number; // Where relationship was created
createdAt?: number; // Timestamp
comment?: string; // Optional description (for handoffs)
}
Bidirectional Relationships
Relationships are stored on both threads:
Thread A (parent) forks to Thread B (child) at message 5:
Thread A.relationships = [
{ threadID: "B", type: "fork", role: "parent", messageIndex: 5 }
]
Thread B.relationships = [
{ threadID: "A", type: "fork", role: "child", messageIndex: 5 }
]
Thread B.originThreadID = "A"
Thread B.forkPointIndex = 5
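The bidirectional bookkeeping shown above can be captured in one helper so that the two sides never drift apart. This is a sketch under assumptions: `linkThreads` and `LinkableThread` are hypothetical names, and `role` is taken to describe the thread that owns the entry, as in the example above.

```typescript
type RelationshipType = "fork" | "handoff" | "mention";

interface Relationship {
  threadID: string;
  type: RelationshipType;
  role: "parent" | "child";
  messageIndex?: number;
  createdAt?: number;
  comment?: string;
}

// Hypothetical minimal thread shape for this sketch
interface LinkableThread {
  id: string;
  relationships?: Relationship[];
}

// Record the same link on both threads with a shared timestamp.
function linkThreads(
  parentThread: LinkableThread,
  childThread: LinkableThread,
  type: RelationshipType,
  options: { messageIndex?: number; comment?: string } = {}
): void {
  const shared = { type, createdAt: Date.now(), ...options };
  parentThread.relationships = parentThread.relationships ?? [];
  parentThread.relationships.push({ ...shared, threadID: childThread.id, role: "parent" });
  childThread.relationships = childThread.relationships ?? [];
  childThread.relationships.push({ ...shared, threadID: parentThread.id, role: "child" });
}
```

Calling `linkThreads(a, b, "fork", { messageIndex: 5 })` reproduces the two entries in the example; a handoff would pass a `comment` instead.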
Thread Forking
Forking creates a new thread that branches from a specific message; the child receives a copy of the history up to and including that message.
Fork Flow
User requests fork at message N
│
▼
Generate new thread ID
│
▼
Copy messages 0..N to new thread
│
▼
Copy environment (deep clone)
│
▼
Generate fork title ("Forked: {original}")
│
▼
Create bidirectional relationships
│
▼
Persist both threads
Fork Implementation
async function forkThread(
parentId: string,
messageIndex: number,
threadService: ThreadService
): Promise<string> {
const parent = await threadService.get(parentId);
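if (!parent) throw new Error(`Thread ${parentId} not found`);
// Reject out-of-range fork points; a negative index would make slice() copy the wrong span
if (!Number.isInteger(messageIndex) || messageIndex < 0 || messageIndex >= parent.messages.length) {
throw new RangeError(`Invalid fork point ${messageIndex} for thread ${parentId}`);
}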
// Create new thread
const childId = generateThreadId();
const child = createThread(childId);
// Copy data
child.messages = parent.messages.slice(0, messageIndex + 1).map(deepClone);
child.env = parent.env ? deepClone(parent.env) : undefined;
child.title = generateForkTitle(parent.title);
child.originThreadID = parentId;
child.forkPointIndex = messageIndex;
// Create child's relationship to parent
child.relationships = [{
threadID: parentId,
type: "fork",
role: "child",
messageIndex,
createdAt: Date.now()
}];
// Save child
await threadService.save(child);
// Update parent with relationship to child
parent.relationships = parent.relationships || [];
parent.relationships.push({
threadID: childId,
type: "fork",
role: "parent",
messageIndex,
createdAt: Date.now()
});
parent.v++;
await threadService.save(parent);
return childId;
}
function generateForkTitle(original: string | undefined): string {
const title = original || "Untitled";
const match = title.match(/^Forked(?:\((\d+)\))?: (.+)$/);
if (!match) {
return `Forked: ${title}`;
}
const count = match[1] ? parseInt(match[1], 10) : 1;
return `Forked(${count + 1}): ${match[2]}`;
}
Thread Persistence
Dirty Thread Tracking
Threads use a flush-based persistence model with batching:
class ThreadService {
private exclusiveThreads = new Map<string, BehaviorSubject<Thread>>();
private dirtyThreads = new Set<string>();
private flushDebounce = 100; // ms
async update(threadId: string, recipe: (thread: Thread) => void): Promise<Thread> {
const subject = this.exclusiveThreads.get(threadId);
if (!subject) throw new Error("Thread not loaded");
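// Use Immer for immutable updates; produceWithPatches returns [nextState, patches, inversePatches]
// and requires enablePatches() to have been called once at startup
const [newThread, patches] = produceWithPatches(subject.getValue(), recipe);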
subject.next(newThread);
// Mark dirty and trigger flush
this.dirtyThreads.add(threadId);
this.scheduleFlush();
return newThread;
}
private scheduleFlush = debounce(async () => {
const toFlush = Array.from(this.dirtyThreads).slice(0, 3); // Max 3 per batch
this.dirtyThreads = new Set(
Array.from(this.dirtyThreads).slice(3) // Queue remainder
);
await Promise.all(toFlush.map(id => this.flushThread(id)));
// Continue if more remain
if (this.dirtyThreads.size > 0) {
this.scheduleFlush();
}
}, this.flushDebounce);
private async flushThread(id: string): Promise<void> {
const thread = this.exclusiveThreads.get(id)?.getValue();
if (thread) {
await this.storage.set(id, thread);
}
}
}
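The batching rule in `scheduleFlush` (at most 3 threads per cycle, remainder queued) can be isolated as a pure helper, which makes the drain order easy to reason about. A dependency-free sketch; `takeBatch` and `drainInBatches` are hypothetical names, and the real service would await storage writes between cycles:

```typescript
const MAX_FLUSH_BATCH = 3;

// Remove and return up to `max` IDs from the dirty set, in insertion order.
function takeBatch(dirty: Set<string>, max: number = MAX_FLUSH_BATCH): string[] {
  const batch = Array.from(dirty).slice(0, max);
  batch.forEach(id => dirty.delete(id));
  return batch;
}

// Drain the whole set, recording the batch produced by each flush cycle.
function drainInBatches(dirty: Set<string>): string[][] {
  const cycles: string[][] = [];
  while (dirty.size > 0) {
    cycles.push(takeBatch(dirty));
  }
  return cycles;
}
```

With seven dirty threads, the set drains in three cycles of sizes 3, 3, and 1.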
Version-Based Consistency
When a caller needs a guarantee that a given version has reached storage, flushVersion forces an immediate flush and then polls storage until the persisted version catches up or the timeout expires:
async flushVersion(threadId: string, targetVersion: number, timeout = 5000): Promise<void> {
this.dirtyThreads.add(threadId);
await this.flushThread(threadId);
const start = Date.now();
while (true) {
if (Date.now() - start > timeout) {
throw new Error(`Timeout waiting for thread ${threadId} to reach version ${targetVersion}`);
}
const persisted = await this.storage.get(threadId);
if (persisted && persisted.v >= targetVersion) {
return;
}
await sleep(10); // Poll every 10ms
}
}
Thread Observation
Threads are observable via RxJS for real-time UI updates:
class ThreadService {
observe(threadId: string): Observable<Thread> {
return this.observeRaw(threadId).pipe(
map(([thread]) => thread),
throttleTime(100, { leading: true, trailing: true })
);
}
private observeRaw(threadId: string): Observable<[Thread, Patch[]]> {
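// Assumes exclusiveThreads exposes an `observable` of the map; a plain Map has no such property
return this.exclusiveThreads.observable.pipe(
map(threads => threads.get(threadId)),
distinctUntilChanged(),
switchMap(subject => {
if (subject) {
// Cached subjects emit bare Thread values, so wrap them to match the tuple type
return subject.pipe(map((thread): [Thread, Patch[]] => [thread, []]));
}
// Load from storage if not in cache
return new Observable<[Thread, Patch[]]>(subscriber => {
this.storage.get(threadId).then(
thread => {
if (thread) subscriber.next([thread, []]);
},
err => subscriber.error(err) // Surface storage failures to subscribers
);
});
})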
);
}
}
Exclusive Write Access
Only one writer per thread at a time:
interface ExclusiveReadWriter {
read(): Thread;
write(thread: Thread): void;
update(recipe: (thread: Thread) => void): Thread;
asyncDispose(): Promise<void>;
}
async function exclusiveSyncReadWriter(threadId: string): Promise<ExclusiveReadWriter> {
if (this.exclusiveThreads.has(threadId)) {
throw new Error(`Thread ${threadId} already has an exclusive reader-writer`);
}
// Load or create thread...
const subject = new BehaviorSubject<Thread>(thread);
this.exclusiveThreads.set(threadId, subject);
let disposed = false;
return {
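read: () => {
if (disposed) throw new Error("Disposed");
return subject.getValue();
},
// Replace the whole thread value and schedule a flush
write: (thread) => {
if (disposed) throw new Error("Disposed");
subject.next(thread);
this.dirtyThreads.add(threadId);
this.scheduleFlush();
},
update: (recipe) => {
if (disposed) throw new Error("Disposed");
// produce returns the next immutable state
const newThread = produce(subject.getValue(), recipe);
subject.next(newThread);
this.dirtyThreads.add(threadId);
this.scheduleFlush();
return newThread;
},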
asyncDispose: async () => {
if (disposed) return;
disposed = true;
await this.flushThread(threadId);
this.exclusiveThreads.delete(threadId);
}
};
}
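The single-writer rule itself does not depend on RxJS or storage. A dependency-free sketch of the acquire/release pattern, assuming a hypothetical `WriterRegistry` class (not part of the source) that hands out an idempotent release function:

```typescript
// Hypothetical registry: tracks which thread IDs currently have a writer.
class WriterRegistry {
  private held = new Set<string>();

  // Returns a release function; throws if the thread already has a writer.
  acquire(threadId: string): () => void {
    if (this.held.has(threadId)) {
      throw new Error(`Thread ${threadId} already has an exclusive reader-writer`);
    }
    this.held.add(threadId);
    let released = false;
    return () => {
      if (released) return; // Releasing twice is a no-op
      released = true;
      this.held.delete(threadId);
    };
  }
}
```

Guarding the release with its own `released` flag mirrors the `disposed` check above: a stale handle that is released a second time cannot free a slot that a newer writer has since acquired.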
Message Queue
Messages can be queued while the agent is running:
const MAX_QUEUED_MESSAGES = 5;
interface QueuedMessage {
id: string; // "queued-{uuid}"
queuedMessage: Message; // The message waiting to be sent
}
function enqueueMessage(thread: Thread, message: Message): void {
if (!thread.queuedMessages) {
thread.queuedMessages = [];
}
if (thread.queuedMessages.length >= MAX_QUEUED_MESSAGES) {
return; // Queue full
}
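// nextMessageId is optional, so default it before incrementing (undefined++ yields NaN)
const messageId = thread.nextMessageId ?? 0;
thread.nextMessageId = messageId + 1;
thread.queuedMessages.push({
id: `queued-${crypto.randomUUID()}`,
queuedMessage: {
...message,
messageId
}
});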
}
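The source shows only the enqueue side. A plausible counterpart, offered as an assumption rather than documented behavior, is to move the oldest queued message into the conversation once the agent is idle. `dequeueNext` and the generic `QueueThread` shape are hypothetical:

```typescript
interface QueuedItem<M> {
  id: string;
  queuedMessage: M;
}

// Hypothetical minimal thread shape for this sketch
interface QueueThread<M> {
  messages: M[];
  queuedMessages?: QueuedItem<M>[];
}

// Move the oldest queued message (FIFO) into the conversation.
// Returns the delivered message, or undefined if the queue is empty.
function dequeueNext<M>(thread: QueueThread<M>): M | undefined {
  const next = thread.queuedMessages?.shift();
  if (!next) return undefined;
  thread.messages.push(next.queuedMessage);
  return next.queuedMessage;
}
```

Because the message ID is assigned at enqueue time, delivering in FIFO order keeps IDs monotonic within the conversation.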
Thread Deletion
Deletion cascades to subagent threads:
async function deleteThread(threadId: string): Promise<void> {
// Find subagent threads
const allIds = await this.storage.keys();
const subagentIds: string[] = [];
for (const id of allIds) {
const thread = await this.storage.get(id);
if (thread?.mainThreadID === threadId) {
subagentIds.push(id);
}
}
// Delete subagent threads
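for (const id of subagentIds) {
await this.storage.delete(id);
// Drop in-memory state too, so a pending flush cannot write the thread back
this.exclusiveThreads.delete(id);
this.dirtyThreads.delete(id);
}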
// Delete main thread
await this.storage.delete(threadId);
// Clean up in-memory state
this.exclusiveThreads.delete(threadId);
this.dirtyThreads.delete(threadId);
// Delete from server (if synced)
await api.deleteThread({ thread: threadId });
}
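The selection step of the cascade can be expressed as a pure function, which is easier to test than the storage-backed loop. A sketch assuming thread summaries are already in memory; `collectCascadeIds` and `ThreadRef` are hypothetical names:

```typescript
// Hypothetical minimal thread shape for this sketch
interface ThreadRef {
  id: string;
  mainThreadID?: string;
}

// Return the IDs to delete: direct subagent threads first, then the thread itself.
// Like the loop above, this follows mainThreadID one level only.
function collectCascadeIds(threads: ThreadRef[], threadId: string): string[] {
  const subagentIds = threads
    .filter(thread => thread.mainThreadID === threadId)
    .map(thread => thread.id);
  return [...subagentIds, threadId];
}
```

Deleting subagent threads before the main thread means an interrupted deletion leaves the main thread in place rather than orphaned subagents.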
Skill Activation Tracking
When skills are loaded, they're recorded on the thread:
interface SkillActivation {
name: string;
arguments?: Record<string, unknown>;
}
function onSkillToolComplete(thread: Thread, skillName: string, args?: Record<string, unknown>): void {
if (!thread.activatedSkills) {
thread.activatedSkills = [];
}
if (!thread.activatedSkills.some(s => s.name === skillName)) {
thread.activatedSkills.push({
name: skillName,
arguments: args
});
}
}
This enables skill-gated tools (tools that require a skill to be loaded first).
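A gate check over `activatedSkills` might look like the following. The `GatedTool` shape and its `requiredSkill` field are hypothetical; the source does not say how tools declare the skill they depend on.

```typescript
interface SkillActivation {
  name: string;
  arguments?: Record<string, unknown>;
}

// Hypothetical tool descriptor; `requiredSkill` is an assumed field
interface GatedTool {
  name: string;
  requiredSkill?: string;
}

// A tool is available if it needs no skill, or its skill has been activated on the thread.
function isToolAvailable(activated: SkillActivation[] | undefined, tool: GatedTool): boolean {
  if (!tool.requiredSkill) return true;
  return (activated ?? []).some(skill => skill.name === tool.requiredSkill);
}
```

Treating a missing `activatedSkills` array as empty matches the schema, where the field is optional until the first skill loads.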
Thread History Summary
For list views, create lightweight summaries:
interface ThreadHistoryItem {
id: string;
v: number;
created: number;
title: string | null;
userLastInteractedAt: number;
env?: ThreadEnv;
originThreadID?: string;
mainThreadID?: string;
parentRelationships: Relationship[];
summaryStats: {
diffStats: DiffStats;
messageCount: number;
};
agentMode?: string;
archived?: boolean;
}
function createHistoryItem(thread: Thread): ThreadHistoryItem {
return {
id: thread.id,
v: thread.v,
created: thread.created,
title: thread.title ?? null,
userLastInteractedAt: getLastUserInteraction(thread),
env: thread.env,
originThreadID: thread.originThreadID,
mainThreadID: thread.mainThreadID,
parentRelationships: getParentRelationships(thread),
summaryStats: {
diffStats: getDiffStats(thread),
messageCount: thread.messages.length
},
agentMode: thread.agentMode,
archived: thread.archived
};
}
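Since `archived` marks threads hidden from the main list, a list view would typically filter and order these summaries before rendering. A sketch, assuming most-recent-interaction-first ordering (the source does not specify a sort order) and a hypothetical `visibleHistory` helper:

```typescript
// Minimal subset of ThreadHistoryItem needed for this sketch
interface HistoryEntry {
  id: string;
  userLastInteractedAt: number;
  archived?: boolean;
}

// Hide archived threads and order the rest by most recent user interaction.
// filter() returns a new array, so the caller's input is not reordered.
function visibleHistory<T extends HistoryEntry>(items: T[]): T[] {
  return items
    .filter(item => !item.archived)
    .sort((x, y) => y.userLastInteractedAt - x.userLastInteractedAt);
}
```

The generic parameter lets the same helper accept full `ThreadHistoryItem` values without losing their extra fields.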
Implementation Checklist
Building thread support? Ensure:
Core Schema
- Thread ID generation (T-{uuid})
- Version tracking for concurrency
- Message storage (user, assistant, info)
Persistence
- Dirty thread tracking
- Debounced flush (100ms, max 3 per batch)
- Version-based consistency
- Exclusive write access
Relationships
- Fork support (bidirectional)
- Handoff support (with comment)
- Mention support (from read_thread)
- Relationship cleanup on truncate
Observation
- RxJS observable streams
- Throttled updates (100ms)
Operations
- Fork with title generation
- Archive/unarchive
- Delete with cascade
Constants
| Constant | Value | Purpose |
|---|---|---|
| Flush debounce | 100 ms | Batch writes |
| Max batch size | 3 | Threads per flush cycle |
| Version poll interval | 10 ms | Consistency check |
| Version timeout | 5,000 ms | Max wait for persistence |
| Max queued messages | 5 | Queue capacity |
Thread system based on Amp Code v0.0.1769212917 patterns