Context Window Management
The finite resource that limits everything. How to manage what the LLM can see.
Evidence source: Amp Code v0.0.1769212917 (compaction system, handoff, token counting API)
The Problem
LLMs have finite context windows. Codebases, for practical purposes, do not.
Claude Sonnet 4.5: 1,000,000 tokens
Claude Opus 4.5: 200,000 tokens
Average codebase: 10,000,000+ tokens
A single file: 500-5,000 tokens
Tool result: 100-10,000 tokens
After 50 tool calls: Context approaching limit
Every turn adds to the context:
- User message
- Assistant response
- Tool calls
- Tool results
- File contents
- Command outputs
Without management, context fills up. Then the agent fails.
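The arithmetic above can be sketched with rough, illustrative numbers (roughly the midpoints of the ranges listed, not measured values):

```python
# Back-of-envelope context growth using illustrative averages
# (roughly the midpoints of the ranges above; not measured values).
AVG_TOOL_RESULT_TOKENS = 2_000  # tool results run 100-10,000 tokens
AVG_TURN_OVERHEAD = 500         # user message + assistant text per turn

def tokens_after(tool_calls: int) -> int:
    """Very rough context size after a number of tool calls."""
    return tool_calls * (AVG_TOOL_RESULT_TOKENS + AVG_TURN_OVERHEAD)

# 50 tool calls already lands around 125,000 tokens, well past the
# effective input limit of a 200K-window model with 32K max output.
print(tokens_after(50))  # 125000
```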
Amp's Design Principle: Never silently drop messages. Use LLM-generated summaries to preserve essential context.
Model Context Limits
Context Windows by Model
| Model | Context Window | Max Output | Effective Input |
|---|---|---|---|
| Claude Sonnet 4.5 | 1,000,000 | 32,000 | 968,000 |
| Claude Opus 4.5 | 200,000 | 32,000 | 168,000 |
| Claude Haiku 4.5 | 200,000 | 64,000 | 136,000 |
| Gemini 2.5 Flash | 1,048,576 | 65,535 | 983,041 |
| GPT-5.2 | 400,000 | 128,000 | 272,000 |
Effective Input Calculation:
```python
def get_effective_input_limit(model_config):
    return model_config.context_window - model_config.max_output_tokens
```
Token Counting
1. API Token Counting (Most Accurate)
```python
async def count_tokens_api(client, model, request):
    """Use Anthropic's token counting API."""
    try:
        result = await client.messages.count_tokens(
            model=model,
            messages=request.messages or [{"role": "user", "content": "x"}],
            tools=request.tools if request.tools else None,
            system=request.system if request.system else None,
            thinking={
                "type": "enabled",
                "budget_tokens": 10000
            }
        )
        return result.input_tokens
    except Exception:
        # Fallback to estimation
        return estimate_tokens_fallback(request)
```
2. Character-Based Estimation (Fallback)
```python
CHARS_PER_TOKEN = 4  # ~4 characters per token for English/code

def estimate_tokens(text: str) -> int:
    """Rough estimate when API unavailable."""
    return len(text) // CHARS_PER_TOKEN
```
3. Byte-Based Estimation (For Files)
```python
import math

BYTES_PER_TOKEN = 3.5  # Amp uses 3.5 bytes per token
MAX_FILE_SIZE_FOR_ESTIMATION = 32768  # 32KB cap for token budgeting

def estimate_file_tokens(file_size_bytes: int) -> int:
    """Estimate tokens for file content."""
    capped_size = min(file_size_bytes, MAX_FILE_SIZE_FOR_ESTIMATION)
    return math.ceil(capped_size / BYTES_PER_TOKEN)
```
Note: This 32KB cap is intentionally smaller than the Read tool's 64KB limit (see 05-core-tools.md). Token estimation uses a conservative cap to prevent single files from consuming excessive budget during handoff file selection.
Per-Message Usage Tracking
Track token usage for each message:
```python
from dataclasses import dataclass

@dataclass
class MessageUsage:
    model: str
    input_tokens: int
    output_tokens: int
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0

    @property
    def total_input_tokens(self) -> int:
        return (self.input_tokens +
                self.cache_creation_input_tokens +
                self.cache_read_input_tokens)
```
Context Size Tiers
Strategy by Context Size
| Tier | Size Range | Models | Strategy |
|---|---|---|---|
| Extended | 1M+ tokens | Claude Sonnet, Gemini, Kimi K2 | Can handle most sessions |
| Large | 400K tokens | GPT-5 family | Compaction rarely needed |
| Medium | 200K-262K | Claude Opus/Haiku, Grok | Compaction common |
| Standard | 128K-230K | GPT OSS, Fireworks | Aggressive compaction |
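The tiers above can be derived mechanically from a model's context window; a minimal sketch (the thresholds are my reading of the table, not an Amp API):

```python
def context_tier(context_window: int) -> str:
    """Map a model's context window (tokens) to a strategy tier."""
    if context_window >= 1_000_000:
        return "extended"   # can handle most sessions
    if context_window >= 400_000:
        return "large"      # compaction rarely needed
    if context_window >= 200_000:
        return "medium"     # compaction common
    return "standard"       # aggressive compaction

print(context_tier(1_048_576))  # extended (Gemini 2.5 Flash)
print(context_tier(200_000))    # medium (Claude Opus 4.5)
print(context_tier(131_072))    # standard
```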
Threshold Monitoring
Proactive Check Before API Call
```python
def check_context_overflow(thread, model_config):
    """Check if context exceeds model limits."""
    last_usage = get_last_token_usage(thread)
    max_input = model_config.context_window - model_config.max_output_tokens
    if last_usage and last_usage.total_input_tokens >= max_input:
        logger.info(
            "Thread input tokens exceed model context limit, falling back",
            thread_id=thread.id,
            model=model_config.name,
            total_input_tokens=last_usage.total_input_tokens,
            max_input_tokens=max_input,
            fallback_model="gemini-2.5-flash"
        )
        return "gemini-2.5-flash"  # Fallback model
    return None  # No fallback needed
```
Finding Last Usage
```python
def get_last_token_usage(thread):
    """Get most recent token usage, stopping at summaries."""
    for message in reversed(thread.messages):
        # Stop at a summary: usage recorded before it no longer applies
        if message.role == "info":
            content = message.content[0]
            if content.type == "summary":
                return None
        # Return last non-zero assistant usage
        if message.role == "assistant" and message.usage:
            if message.usage.total_input_tokens > 0:
                return message.usage
    return None
```
Compaction Algorithm
When Compaction Triggers
Compaction triggers at 100,000 tokens (configurable):
```python
COMPACTION_THRESHOLD = 100_000  # tokens

async def check_and_do_compaction(params):
    if not params.compaction_control or not params.compaction_control.enabled:
        return False
    # Get total tokens from last message
    last_usage = await get_last_message_usage()
    if not last_usage:
        return False
    total_tokens = (
        last_usage.input_tokens +
        last_usage.cache_creation_input_tokens +
        last_usage.cache_read_input_tokens +
        last_usage.output_tokens
    )
    threshold = params.compaction_control.context_token_threshold or COMPACTION_THRESHOLD
    if total_tokens < threshold:
        return False
    # Trigger compaction
    await generate_compaction_summary(params)
    return True
```
Summary Generation Prompt
```
You have been working on the task described above but have not yet completed it.
Write a continuation summary that will allow you (or another instance of yourself) to resume
work efficiently in a future context window where the conversation history will be replaced
with this summary. Your summary should be structured, concise, and actionable. Include:

1. Task Overview
   - The user's core request and success criteria
   - Any clarifications or constraints they specified

2. Current State
   - What has been completed so far
   - Files created, modified, or analyzed (with paths if relevant)
   - Key outputs or artifacts produced

3. Important Discoveries
   - Technical constraints or requirements uncovered
   - Decisions made and their rationale
   - Errors encountered and how they were resolved
   - What approaches were tried that didn't work (and why)

4. Next Steps
   - Specific actions needed to complete the task
   - Any blockers or open questions to resolve
   - Priority order if multiple steps remain

5. Context to Preserve
   - User preferences or style requirements
   - Domain-specific details that aren't obvious
   - Any promises made to the user

Be concise but complete—err on the side of including information that would prevent
duplicate work or repeated mistakes. Write in a way that enables immediate resumption
of the task.

Wrap your summary in <summary></summary> tags.
```
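Because the prompt asks for `<summary></summary>` tags, the model's response has to be parsed before storage. A minimal sketch of that extraction (regex-based; the helper name is mine, not Amp's):

```python
import re

def extract_summary(response_text: str):
    """Pull the body out of <summary></summary> tags, or None if absent."""
    match = re.search(r"<summary>(.*?)</summary>", response_text, re.DOTALL)
    return match.group(1).strip() if match else None

text = "Some preamble.\n<summary>\n1. Task Overview: ...\n</summary>"
print(extract_summary(text))  # 1. Task Overview: ...
```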
Compaction Implementation
```python
async def generate_compaction_summary(params):
    messages = params.messages.copy()
    # Remove trailing tool_use if present (incomplete)
    if messages and messages[-1].role == "assistant":
        last = messages[-1]
        if isinstance(last.content, list):
            filtered = [c for c in last.content if c.type != "tool_use"]
            if not filtered:
                messages.pop()
            else:
                last.content = filtered
    # Generate summary
    summary_response = await client.messages.create(
        model=params.model,
        messages=[
            *messages,
            {"role": "user", "content": COMPACTION_PROMPT}
        ],
        max_tokens=params.max_tokens,
        headers={"x-stainless-helper": "compaction"}
    )
    # Replace ALL messages with summary
    params.messages = [{
        "role": "user",
        "content": summary_response.content
    }]
```
Summary as Info Message
The summary is stored as an "info" message:
```json
{
  "role": "info",
  "content": [{
    "type": "summary",
    "summary": {
      "type": "message",
      "summary": "<summary text>"
    }
  }]
}
```
Post-Summary Message Processing
When building messages for the API, skip everything before the summary:
```python
def build_api_messages(thread):
    # Find most recent summary
    summary_block, summary_index = find_summary_block(thread)
    messages = []
    # Use summary as starting point
    if summary_block and summary_block.summary.type == "message":
        messages.append({
            "role": "assistant",
            "content": [{
                "type": "text",
                "text": summary_block.summary.summary.strip()
            }]
        })
    # Process messages AFTER summary only
    start_index = summary_index + 1 if summary_block else 0
    for i in range(start_index, len(thread.messages)):
        messages.append(convert_message(thread.messages[i]))
    return messages

def find_summary_block(thread):
    """Search backwards for most recent summary."""
    for i in range(len(thread.messages) - 1, -1, -1):
        message = thread.messages[i]
        if message.role == "info":
            for content in message.content:
                if content.type == "summary":
                    return content, i
    return None, 0
```
Handoff System
When compaction isn't enough, create a new thread with extracted context.
When Handoff Triggers
- Manual: Model calls the handoff tool
- User request: Explicit instruction to hand off
LLM-Driven Context Extraction
```
Extract relevant context from the conversation above for continuing this work.
Write from my perspective (first person: "I did...", "I told you...").

Consider what would be useful to know based on my request below. Questions that might be relevant:
- What did I just do or implement?
- What instructions did I already give you which are still relevant?
- What files did I already tell you are important, or that I am working on?
- Did I provide a plan or spec that should be included?
- What did I already tell you that's important (libraries, patterns, constraints)?
```
Handoff Constants
| Constant | Value | Description |
|---|---|---|
| HANDOFF_FILE_BUDGET | 25,000 tokens | Token budget for files |
| MAX_HANDOFF_FILES | 10 | Maximum files in handoff |
| MAX_FILE_SIZE | 32,768 bytes | Per-file size cap (for budgeting) |
| HANDOFF_TIMEOUT | 30,000 ms | Extraction timeout |
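A sketch of enforcing these budgets when picking files for the handoff message (greedy selection; the function and its inputs are hypothetical, but the constants come from the table above):

```python
import math

BYTES_PER_TOKEN = 3.5
MAX_FILE_SIZE = 32_768          # per-file byte cap for budgeting
HANDOFF_FILE_BUDGET = 25_000    # tokens
MAX_HANDOFF_FILES = 10

def select_handoff_files(candidates):
    """Greedily keep (path, size_bytes) files until budget or count runs out."""
    selected, budget = [], HANDOFF_FILE_BUDGET
    for path, size in candidates:
        # Cap each file's estimated cost so one large file can't eat the budget
        cost = math.ceil(min(size, MAX_FILE_SIZE) / BYTES_PER_TOKEN)
        if len(selected) >= MAX_HANDOFF_FILES or cost > budget:
            continue
        selected.append(path)
        budget -= cost

    return selected

files = [("src/main.py", 7_000), ("docs/huge.md", 500_000), ("src/util.py", 3_500)]
print(select_handoff_files(files))
```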
Handoff Implementation
```python
async def create_handoff(thread, user_request):
    # Force tool call to extract context
    extraction = await force_tool_call(
        thread,
        tool="create_handoff_context",
        prompt=HANDOFF_EXTRACTION_PROMPT + user_request
    )
    # Create new thread with parent relationship
    new_thread = create_thread(
        parent_id=thread.id,
        parent_relationship="handoff"
    )
    # Build initial message with extracted context
    initial_message = build_handoff_message(
        extraction.summary,
        extraction.files[:10],  # Max 10 files
        user_request
    )
    new_thread.messages.append(initial_message)
    return new_thread
```
Caching Strategy
Prompt Caching
Anthropic supports prompt caching. Structure prompts to maximize cache hits:
```python
# Cacheable content (stable, prefix)
system_prompt = get_system_prompt()    # Cached
tool_definitions = get_tool_schemas()  # Cached
project_memory = load_agents_md()      # Cached

# Dynamic content (varies, suffix)
conversation_history = thread.messages  # Not cached
current_user_message = user_input       # Not cached
```
Cache Control Points
```python
def build_system_blocks(system_prompt, tools, memory):
    """Structure for maximum cache reuse."""
    return [
        # Block 1: Base system prompt (most stable)
        {
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"}
        },
        # Block 2: Tools (stable per mode)
        {
            "type": "text",
            "text": format_tools(tools),
            "cache_control": {"type": "ephemeral"}
        },
        # Block 3: Project memory (stable per session)
        {
            "type": "text",
            "text": memory,
            "cache_control": {"type": "ephemeral"}
        }
    ]
```
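Cache effectiveness can be monitored from the usage fields the API already returns (`cache_read_input_tokens` counts tokens served from cache). A small sketch, reusing the per-message usage shape from earlier:

```python
from dataclasses import dataclass

@dataclass
class Usage:
    input_tokens: int
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0

def cache_hit_rate(usage: Usage) -> float:
    """Fraction of input tokens that were served from the prompt cache."""
    total = (usage.input_tokens
             + usage.cache_creation_input_tokens
             + usage.cache_read_input_tokens)
    return usage.cache_read_input_tokens / total if total else 0.0

# Second call onward: most of the stable prefix should come from cache.
print(cache_hit_rate(Usage(input_tokens=1_200, cache_read_input_tokens=90_000)))
```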
Implementation Checklist
Building context management? Ensure:
Token Counting
- API-based counting (primary)
- Character estimation fallback
- Byte estimation for files
- Per-message usage tracking
Threshold Monitoring
- Pre-call context check
- Model fallback logic
- Usage history tracking
Compaction
- Trigger threshold (100K default)
- Summary generation prompt
- Info message storage
- Post-summary message skipping
Handoff
- Context extraction via tool call
- File budget enforcement
- New thread creation
- Parent relationship tracking
Caching
- Stable content identification
- Cache control points
- Cache hit monitoring
Constants Reference
| Constant | Value | Purpose |
|---|---|---|
| CHARS_PER_TOKEN | 4 | Character estimation |
| BYTES_PER_TOKEN | 3.5 | File estimation |
| COMPACTION_THRESHOLD | 100,000 | Compaction trigger |
| HANDOFF_FILE_BUDGET | 25,000 | Handoff file tokens |
| MAX_HANDOFF_FILES | 10 | Max files in handoff |
| MAX_FILE_SIZE | 32,768 | Per-file byte limit (budgeting) |
| HANDOFF_TIMEOUT | 30,000 | Extraction timeout (ms) |
What's Next
Context is managed. But how do we stream the response in real-time?
→ 07-streaming.md - SSE parsing, delta callbacks