Multi-Turn Conversation Design: Context Trimming, State Management and 200K Window Optimization
Chapter 8: Multi-Turn Conversation Management: History Compression, Context Pruning, and Session Persistence
8.1 The Statelessness Problem
The Claude API is stateless. Every request is an independent HTTP call. "Conversation" is a fiction maintained by the client: you pass the full conversation history in the messages parameter, and the model uses it to appear contextually aware.
This has a direct cost implication:
Token consumption growth pattern:
Turn 1:
Input = system(500) + user_1(100) = 600 tokens
Turn 5:
Input = system(500) + 4 prior turns(1,200) + user_5(100) = 1,800 tokens
Turn 20:
Input = system(500) + 19 prior turns(6,000) + user_20(100) = 6,600 tokens
Turn 50 (unmanaged):
Input may reach 20,000+ tokens → meaningful cost and latency increase
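The growth pattern above can be reproduced with simple arithmetic. The following is a minimal sketch, assuming the same illustrative sizes (a 500-token system prompt, 100-token user messages, and roughly 300 tokens per completed turn). The function names are hypothetical, and real conversations with longer replies will grow faster.

```python
def input_tokens_at_turn(turn: int, system: int = 500, user_msg: int = 100,
                         tokens_per_prior_turn: int = 300) -> int:
    """Input tokens sent on a given turn when the full history is resent."""
    return system + (turn - 1) * tokens_per_prior_turn + user_msg


def cumulative_input_tokens(turns: int, **kwargs) -> int:
    """Total input tokens billed across all turns of one conversation."""
    return sum(input_tokens_at_turn(t, **kwargs) for t in range(1, turns + 1))


for turn in (1, 5, 20, 50):
    print(turn, input_tokens_at_turn(turn), cumulative_input_tokens(turn))
```

Per-request input grows linearly with the turn number, but the cumulative total grows roughly quadratically, because every earlier turn is billed again on each later request.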
Multi-turn conversation management solves three problems:
- Context length control: Keep the token budget reasonable as history grows
- Information preservation: Don't drop information that is still relevant
- Session persistence: Resume conversations after restarts, across devices, between sessions
8.2 The Data Model
Start with a clear data model before writing conversation management logic:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class Role(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    role: Role
    content: str | list[dict]  # plain text or a list of content blocks
    timestamp: datetime = field(default_factory=datetime.now)
    input_tokens: int = 0
    output_tokens: int = 0
    metadata: dict = field(default_factory=dict)

    def to_api_format(self) -> dict:
        return {"role": self.role.value, "content": self.content}


@dataclass
class Conversation:
    session_id: str
    system_prompt: str
    model: str
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)

    def add(self, role: Role, content: str | list[dict], **kwargs) -> Message:
        msg = Message(role=role, content=content, **kwargs)
        self.messages.append(msg)
        self.last_updated = datetime.now()
        return msg

    def to_api_messages(self) -> list[dict]:
        return [m.to_api_format() for m in self.messages]

    def estimated_tokens(self) -> int:
        total_chars = len(self.system_prompt)
        for m in self.messages:
            if isinstance(m.content, str):
                total_chars += len(m.content)
            elif isinstance(m.content, list):
                # Only text blocks are counted; other block types contribute 0
                total_chars += sum(
                    len(b.get("text", ""))
                    for b in m.content
                    if isinstance(b, dict)
                )
        return total_chars // 4  # English approximation
8.3 Context Window Management Strategies
Strategy 1: Sliding Window
Keep only the last N turns. Discard everything older:
import anthropic

client = anthropic.Anthropic()


class SlidingWindowConversation:
    """
    Retains the most recent N turns; older turns are discarded entirely.
    Best for: casual chat, FAQ bots where early context rarely matters.
    Weakness: drops user-provided background from early turns.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_turns: int = 10,
        max_tokens_per_reply: int = 1024,
    ):
        self.system = system_prompt
        self.model = model
        self.max_turns = max_turns
        self.max_reply_tokens = max_tokens_per_reply
        self._full_history: list[dict] = []  # complete history (in memory)

    def chat(self, user_message: str) -> str:
        self._full_history.append({"role": "user", "content": user_message})
        # Send the current user message plus the (max_turns - 1) exchanges before it.
        # The window size is odd on purpose: the history ends on a user message,
        # so an even-sized slice would start on an assistant message.
        window = max(1, self.max_turns * 2 - 1)
        api_messages = self._full_history[-window:]
        resp = client.messages.create(
            model=self.model,
            max_tokens=self.max_reply_tokens,
            system=self.system,
            messages=api_messages,
        )
        reply = resp.content[0].text
        self._full_history.append({"role": "assistant", "content": reply})
        return reply

    @property
    def stats(self) -> dict:
        return {
            "total_turns": len(self._full_history) // 2,
            "active_turns": min(self.max_turns, len(self._full_history) // 2),
        }
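The windowing step can also be isolated as a pure function, which makes it easy to unit-test without calling the API. This is a minimal sketch, assuming the request should open with a user message; `trim_to_window` is a hypothetical helper, not part of any SDK.

```python
def trim_to_window(history: list[dict], max_turns: int) -> list[dict]:
    """Return the newest messages (at most max_turns * 2), starting on a user message."""
    window = history[-(max_turns * 2):]
    # Drop leading assistant messages so the request opens with a user turn
    while window and window[0]["role"] != "user":
        window = window[1:]
    return window


history = [
    {"role": "user", "content": "u1"},
    {"role": "assistant", "content": "a1"},
    {"role": "user", "content": "u2"},
    {"role": "assistant", "content": "a2"},
    {"role": "user", "content": "u3"},
]
print([m["content"] for m in trim_to_window(history, max_turns=2)])
```

Trimming after slicing, rather than computing an exact offset, also tolerates histories that are already slightly malformed.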
Strategy 2: Summarization Compression
When history exceeds a token budget, compress the older portion into a natural-language summary:
from typing import Optional


class SummarizingConversation:
    """
    Compresses old conversation history into a running summary when the
    token budget is exceeded.
    Best for: most production chatbots and assistants.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_history_tokens: int = 8_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.system = system_prompt
        self.model = model
        self.max_history_tokens = max_history_tokens
        self.summary_model = summary_model
        self.messages: list[dict] = []
        self.summary: Optional[str] = None

    def _estimate_tokens(self, messages: list[dict]) -> int:
        return sum(
            len(str(m.get("content", ""))) // 4
            for m in messages
        )

    def _compress(self):
        """Summarize the older half of the message history."""
        if len(self.messages) < 4:
            return
        split = len(self.messages) // 2
        # Keep the split even so the retained half still starts on a user message
        split -= split % 2
        old, self.messages = self.messages[:split], self.messages[split:]
        prior = f"[Previous summary]\n{self.summary}\n\n" if self.summary else ""
        # Each message is truncated to 300 characters to bound the summary request
        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: "
            f"{str(m.get('content', ''))[:300]}"
            for m in old
        )
        resp = client.messages.create(
            model=self.summary_model,
            max_tokens=350,
            messages=[{"role": "user", "content":
                f"{prior}Summarize the key information, decisions, and user-provided "
                f"background from this conversation in 4–5 sentences:\n\n{history_text}"}]
        )
        self.summary = resp.content[0].text

    def _build_api_messages(self) -> list[dict]:
        if not self.summary:
            return list(self.messages)
        # Inject the summary as a synthetic user/assistant exchange
        return [
            {"role": "user",
             "content": f"[Conversation summary]\n{self.summary}\n\n"
                        f"(This summarizes our earlier discussion. Please keep it in mind.)"},
            {"role": "assistant",
             "content": "Understood. I have the context from our earlier conversation."},
            *self.messages,
        ]

    def chat(self, user_message: str) -> str:
        # Compress before appending, while the history still ends on an assistant reply
        if self._estimate_tokens(self.messages) > self.max_history_tokens:
            self._compress()
        self.messages.append({"role": "user", "content": user_message})
        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system,
            messages=self._build_api_messages(),
        )
        reply = resp.content[0].text
        self.messages.append({"role": "assistant", "content": reply})
        return reply
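Splitting the history at its midpoint is simple, but the split can also be driven by a token budget for the messages kept verbatim. The sketch below is one possible variant, not the method used above; `estimate_tokens` and `split_for_compression` are hypothetical names, and the 4-characters-per-token estimate is the same rough approximation used elsewhere in this chapter.

```python
def estimate_tokens(message: dict) -> int:
    """Rough token estimate: about 4 characters per token for English text."""
    return len(str(message.get("content", ""))) // 4


def split_for_compression(
    messages: list[dict], keep_tokens: int
) -> tuple[list[dict], list[dict]]:
    """Split history into (old, recent), keeping about keep_tokens of recent messages."""
    kept = 0
    cut = len(messages)
    # Walk backward from the newest message until the budget is used up
    for i in range(len(messages) - 1, -1, -1):
        cost = estimate_tokens(messages[i])
        if kept + cost > keep_tokens:
            break
        kept += cost
        cut = i
    # Move the cut forward until the recent part opens with a user message
    while cut < len(messages) and messages[cut]["role"] != "user":
        cut += 1
    return messages[:cut], messages[cut:]


msgs = [
    {"role": "user", "content": "a" * 40},
    {"role": "assistant", "content": "b" * 40},
    {"role": "user", "content": "c" * 40},
    {"role": "assistant", "content": "d" * 40},
]
old, recent = split_for_compression(msgs, keep_tokens=25)
print(len(old), len(recent))
```

The `old` part would be passed to the summarizer and the `recent` part sent verbatim. A budget-based split keeps more turns when messages are short and fewer when they are long.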
Strategy 3: Structured Memory Extraction
Rather than summarizing in natural language, extract and maintain a structured key-value memory store. This approach is more precise than free-text summaries but more complex to implement:
import json
import re


class StructuredMemoryConversation:
    """
    Extracts and persists structured facts from the conversation rather than
    free-text summaries.
    Best for: assistant products that track user preferences, decisions, and state.
    """

    def __init__(self, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.system = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.memory: dict = {
            "user_profile": {},
            "preferences": {},
            "decisions": [],
            "task_context": {},
            "open_questions": [],
        }

    def _extract_updates(self, user_msg: str, assistant_msg: str) -> dict:
        prompt = f"""Extract memorable facts from this conversation turn.
Return ONLY a JSON object. If nothing is worth storing, return {{}}.
JSON format:
{{
  "user_profile": {{}},    // identity or background info
  "preferences": {{}},     // stated preferences
  "decisions": [],         // decisions made
  "task_context": {{}},    // task state updates
  "open_questions": []     // unresolved questions
}}
User: {user_msg[:400]}
Assistant: {assistant_msg[:400]}"""
        resp = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}]
        )
        # Take the outermost {...} span in case the model adds text around the JSON
        m = re.search(r"\{.*\}", resp.content[0].text, re.DOTALL)
        if not m:
            return {}
        try:
            parsed = json.loads(m.group())
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _apply_updates(self, updates: dict):
        for key in ["user_profile", "preferences", "task_context"]:
            if isinstance(updates.get(key), dict):
                self.memory[key].update(updates[key])
        for key in ["decisions", "open_questions"]:
            if isinstance(updates.get(key), list):
                # Store items as strings so they can be deduplicated and joined
                self.memory[key].extend(str(item) for item in updates[key])
                # Deduplicate, preserving order
                self.memory[key] = list(dict.fromkeys(self.memory[key]))

    def _memory_context(self) -> str:
        if not any(self.memory.values()):
            return ""
        parts = ["[Conversation Memory]"]
        if self.memory["user_profile"]:
            parts.append(f"User: {json.dumps(self.memory['user_profile'])}")
        if self.memory["preferences"]:
            parts.append(f"Preferences: {json.dumps(self.memory['preferences'])}")
        if self.memory["decisions"]:
            parts.append(f"Decisions: {'; '.join(self.memory['decisions'][-5:])}")
        if self.memory["task_context"]:
            parts.append(f"Context: {json.dumps(self.memory['task_context'])}")
        if self.memory["open_questions"]:
            parts.append(f"Open questions: {'; '.join(self.memory['open_questions'][-5:])}")
        return "\n".join(parts)

    def chat(self, user_message: str) -> str:
        mem_ctx = self._memory_context()
        recent = self.messages[-10:]
        # Inject memory on every request through the system prompt, so facts from
        # turns that have left the recent window stay visible to the model.
        # (Injecting only when there is no recent history would never fire:
        # memory is empty until at least one turn has been stored.)
        system = f"{self.system}\n\n{mem_ctx}" if mem_ctx else self.system
        api_messages = recent + [{"role": "user", "content": user_message}]
        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system,
            messages=api_messages,
        )
        reply = resp.content[0].text
        self.messages.append({"role": "user", "content": user_message})
        self.messages.append({"role": "assistant", "content": reply})
        updates = self._extract_updates(user_message, reply)
        if updates:
            self._apply_updates(updates)
        return reply
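The merge step is where structured memory tends to go wrong, because the extraction model may return unexpected keys or non-string list items. The following is a minimal sketch of a more defensive merge, written as a pure function. `merge_memory` and its `max_list_items` cap are hypothetical; the cap is an assumption added here to keep the memory from growing without bound.

```python
import json


def merge_memory(memory: dict, updates: dict, max_list_items: int = 20) -> dict:
    """Return a new memory dict with updates applied; the inputs are not mutated."""
    merged = {k: (dict(v) if isinstance(v, dict) else list(v)) for k, v in memory.items()}
    for key, value in updates.items():
        if key not in merged:
            continue  # ignore keys outside the known schema
        if isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key].update(value)  # newer facts overwrite older ones
        elif isinstance(merged[key], list) and isinstance(value, list):
            # Compare items by their JSON form so dict items can be deduplicated too
            seen = {json.dumps(x, sort_keys=True) for x in merged[key]}
            for item in value:
                marker = json.dumps(item, sort_keys=True)
                if marker not in seen:
                    seen.add(marker)
                    merged[key].append(item)
            merged[key] = merged[key][-max_list_items:]  # keep the newest items
    return merged


memory = {"preferences": {"lang": "en"}, "decisions": ["use Postgres"]}
updates = {
    "preferences": {"lang": "de"},
    "decisions": ["use Postgres", "ship Friday"],
    "unknown": 1,
}
print(merge_memory(memory, updates))
```

Returning a new dict instead of mutating in place makes it straightforward to log or persist the before and after states of each update.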
8.4 Session Persistence
Database Schema (PostgreSQL)
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id TEXT NOT NULL,
title TEXT,
system_prompt TEXT NOT NULL,
model TEXT NOT NULL DEFAULT 'claude-sonnet-4-6',
created_at TIMESTAMPTZ DEFAULT NOW(),
last_updated TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB DEFAULT '{}',
is_archived BOOLEAN DEFAULT FALSE
);
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
content TEXT NOT NULL,
content_blocks JSONB, -- multimodal content (optional)
input_tokens INT DEFAULT 0,
output_tokens INT DEFAULT 0,
model_used TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE conversation_summaries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
summary TEXT NOT NULL,
message_count_at_creation INT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_messages_conv_created ON messages(conversation_id, created_at);
CREATE INDEX idx_summaries_conversation ON conversation_summaries(conversation_id);
Persistence Layer (asyncpg)
import asyncpg


class ConversationStore:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool: asyncpg.Pool | None = None  # set by initialize()

    async def initialize(self):
        self._pool = await asyncpg.create_pool(self.dsn)

    async def create_conversation(
        self, user_id: str, system_prompt: str, model: str, title: str = "New Conversation"
    ) -> str:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "INSERT INTO conversations (user_id, system_prompt, model, title) "
                "VALUES ($1, $2, $3, $4) RETURNING id",
                user_id, system_prompt, model, title
            )
            return str(row["id"])

    async def append_message(
        self, conversation_id: str, role: str, content: str,
        input_tokens: int = 0, output_tokens: int = 0, model_used: str | None = None
    ) -> str:
        async with self._pool.acquire() as conn:
            # Insert the message and bump last_updated atomically
            async with conn.transaction():
                row = await conn.fetchrow(
                    "INSERT INTO messages (conversation_id, role, content, "
                    "input_tokens, output_tokens, model_used) "
                    "VALUES ($1, $2, $3, $4, $5, $6) RETURNING id",
                    conversation_id, role, content, input_tokens, output_tokens, model_used
                )
                await conn.execute(
                    "UPDATE conversations SET last_updated = NOW() WHERE id = $1",
                    conversation_id
                )
            return str(row["id"])

    async def get_recent_messages(self, conversation_id: str, limit: int = 20) -> list[dict]:
        async with self._pool.acquire() as conn:
            # Fetch the newest rows first, then reverse into chronological order
            rows = await conn.fetch(
                "SELECT role, content FROM messages "
                "WHERE conversation_id = $1 "
                "ORDER BY created_at DESC LIMIT $2",
                conversation_id, limit
            )
            return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]

    async def save_summary(self, conversation_id: str, summary: str, message_count: int):
        async with self._pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO conversation_summaries "
                "(conversation_id, summary, message_count_at_creation) VALUES ($1, $2, $3)",
                conversation_id, summary, message_count
            )

    async def get_latest_summary(self, conversation_id: str) -> str | None:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT summary FROM conversation_summaries "
                "WHERE conversation_id = $1 ORDER BY created_at DESC LIMIT 1",
                conversation_id
            )
            return row["summary"] if row else None
8.5 Full Persistent Conversation Assistant
Combining all components:
import asyncio
import anthropic


class PersistentAssistant:
    """
    Production-grade persistent multi-turn conversation assistant.
    Features: automatic compression, database persistence, cost tracking.
    """

    def __init__(
        self,
        store: ConversationStore,
        model: str = "claude-sonnet-4-6",
        max_context_tokens: int = 10_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.store = store
        # Async client, so API calls do not block the event loop
        self.client = anthropic.AsyncAnthropic()
        self.model = model
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model
        # Strong references keep background tasks from being garbage-collected
        self._background_tasks: set[asyncio.Task] = set()

    async def start_session(self, user_id: str, system_prompt: str, **kwargs) -> str:
        return await self.store.create_conversation(
            user_id=user_id, system_prompt=system_prompt, model=self.model, **kwargs
        )

    async def chat(self, session_id: str, user_message: str, system_prompt: str) -> dict:
        # Load recent history
        recent = await self.store.get_recent_messages(session_id, limit=30)
        # Build context (with summary injection if needed)
        context = await self._build_context(session_id, recent)
        context.append({"role": "user", "content": user_message})
        # Call API
        resp = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system_prompt,
            messages=context,
        )
        reply = resp.content[0].text
        # Persist both turns
        await self.store.append_message(
            session_id, "user", user_message,
            input_tokens=resp.usage.input_tokens,
        )
        await self.store.append_message(
            session_id, "assistant", reply,
            output_tokens=resp.usage.output_tokens,
            model_used=resp.model,
        )
        # Trigger background summarization if history is getting long
        if len(recent) > 24:
            task = asyncio.create_task(self._maybe_summarize(session_id, recent))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
        return {
            "reply": reply,
            "input_tokens": resp.usage.input_tokens,
            "output_tokens": resp.usage.output_tokens,
            "context_messages_sent": len(context),
        }

    async def _build_context(self, session_id: str, recent: list[dict]) -> list[dict]:
        estimated = sum(len(m.get("content", "")) // 4 for m in recent)
        if estimated <= self.max_context_tokens:
            return list(recent)
        # Over budget: send the stored summary (if any) plus the last 10 messages
        summary = await self.store.get_latest_summary(session_id)
        trimmed = recent[-10:]
        if summary:
            return [
                {"role": "user",
                 "content": f"[Earlier conversation summary]\n{summary}\n\n"
                            f"(Please keep this background in mind as we continue.)"},
                {"role": "assistant",
                 "content": "Understood, I have the context from our earlier exchange."},
                *trimmed,
            ]
        return trimmed

    async def _maybe_summarize(self, session_id: str, messages: list[dict]):
        """Background task: generate and save a summary of older messages."""
        to_summarize = messages[: len(messages) * 2 // 3]
        if len(to_summarize) < 6:
            return
        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: {m['content'][:250]}"
            for m in to_summarize
        )
        resp = await self.client.messages.create(
            model=self.summary_model,
            max_tokens=400,
            messages=[{"role": "user", "content":
                f"Summarize the key points, decisions, and user background from "
                f"this conversation in 5–7 sentences:\n\n{history_text}"}]
        )
        await self.store.save_summary(
            session_id, resp.content[0].text, len(to_summarize)
        )
8.6 Cost Tracking and Budget Enforcement
from collections import defaultdict
from datetime import date


class CostTracker:
    # USD per million tokens. List prices change over time; verify these
    # against the current published pricing before relying on them.
    PRICING = {
        "claude-opus-4-6": {"input": 5.00, "output": 25.00},
        "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
        "claude-haiku-4-5-20251001": {"input": 1.00, "output": 5.00},
    }

    def __init__(self):
        # In-memory only: usage is lost on restart (see Pitfall 1 below)
        self._usage: dict[str, dict] = defaultdict(
            lambda: {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0, "requests": 0}
        )

    def record(self, user_id: str, model: str, input_tokens: int, output_tokens: int):
        # Unknown models fall back to Sonnet-level prices
        prices = self.PRICING.get(model, {"input": 3.0, "output": 15.0})
        cost = (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
        key = f"{user_id}:{date.today()}"
        rec = self._usage[key]
        rec["input_tokens"] += input_tokens
        rec["output_tokens"] += output_tokens
        rec["cost_usd"] += cost
        rec["requests"] += 1

    def daily_cost(self, user_id: str) -> float:
        # Use .get so that a read does not create an empty usage record
        rec = self._usage.get(f"{user_id}:{date.today()}")
        return rec["cost_usd"] if rec else 0.0

    def within_budget(self, user_id: str, daily_limit_usd: float) -> bool:
        return self.daily_cost(user_id) < daily_limit_usd
8.7 Common Pitfalls in Multi-Turn Systems
Pitfall 1: Storing Context in Process Memory Only
If your conversation state lives only in process memory, a server restart loses all conversations. Always persist to a durable store before responding to the user.
Pitfall 2: Not Handling the Alternation Constraint
When loading history from a database, check for edge cases: a crash between saving the user message and the assistant message can leave you with an uneven sequence. Add a validation step before sending to the API.
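def validate_message_sequence(messages: list[dict]) -> list[dict]:
    """
    Ensure the list starts with a user message, strictly alternates
    user/assistant, and ends on a user message.
    Apply it to the complete list, including the new user message.
    """
    cleaned: list[dict] = []
    for msg in messages:
        if not cleaned:
            # The first message must come from the user
            if msg["role"] == "user":
                cleaned.append(msg)
        elif msg["role"] != cleaned[-1]["role"]:
            cleaned.append(msg)
        else:
            # Same role twice in a row: keep the newer message, so an orphaned
            # user message from a crashed turn does not displace the current one
            cleaned[-1] = msg
    # Must end on a user message
    if cleaned and cleaned[-1]["role"] == "assistant":
        cleaned.pop()
    return cleaned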
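Dropping a message is not the only way to restore alternation. If the orphaned user message may contain information worth keeping, consecutive same-role messages can be merged instead. This is a minimal sketch of that alternative, assuming plain-string content; `merge_consecutive_roles` is a hypothetical helper.

```python
def merge_consecutive_roles(messages: list[dict]) -> list[dict]:
    """Join runs of same-role text messages so the result strictly alternates."""
    merged: list[dict] = []
    for msg in messages:
        if merged and merged[-1]["role"] == msg["role"]:
            # Append this text to the previous message of the same role
            merged[-1] = {
                "role": msg["role"],
                "content": f"{merged[-1]['content']}\n\n{msg['content']}",
            }
        else:
            merged.append(dict(msg))
    return merged


broken = [
    {"role": "user", "content": "My order number is 1234."},  # reply was never saved
    {"role": "user", "content": "Where is my order?"},
]
print(merge_consecutive_roles(broken))
```

Merging preserves the earlier text at the cost of a slightly longer request; which behavior is right depends on whether the lost turn is likely to matter.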
Pitfall 3: Summarizing Too Aggressively
Summarization loses detail. If you compress a 20-turn conversation into 3 sentences, nuanced user requirements stated in turn 3 may be lost by turn 15. Tune compression thresholds conservatively: compress only when the history exceeds a clear threshold, and keep a generous number of recent turns verbatim.
Pitfall 4: Not Accounting for System Prompt in Token Budget
The system prompt is billed on every request. A 3,000-token system prompt adds $0.009 per request with Sonnet—which is $9 per 1,000 requests. At 100,000 daily requests, that's $900/day just for the system prompt. Include it when estimating your context token budget.
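The arithmetic above is easy to encode so that it can be rerun whenever the prompt or the price changes. This is a small sketch; `system_prompt_cost_usd` is a hypothetical helper, and the $3.00 per million input tokens figure is the Sonnet rate used in the paragraph above.

```python
def system_prompt_cost_usd(
    prompt_tokens: int, requests: int, usd_per_million_input: float
) -> float:
    """Cost of resending the system prompt on every request."""
    return prompt_tokens * requests * usd_per_million_input / 1_000_000


per_request = system_prompt_cost_usd(3_000, 1, 3.00)
per_day = system_prompt_cost_usd(3_000, 100_000, 3.00)
print(f"per request: ${per_request:.3f}, per day: ${per_day:.2f}")
```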
Summary
Multi-turn conversation management is a core engineering concern, not an afterthought:
- The API is stateless: the client resends the history on every request, so per-request input grows roughly linearly with turns and, left unmanaged, cumulative cost and latency grow much faster
- Three strategies for context control:
- Sliding window: simplest; loses early context
- Summarization compression: preserves semantic content; right for most production cases
- Structured memory extraction: most precise; right for preference-tracking assistants
- Database persistence: always persist to durable storage; design for the case where a process dies mid-turn
- Cost tracking: record per-user, per-day token consumption; enforce budgets before they blow up
- Validation: always validate message alternation before sending history to the API
The conversation management architecture you choose will touch every other part of the system—rate limiting, cost monitoring, session recovery. Making the right choice in the first week saves significant refactoring later.