Chapter 8

Multi-Turn Conversation Design: Context Trimming, State Management and 200K Window Optimization

Chapter 8: Multi-Turn Conversation Management: History Compression, Context Pruning, and Session Persistence

8.1 The Statelessness Problem

The Claude API is stateless. Every request is an independent HTTP call. "Conversation" is a fiction maintained by the client: you pass the full conversation history in the messages parameter, and the model uses it to appear contextually aware.

This has a direct cost implication:

Token consumption growth pattern:

Turn 1:
  Input = system(500) + user_1(100) = 600 tokens

Turn 5:
  Input = system(500) + 4 prior turns(1,200) + user_5(100) = 1,800 tokens

Turn 20:
  Input = system(500) + 19 prior turns(5,700) + user_20(100) = 6,300 tokens

Turn 50 (unmanaged):
  Input ≈ 15,300 tokens at the same rate, and 20,000+ if replies run longer → meaningful cost and latency increase
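
The growth pattern above can be reproduced with a few lines of arithmetic. This is a minimal sketch that assumes a constant 300 tokens per prior turn (the rate implied by the Turn 5 line); the function names are illustrative and not part of any SDK.

```python
def input_tokens_at_turn(turn: int, system: int = 500,
                         per_prior_turn: int = 300, user: int = 100) -> int:
    """Input tokens sent on a given turn when the full history is resent."""
    return system + (turn - 1) * per_prior_turn + user


def cumulative_input_tokens(turns: int, **kwargs) -> int:
    """Total input tokens billed across every turn of one conversation."""
    return sum(input_tokens_at_turn(t, **kwargs) for t in range(1, turns + 1))


if __name__ == "__main__":
    for t in (1, 5, 50):
        print(t, input_tokens_at_turn(t), cumulative_input_tokens(t))
```

The per-request figure grows linearly, but the cumulative total grows roughly with the square of the turn count, because every earlier turn is billed again on each later request.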

Multi-turn conversation management solves three problems:

  1. Context length control: Keep the token budget reasonable as history grows
  2. Information preservation: Don't drop information that is still relevant
  3. Session persistence: Resume conversations after restarts, across devices, between sessions

8.2 The Data Model

Start with a clear data model before writing conversation management logic:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class Role(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"

@dataclass
class Message:
    role: Role
    content: str | list[dict]
    timestamp: datetime = field(default_factory=datetime.now)
    input_tokens: int = 0
    output_tokens: int = 0
    metadata: dict = field(default_factory=dict)

    def to_api_format(self) -> dict:
        return {"role": self.role.value, "content": self.content}

@dataclass
class Conversation:
    session_id: str
    system_prompt: str
    model: str
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)

    def add(self, role: Role, content: str | list[dict], **kwargs) -> Message:
        msg = Message(role=role, content=content, **kwargs)
        self.messages.append(msg)
        self.last_updated = datetime.now()
        return msg

    def to_api_messages(self) -> list[dict]:
        return [m.to_api_format() for m in self.messages]

    def estimated_tokens(self) -> int:
        total_chars = len(self.system_prompt)
        for m in self.messages:
            if isinstance(m.content, str):
                total_chars += len(m.content)
            elif isinstance(m.content, list):
                total_chars += sum(
                    len(b.get("text", ""))
                    for b in m.content
                    if isinstance(b, dict)
                )
        return total_chars // 4  # English approximation

8.3 Context Window Management Strategies

Strategy 1: Sliding Window

Keep only the last N turns. Discard everything older:

import anthropic

client = anthropic.Anthropic()

class SlidingWindowConversation:
    """
    Retains the most recent N turns; older turns are discarded entirely.
    Best for: casual chat, FAQ bots where early context rarely matters.
    Weakness: drops user-provided background from early turns.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_turns: int = 10,
        max_tokens_per_reply: int = 1024,
    ):
        self.system = system_prompt
        self.model = model
        self.max_turns = max_turns
        self.max_reply_tokens = max_tokens_per_reply
        self._full_history: list[dict] = []  # complete history (in memory)

    def chat(self, user_message: str) -> str:
        self._full_history.append({"role": "user", "content": user_message})

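        # Send the most recent max_turns turns. The slice length is odd
        # (max_turns * 2 - 1) because the history now ends on a user message;
        # an even-length slice would start on an assistant message.
        api_messages = self._full_history[-(self.max_turns * 2 - 1):]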

        resp = client.messages.create(
            model=self.model,
            max_tokens=self.max_reply_tokens,
            system=self.system,
            messages=api_messages,
        )

        reply = resp.content[0].text
        self._full_history.append({"role": "assistant", "content": reply})
        return reply

    @property
    def stats(self) -> dict:
        return {
            "total_turns": len(self._full_history) // 2,
            "active_turns": min(self.max_turns, len(self._full_history) // 2),
        }
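
A window cut by message count can land in the middle of a turn. One defensive option, sketched below as a standalone helper, is to cut by count and then drop any leading assistant message so the request always opens with a user turn. The name `trim_to_window` is hypothetical and does not appear in the class above.

```python
def trim_to_window(history: list[dict], max_turns: int) -> list[dict]:
    """Return at most max_turns turns, always starting on a user message."""
    if max_turns < 1:
        raise ValueError("max_turns must be at least 1")
    window = history[-(max_turns * 2):]
    # Drop leading assistant messages left behind by the cut.
    while window and window[0]["role"] != "user":
        window = window[1:]
    return window


history = [
    {"role": "user", "content": "u1"},
    {"role": "assistant", "content": "a1"},
    {"role": "user", "content": "u2"},
    {"role": "assistant", "content": "a2"},
    {"role": "user", "content": "u3"},
]
trimmed = trim_to_window(history, max_turns=2)
```

Here `trimmed` holds `u2`, `a2`, `u3`: the raw four-message slice began with `a1`, which the loop removes.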

Strategy 2: Summarization Compression

When history exceeds a token budget, compress the older portion into a natural-language summary:

from typing import Optional

class SummarizingConversation:
    """
    Compresses old conversation history into a running summary when the
    token budget is exceeded.
    Best for: most production chatbots and assistants.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_history_tokens: int = 8_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.system = system_prompt
        self.model = model
        self.max_history_tokens = max_history_tokens
        self.summary_model = summary_model
        self.messages: list[dict] = []
        self.summary: Optional[str] = None

    def _estimate_tokens(self, messages: list[dict]) -> int:
        return sum(
            len(str(m.get("content", ""))) // 4
            for m in messages
        )

    def _compress(self):
        """Summarize the older half of the message history."""
        if len(self.messages) < 4:
            return

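        # Split on an even index so whole turns are summarized together and
        # the retained half still starts with a user message.
        split = len(self.messages) // 2
        split -= split % 2
        old, self.messages = self.messages[:split], self.messages[split:]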

        prior = f"[Previous summary]\n{self.summary}\n\n" if self.summary else ""
        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: "
            f"{str(m.get('content', ''))[:300]}"
            for m in old
        )

        resp = client.messages.create(
            model=self.summary_model,
            max_tokens=350,
            messages=[{"role": "user", "content":
                f"{prior}Summarize the key information, decisions, and user-provided "
                f"background from this conversation in 4–5 sentences:\n\n{history_text}"}]
        )
        self.summary = resp.content[0].text

    def _build_api_messages(self) -> list[dict]:
        if not self.summary:
            return list(self.messages)

        return [
            {"role": "user",
             "content": f"[Conversation summary]\n{self.summary}\n\n"
                        f"(This summarizes our earlier discussion. Please keep it in mind.)"},
            {"role": "assistant",
             "content": "Understood. I have the context from our earlier conversation."},
            *self.messages,
        ]

    def chat(self, user_message: str) -> str:
        if self._estimate_tokens(self.messages) > self.max_history_tokens:
            self._compress()

        self.messages.append({"role": "user", "content": user_message})

        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system,
            messages=self._build_api_messages(),
        )

        reply = resp.content[0].text
        self.messages.append({"role": "assistant", "content": reply})
        return reply
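
The split-and-summarize step can be exercised without calling the API by injecting the summarizer as a function. The sketch below is one way to do that under stated assumptions: `compress_history`, `keep_recent`, and `fake_summarizer` are hypothetical names, and the fake summarizer stands in for a real model call.

```python
from typing import Callable, Optional


def compress_history(
    messages: list[dict],
    summarize: Callable[[list[dict]], str],
    keep_recent: int = 4,
) -> tuple[Optional[str], list[dict]]:
    """Summarize everything except roughly the last keep_recent messages.

    Returns (summary, remaining). The cut moves forward if needed so that
    the remaining messages start on a user turn.
    """
    if len(messages) <= keep_recent:
        return None, list(messages)
    split = len(messages) - keep_recent
    # Advance the cut until the kept portion starts with a user message.
    while split < len(messages) and messages[split]["role"] != "user":
        split += 1
    return summarize(messages[:split]), messages[split:]


def fake_summarizer(old: list[dict]) -> str:
    """Stand-in for a model call; only reports how much was compressed."""
    return f"{len(old)} earlier messages omitted"


msgs = [
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"m{i}"}
    for i in range(8)
]
summary, recent = compress_history(msgs, fake_summarizer, keep_recent=3)
```

Keeping the summarizer injectable makes the boundary logic unit-testable and lets the summary model be swapped without touching the trimming code.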

Strategy 3: Structured Memory Extraction

Rather than summarizing in natural language, extract and maintain a structured key-value memory store. More precise but more complex:

import json

class StructuredMemoryConversation:
    """
    Extracts and persists structured facts from the conversation rather than
    free-text summaries.
    Best for: assistant products that track user preferences, decisions, and state.
    """

    def __init__(self, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.system = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.memory: dict = {
            "user_profile": {},
            "preferences": {},
            "decisions": [],
            "task_context": {},
            "open_questions": [],
        }

    def _extract_updates(self, user_msg: str, assistant_msg: str) -> dict:
        prompt = f"""Extract memorable facts from this conversation turn.
Return ONLY a JSON object. If nothing is worth storing, return {{}}.

JSON format:
{{
  "user_profile": {{}},        // identity or background info
  "preferences": {{}},          // stated preferences
  "decisions": [],             // decisions made
  "task_context": {{}},         // task state updates
  "open_questions": []         // unresolved questions
}}

User: {user_msg[:400]}
Assistant: {assistant_msg[:400]}"""

        resp = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}]
        )

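        import re  # local import keeps this listing self-contained

        # Take the span from the first "{" to the last "}" and parse it.
        match = re.search(r"\{.*\}", resp.content[0].text, re.DOTALL)
        if not match:
            return {}
        try:
            updates = json.loads(match.group())
        except json.JSONDecodeError:
            return {}
        # The model may return a list or scalar; only a dict is usable here.
        return updates if isinstance(updates, dict) else {}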

    def _apply_updates(self, updates: dict):
        for key in ["user_profile", "preferences", "task_context"]:
            if updates.get(key):
                self.memory[key].update(updates[key])
        for key in ["decisions", "open_questions"]:
            if updates.get(key):
                self.memory[key].extend(updates[key])
                # Deduplicate
                self.memory[key] = list(dict.fromkeys(self.memory[key]))

    def _memory_context(self) -> str:
        if not any(self.memory.values()):
            return ""
        parts = ["[Conversation Memory]"]
        if self.memory["user_profile"]:
            parts.append(f"User: {json.dumps(self.memory['user_profile'])}")
        if self.memory["preferences"]:
            parts.append(f"Preferences: {json.dumps(self.memory['preferences'])}")
        if self.memory["decisions"]:
            parts.append(f"Decisions: {'; '.join(self.memory['decisions'][-5:])}")
        if self.memory["task_context"]:
            parts.append(f"Context: {json.dumps(self.memory['task_context'])}")
        return "\n".join(parts)

    def chat(self, user_message: str) -> str:
        mem_ctx = self._memory_context()
        recent = self.messages[-10:]
        api_messages = recent + [{"role": "user", "content": user_message}]

        # Memory is only populated after the first turn, so `recent` is never
        # empty when memory exists. Append memory to the system prompt so it
        # reaches the model on every request.
        system = f"{self.system}\n\n{mem_ctx}" if mem_ctx else self.system

        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system,
            messages=api_messages,
        )

        reply = resp.content[0].text
        self.messages.append({"role": "user", "content": user_message})
        self.messages.append({"role": "assistant", "content": reply})

        updates = self._extract_updates(user_message, reply)
        if updates:
            self._apply_updates(updates)

        return reply
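
A greedy pattern such as `\{.*\}` spans from the first `{` to the last `}` in the reply, so any brace in trailing prose makes the captured text invalid JSON. A possible alternative, sketched here with only the standard library, is to let `json.JSONDecoder.raw_decode` parse one object starting at each candidate brace. The helper name `first_json_object` is an assumption for illustration.

```python
import json


def first_json_object(text: str) -> dict:
    """Return the first JSON object found in text, or {} if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value beginning at index `start`
            # and ignores whatever follows it.
            value, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            value = None
        if isinstance(value, dict):
            return value
        start = text.find("{", start + 1)
    return {}


reply = 'Here you go: {"preferences": {"lang": "Python"}} (see {notes})'
parsed = first_json_object(reply)
```

With this input the greedy regex would capture through `{notes}` and fail to parse, while `parsed` holds the preferences object.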

8.4 Session Persistence

Database Schema (PostgreSQL)

CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id TEXT NOT NULL,
    title TEXT,
    system_prompt TEXT NOT NULL,
    model TEXT NOT NULL DEFAULT 'claude-sonnet-4-6',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    last_updated TIMESTAMPTZ DEFAULT NOW(),
    metadata JSONB DEFAULT '{}',
    is_archived BOOLEAN DEFAULT FALSE
);

CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
    content TEXT NOT NULL,
    content_blocks JSONB,            -- multimodal content (optional)
    input_tokens INT DEFAULT 0,
    output_tokens INT DEFAULT 0,
    model_used TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE conversation_summaries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    summary TEXT NOT NULL,
    message_count_at_creation INT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_messages_conv_created ON messages(conversation_id, created_at);
CREATE INDEX idx_summaries_conversation ON conversation_summaries(conversation_id);

Persistence Layer (asyncpg)

import asyncpg
from uuid import UUID

class ConversationStore:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool: asyncpg.Pool | None = None

    async def initialize(self):
        self._pool = await asyncpg.create_pool(self.dsn)

    async def create_conversation(
        self, user_id: str, system_prompt: str, model: str, title: str = "New Conversation"
    ) -> str:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "INSERT INTO conversations (user_id, system_prompt, model, title) "
                "VALUES ($1, $2, $3, $4) RETURNING id",
                user_id, system_prompt, model, title
            )
            return str(row["id"])

    async def append_message(
        self, conversation_id: str, role: str, content: str,
        input_tokens: int = 0, output_tokens: int = 0, model_used: str | None = None
    ) -> str:
        async with self._pool.acquire() as conn:
            # Run both statements in one transaction so the message insert and
            # the last_updated bump succeed or fail together.
            async with conn.transaction():
                row = await conn.fetchrow(
                    "INSERT INTO messages (conversation_id, role, content, "
                    "input_tokens, output_tokens, model_used) "
                    "VALUES ($1, $2, $3, $4, $5, $6) RETURNING id",
                    conversation_id, role, content, input_tokens, output_tokens, model_used
                )
                await conn.execute(
                    "UPDATE conversations SET last_updated = NOW() WHERE id = $1",
                    conversation_id
                )
            return str(row["id"])

    async def get_recent_messages(self, conversation_id: str, limit: int = 20) -> list[dict]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT role, content FROM messages "
                "WHERE conversation_id = $1 "
                "ORDER BY created_at DESC LIMIT $2",
                conversation_id, limit
            )
            return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]

    async def save_summary(self, conversation_id: str, summary: str, message_count: int):
        async with self._pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO conversation_summaries "
                "(conversation_id, summary, message_count_at_creation) VALUES ($1, $2, $3)",
                conversation_id, summary, message_count
            )

    async def get_latest_summary(self, conversation_id: str) -> str | None:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT summary FROM conversation_summaries "
                "WHERE conversation_id = $1 ORDER BY created_at DESC LIMIT 1",
                conversation_id
            )
            return row["summary"] if row else None
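
For unit tests it can help to have a store that needs no database. The sketch below is a hypothetical in-memory stand-in that mirrors the method names of `ConversationStore`; it ignores token counts and is not meant for production use.

```python
import asyncio
import uuid
from collections import defaultdict


class InMemoryConversationStore:
    """Test double exposing the same async methods as ConversationStore."""

    def __init__(self) -> None:
        self._messages: dict[str, list[dict]] = defaultdict(list)
        self._summaries: dict[str, list[str]] = defaultdict(list)

    async def create_conversation(self, user_id: str, system_prompt: str,
                                  model: str, title: str = "New Conversation") -> str:
        return str(uuid.uuid4())

    async def append_message(self, conversation_id: str, role: str,
                             content: str, **_ignored) -> str:
        self._messages[conversation_id].append({"role": role, "content": content})
        return str(uuid.uuid4())

    async def get_recent_messages(self, conversation_id: str, limit: int = 20) -> list[dict]:
        # Oldest-first order, matching the reversed() in the asyncpg version.
        return list(self._messages[conversation_id][-limit:])

    async def save_summary(self, conversation_id: str, summary: str,
                           message_count: int) -> None:
        self._summaries[conversation_id].append(summary)

    async def get_latest_summary(self, conversation_id: str):
        items = self._summaries[conversation_id]
        return items[-1] if items else None


async def demo() -> list[dict]:
    store = InMemoryConversationStore()
    cid = await store.create_conversation("u1", "You are helpful.", "some-model")
    await store.append_message(cid, "user", "hello")
    await store.append_message(cid, "assistant", "hi there")
    return await store.get_recent_messages(cid, limit=1)
```

Calling `asyncio.run(demo())` returns only the most recent message, which is the assistant reply.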

8.5 Full Persistent Conversation Assistant

Combining all components:

import asyncio
import anthropic

class PersistentAssistant:
    """
    Production-grade persistent multi-turn conversation assistant.
    Features: automatic compression, database persistence, cost tracking.
    """

    def __init__(
        self,
        store: ConversationStore,
        model: str = "claude-sonnet-4-6",
        max_context_tokens: int = 10_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.store = store
        self.client = anthropic.Anthropic()
        self.model = model
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model

    async def start_session(self, user_id: str, system_prompt: str, **kwargs) -> str:
        return await self.store.create_conversation(
            user_id=user_id, system_prompt=system_prompt, model=self.model, **kwargs
        )

    async def chat(self, session_id: str, user_message: str, system_prompt: str) -> dict:
        # Load recent history
        recent = await self.store.get_recent_messages(session_id, limit=30)

        # Build context (with summary injection if needed)
        context = await self._build_context(session_id, recent)
        context.append({"role": "user", "content": user_message})

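        # Call API. The SDK client here is synchronous, so run it in a worker
        # thread to avoid blocking the event loop while waiting on the network.
        resp = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=1024,
            system=system_prompt,
            messages=context,
        )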

        reply = resp.content[0].text

        # Persist both turns
        await self.store.append_message(
            session_id, "user", user_message,
            input_tokens=resp.usage.input_tokens,
        )
        await self.store.append_message(
            session_id, "assistant", reply,
            output_tokens=resp.usage.output_tokens,
            model_used=resp.model,
        )

        # Trigger background summarization if history is getting long
        if len(recent) > 24:
            asyncio.create_task(self._maybe_summarize(session_id, recent))

        return {
            "reply": reply,
            "input_tokens": resp.usage.input_tokens,
            "output_tokens": resp.usage.output_tokens,
            "context_messages_sent": len(context),
        }

    async def _build_context(self, session_id: str, recent: list[dict]) -> list[dict]:
        estimated = sum(len(m.get("content", "")) // 4 for m in recent)

        if estimated <= self.max_context_tokens:
            return list(recent)

        summary = await self.store.get_latest_summary(session_id)
        trimmed = recent[-10:]

        if summary:
            return [
                {"role": "user",
                 "content": f"[Earlier conversation summary]\n{summary}\n\n"
                             f"(Please keep this background in mind as we continue.)"},
                {"role": "assistant",
                 "content": "Understood, I have the context from our earlier exchange."},
                *trimmed,
            ]
        return trimmed

    async def _maybe_summarize(self, session_id: str, messages: list[dict]):
        """Background task: generate and save a summary of older messages."""
        to_summarize = messages[: len(messages) * 2 // 3]
        if len(to_summarize) < 6:
            return

        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: {m['content'][:250]}"
            for m in to_summarize
        )

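        # Run the synchronous SDK call in a worker thread so this background
        # task does not block the event loop.
        resp = await asyncio.to_thread(
            self.client.messages.create,
            model=self.summary_model,
            max_tokens=400,
            messages=[{"role": "user", "content":
                f"Summarize the key points, decisions, and user background from "
                f"this conversation in 5–7 sentences:\n\n{history_text}"}]
        )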

        await self.store.save_summary(
            session_id, resp.content[0].text, len(to_summarize)
        )

8.6 Cost Tracking and Budget Enforcement

from collections import defaultdict
from datetime import date

class CostTracker:
    # USD per million tokens. Prices change; check the current pricing page.
    PRICING = {
        "claude-opus-4-6":           {"input": 15.00, "output": 75.00},
        "claude-sonnet-4-6":         {"input":  3.00, "output": 15.00},
        "claude-haiku-4-5-20251001": {"input":  1.00, "output":  5.00},
    }

    def __init__(self):
        self._usage: dict[str, dict] = defaultdict(
            lambda: {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0, "requests": 0}
        )

    def record(self, user_id: str, model: str, input_tokens: int, output_tokens: int):
        prices = self.PRICING.get(model, {"input": 3.0, "output": 15.0})
        cost = (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
        key = f"{user_id}:{date.today()}"
        rec = self._usage[key]
        rec["input_tokens"] += input_tokens
        rec["output_tokens"] += output_tokens
        rec["cost_usd"] += cost
        rec["requests"] += 1

    def daily_cost(self, user_id: str) -> float:
        return self._usage[f"{user_id}:{date.today()}"]["cost_usd"]

    def within_budget(self, user_id: str, daily_limit_usd: float) -> bool:
        return self.daily_cost(user_id) < daily_limit_usd
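
Budget enforcement is most useful before the request is sent, not after. The sketch below shows one possible pre-flight check: it projects a worst-case cost from the input size and the reply cap, then refuses the call if that would exceed the daily limit. `BudgetExceeded`, `projected_cost`, and `check_budget` are hypothetical names, and prices are passed in as USD per million tokens.

```python
class BudgetExceeded(Exception):
    """Raised when a request would push a user past the daily limit."""


def projected_cost(input_tokens: int, max_output_tokens: int,
                   input_price: float, output_price: float) -> float:
    """Worst-case request cost in USD; prices are per million tokens."""
    return (input_tokens * input_price + max_output_tokens * output_price) / 1_000_000


def check_budget(spent_usd: float, projected_usd: float, daily_limit_usd: float) -> None:
    """Raise BudgetExceeded if the projected spend would exceed the limit."""
    if spent_usd + projected_usd > daily_limit_usd:
        raise BudgetExceeded(
            f"projected {spent_usd + projected_usd:.4f} USD exceeds "
            f"limit {daily_limit_usd:.2f} USD"
        )


# Worst case for a 6,600-token input with a 1,024-token reply cap.
estimate = projected_cost(6_600, 1_024, input_price=3.0, output_price=15.0)
check_budget(spent_usd=1.00, projected_usd=estimate, daily_limit_usd=5.00)
```

Using the reply cap as the output estimate is deliberately pessimistic; the actual cost recorded afterwards will usually be lower.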

8.7 Common Pitfalls in Multi-Turn Systems

Pitfall 1: Storing Context in Process Memory Only

If your conversation state lives only in process memory, a server restart loses all conversations. Always persist to a durable store before responding to the user.

Pitfall 2: Not Handling the Alternation Constraint

When loading history from a database, check for edge cases: a crash between saving the user message and the assistant message can leave you with an uneven sequence. Add a validation step before sending to the API.

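def validate_message_sequence(messages: list[dict]) -> list[dict]:
    """
    Ensure messages start with a user turn and strictly alternate
    user/assistant. Drops a trailing assistant message so the request
    ends on a user turn.
    """
    # Skip leading assistant messages: the sequence must open with a user turn
    start = 0
    while start < len(messages) and messages[start]["role"] != "user":
        start += 1
    messages = messages[start:]
    if not messages:
        return []

    # Where the same role repeats, keep only the first message of the run
    cleaned = [messages[0]]
    for msg in messages[1:]:
        if msg["role"] != cleaned[-1]["role"]:
            cleaned.append(msg)

    # Must end on a user message
    if cleaned and cleaned[-1]["role"] == "assistant":
        cleaned = cleaned[:-1]

    return cleaned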

Pitfall 3: Summarizing Too Aggressively

Summarization loses detail. If you compress a 20-turn conversation into 3 sentences, nuanced user requirements stated in turn 3 may be lost by turn 15. Tune compression thresholds conservatively: compress only when the history exceeds a clear threshold, and keep a generous number of recent turns verbatim.

Pitfall 4: Not Accounting for System Prompt in Token Budget

The system prompt is billed on every request. At Sonnet's input price of $3 per million tokens, a 3,000-token system prompt adds $0.009 per request, or $9 per 1,000 requests. At 100,000 daily requests, that's $900/day just for the system prompt. Include it when estimating your context token budget.
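
The arithmetic is easy to rerun for your own prompt size and traffic. This sketch assumes $3.00 per million input tokens (the Sonnet figure used in this chapter) and no prompt caching; `system_prompt_cost` is an illustrative name.

```python
def system_prompt_cost(prompt_tokens: int, price_per_mtok: float, requests: int) -> float:
    """USD spent resending the system prompt across a number of requests."""
    return prompt_tokens * price_per_mtok / 1_000_000 * requests


per_request = system_prompt_cost(3_000, 3.00, 1)
per_thousand = system_prompt_cost(3_000, 3.00, 1_000)
per_day = system_prompt_cost(3_000, 3.00, 100_000)
```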


Summary

Multi-turn conversation management is a core engineering concern, not an afterthought:

  1. The API is stateless: each request's input grows linearly with the number of turns, so the cumulative input cost of a conversation grows roughly quadratically; unmanaged, this drives up cost and latency
  2. Three strategies for context control:
    • Sliding window: simplest; loses early context
    • Summarization compression: preserves semantic content; right for most production cases
    • Structured memory extraction: most precise; right for preference-tracking assistants
  3. Database persistence: always persist to durable storage; design for the case where a process dies mid-turn
  4. Cost tracking: record per-user, per-day token consumption; enforce budgets before they blow up
  5. Validation: always validate message alternation before sending history to the API

The conversation management architecture you choose will touch every other part of the system—rate limiting, cost monitoring, session recovery. Making the right choice in the first week saves significant refactoring later.
