Chapter 8: Multi-Turn Conversation Management: History Compression, Context Pruning, and Session Persistence

8.1 The Statelessness Problem

The Claude API is stateless. Every request is an independent HTTP call. "Conversation" is a fiction maintained by the client: you pass the full conversation history in the messages parameter, and the model uses it to appear contextually aware.

This has a direct cost implication:

Token consumption growth pattern:

Turn 1:
  Input = system(500) + user_1(100) = 600 tokens

Turn 5:
  Input = system(500) + 4 prior turns(1,200) + user_5(100) = 1,800 tokens

Turn 20:
  Input = system(500) + 19 prior turns(6,000) + user_20(100) = 6,600 tokens

Turn 50 (unmanaged):
  Input may reach 20,000+ tokens → meaningful cost and latency increase
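
The growth pattern above can be reproduced with a small calculation. This is a sketch, not a measurement: it assumes a fixed 300 tokens per prior turn (the figure implied by the Turn 5 row), real turns vary in length, and the function names are illustrative.

```python
def input_tokens_at_turn(turn, system=500, user=100, per_prior_turn=300):
    """Input tokens sent on a given turn when the full history is replayed."""
    return system + (turn - 1) * per_prior_turn + user


def cumulative_input_tokens(turns, **kwargs):
    """Total input tokens billed across the first `turns` requests."""
    return sum(input_tokens_at_turn(t, **kwargs) for t in range(1, turns + 1))


for turn in (1, 5, 20, 50):
    print(turn, input_tokens_at_turn(turn), cumulative_input_tokens(turn))
```

Per-request input grows linearly with the turn number, but the cumulative total grows roughly quadratically, because every request re-sends everything before it.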

Multi-turn conversation management solves three problems:

  1. Context length control: Keep the token budget reasonable as history grows
  2. Information preservation: Don't drop information that is still relevant
  3. Session persistence: Resume conversations after restarts, across devices, between sessions

8.2 The Data Model

Start with a clear data model before writing conversation management logic:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class Role(str, Enum):
    USER = "user"
    ASSISTANT = "assistant"

@dataclass
class Message:
    role: Role
    content: str | list[dict]
    timestamp: datetime = field(default_factory=datetime.now)
    input_tokens: int = 0
    output_tokens: int = 0
    metadata: dict = field(default_factory=dict)

    def to_api_format(self) -> dict:
        return {"role": self.role.value, "content": self.content}

@dataclass
class Conversation:
    session_id: str
    system_prompt: str
    model: str
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)

    def add(self, role: Role, content: str | list[dict], **kwargs) -> Message:
        msg = Message(role=role, content=content, **kwargs)
        self.messages.append(msg)
        self.last_updated = datetime.now()
        return msg

    def to_api_messages(self) -> list[dict]:
        return [m.to_api_format() for m in self.messages]

    def estimated_tokens(self) -> int:
        total_chars = len(self.system_prompt)
        for m in self.messages:
            if isinstance(m.content, str):
                total_chars += len(m.content)
            elif isinstance(m.content, list):
                total_chars += sum(
                    len(b.get("text", ""))
                    for b in m.content
                    if isinstance(b, dict)
                )
        return total_chars // 4  # English approximation
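
The character-based estimate above is rough, especially for non-English text. As a hedged alternative, the Anthropic Python SDK exposes a token-counting call. The sketch below assumes `client` is an `anthropic.Anthropic()` instance and that the messages are already in API format; `count_input_tokens` is a hypothetical helper name, not part of the SDK.

```python
def count_input_tokens(client, model, system_prompt, api_messages):
    """Ask the API for an exact input-token count instead of estimating.

    `client` is assumed to be an anthropic.Anthropic() instance.
    """
    result = client.messages.count_tokens(
        model=model,
        system=system_prompt,
        messages=api_messages,
    )
    return result.input_tokens
```

This costs one extra network round trip per count, so it may make sense to use the estimate for routine checks and the exact count only near the budget threshold.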

8.3 Context Window Management Strategies

Strategy 1: Sliding Window

Keep only the last N turns. Discard everything older:

import anthropic

client = anthropic.Anthropic()

class SlidingWindowConversation:
    """
    Retains the most recent N turns; older turns are discarded entirely.
    Best for: casual chat, FAQ bots where early context rarely matters.
    Weakness: drops user-provided background from early turns.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_turns: int = 10,
        max_tokens_per_reply: int = 1024,
    ):
        self.system = system_prompt
        self.model = model
        self.max_turns = max_turns
        self.max_reply_tokens = max_tokens_per_reply
        self._full_history: list[dict] = []  # complete history (in memory)

    def chat(self, user_message: str) -> str:
        self._full_history.append({"role": "user", "content": user_message})

        # Send at most max_turns turns. The history ends on the new user message,
        # so an odd-length slice keeps the window starting on a user message.
        api_messages = self._full_history[-(self.max_turns * 2 - 1):]

        resp = client.messages.create(
            model=self.model,
            max_tokens=self.max_reply_tokens,
            system=self.system,
            messages=api_messages,
        )

        reply = resp.content[0].text
        self._full_history.append({"role": "assistant", "content": reply})
        return reply

    @property
    def stats(self) -> dict:
        return {
            "total_turns": len(self._full_history) // 2,
            "active_turns": min(self.max_turns, len(self._full_history) // 2),
        }
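
One detail worth isolating is that the trimmed window should begin with a user message. A minimal sketch of that rule as a standalone function follows; `window_messages` is an illustrative name, and it assumes messages are dicts with a `role` key as in the class above.

```python
def window_messages(history: list[dict], max_turns: int) -> list[dict]:
    """Return at most `max_turns` turns, always starting on a user message."""
    window = history[-(max_turns * 2):] if max_turns > 0 else []
    # Drop leading assistant messages left over from a cut mid-turn
    while window and window[0]["role"] != "user":
        window = window[1:]
    return window


history = [
    {"role": "user", "content": "u1"},
    {"role": "assistant", "content": "a1"},
    {"role": "user", "content": "u2"},
    {"role": "assistant", "content": "a2"},
    {"role": "user", "content": "u3"},
]
print([m["content"] for m in window_messages(history, max_turns=2)])
```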

Strategy 2: Summarization Compression

When history exceeds a token budget, compress the older portion into a natural-language summary:

from typing import Optional

class SummarizingConversation:
    """
    Compresses old conversation history into a running summary when the
    token budget is exceeded.
    Best for: most production chatbots and assistants.
    """

    def __init__(
        self,
        system_prompt: str,
        model: str = "claude-sonnet-4-6",
        max_history_tokens: int = 8_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.system = system_prompt
        self.model = model
        self.max_history_tokens = max_history_tokens
        self.summary_model = summary_model
        self.messages: list[dict] = []
        self.summary: Optional[str] = None

    def _estimate_tokens(self, messages: list[dict]) -> int:
        return sum(
            len(str(m.get("content", ""))) // 4
            for m in messages
        )

    def _compress(self):
        """Summarize the older half of the message history."""
        if len(self.messages) < 4:
            return

        # Split on a turn boundary (even index) so the kept half starts with a user message
        split = (len(self.messages) // 4) * 2
        old, self.messages = self.messages[:split], self.messages[split:]

        prior = f"[Previous summary]\n{self.summary}\n\n" if self.summary else ""
        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: "
            f"{str(m.get('content', ''))[:300]}"
            for m in old
        )

        resp = client.messages.create(
            model=self.summary_model,
            max_tokens=350,
            messages=[{"role": "user", "content":
                f"{prior}Summarize the key information, decisions, and user-provided "
                f"background from this conversation in 4–5 sentences:\n\n{history_text}"}]
        )
        self.summary = resp.content[0].text

    def _build_api_messages(self) -> list[dict]:
        if not self.summary:
            return list(self.messages)

        return [
            {"role": "user",
             "content": f"[Conversation summary]\n{self.summary}\n\n"
                        f"(This summarizes our earlier discussion. Please keep it in mind.)"},
            {"role": "assistant",
             "content": "Understood. I have the context from our earlier conversation."},
            *self.messages,
        ]

    def chat(self, user_message: str) -> str:
        if self._estimate_tokens(self.messages) > self.max_history_tokens:
            self._compress()

        self.messages.append({"role": "user", "content": user_message})

        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system,
            messages=self._build_api_messages(),
        )

        reply = resp.content[0].text
        self.messages.append({"role": "assistant", "content": reply})
        return reply
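
The class above compresses the older half of the history. A possible variation, sketched here under the assumption that messages alternate user/assistant, is to keep a fixed number of recent turns verbatim and summarize everything before them. `split_on_turn_boundary` and `keep_recent_turns` are illustrative names, not part of the class.

```python
def split_on_turn_boundary(messages: list[dict], keep_recent_turns: int = 4):
    """Split into (old, recent) so that `recent` starts on a user message."""
    keep = keep_recent_turns * 2
    if len(messages) <= keep:
        return [], list(messages)

    cut = len(messages) - keep
    # Move the cut forward until the kept portion begins with a user message
    while cut < len(messages) and messages[cut]["role"] != "user":
        cut += 1
    return messages[:cut], messages[cut:]


msgs = [
    {"role": "user" if i % 2 == 0 else "assistant", "content": f"m{i}"}
    for i in range(10)
]
old, recent = split_on_turn_boundary(msgs, keep_recent_turns=2)
print(len(old), len(recent), recent[0]["role"])
```

Only `old` would be passed to the summarizer; `recent` is sent as-is after the summary exchange.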

Strategy 3: Structured Memory Extraction

Rather than summarizing in natural language, extract and maintain a structured key-value memory store. More precise but more complex:

import json

class StructuredMemoryConversation:
    """
    Extracts and persists structured facts from the conversation rather than
    free-text summaries.
    Best for: assistant products that track user preferences, decisions, and state.
    """

    def __init__(self, system_prompt: str, model: str = "claude-sonnet-4-6"):
        self.system = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.memory: dict = {
            "user_profile": {},
            "preferences": {},
            "decisions": [],
            "task_context": {},
            "open_questions": [],
        }

    def _extract_updates(self, user_msg: str, assistant_msg: str) -> dict:
        prompt = f"""Extract memorable facts from this conversation turn.
Return ONLY a JSON object. If nothing is worth storing, return {{}}.

JSON format:
{{
  "user_profile": {{}},        // identity or background info
  "preferences": {{}},          // stated preferences
  "decisions": [],             // decisions made
  "task_context": {{}},         // task state updates
  "open_questions": []         // unresolved questions
}}

User: {user_msg[:400]}
Assistant: {assistant_msg[:400]}"""

        resp = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=300,
            messages=[{"role": "user", "content": prompt}]
        )

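        import re
        try:
            # Pull the first {...} span out of the reply, tolerating surrounding prose
            m = re.search(r'\{.*\}', resp.content[0].text, re.DOTALL)
            if m:
                return json.loads(m.group())
        except (json.JSONDecodeError, IndexError, AttributeError):
            pass  # malformed or missing JSON: store nothing for this turn
        return {}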

    def _apply_updates(self, updates: dict):
        for key in ["user_profile", "preferences", "task_context"]:
            if updates.get(key):
                self.memory[key].update(updates[key])
        for key in ["decisions", "open_questions"]:
            if updates.get(key):
                self.memory[key].extend(updates[key])
                # Deduplicate
                self.memory[key] = list(dict.fromkeys(self.memory[key]))

    def _memory_context(self) -> str:
        if not any(self.memory.values()):
            return ""
        parts = ["[Conversation Memory]"]
        if self.memory["user_profile"]:
            parts.append(f"User: {json.dumps(self.memory['user_profile'])}")
        if self.memory["preferences"]:
            parts.append(f"Preferences: {json.dumps(self.memory['preferences'])}")
        if self.memory["decisions"]:
            parts.append(f"Decisions: {'; '.join(self.memory['decisions'][-5:])}")
        if self.memory["task_context"]:
            parts.append(f"Context: {json.dumps(self.memory['task_context'])}")
        return "\n".join(parts)

    def chat(self, user_message: str) -> str:
        mem_ctx = self._memory_context()
        recent = self.messages[-10:]
        api_messages = recent + [{"role": "user", "content": user_message}]

        # Memory is still empty on the first turn, so inject it through the
        # system prompt on every later turn rather than only when history is empty.
        system = f"{self.system}\n\n{mem_ctx}" if mem_ctx else self.system

        resp = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=system,
            messages=api_messages,
        )

        reply = resp.content[0].text
        self.messages.append({"role": "user", "content": user_message})
        self.messages.append({"role": "assistant", "content": reply})

        updates = self._extract_updates(user_message, reply)
        if updates:
            self._apply_updates(updates)

        return reply
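
Extracted list items are not guaranteed to be plain strings; if the model returns a dict inside `decisions`, hash-based deduplication raises a `TypeError`. The following is a sketch of a more defensive merge, assuming the same memory layout as above. `merge_memory` is a hypothetical helper, and it compares items by their JSON serialization.

```python
import json


def merge_memory(memory: dict, updates: dict) -> dict:
    """Return a new memory dict with `updates` merged in; inputs are not mutated."""
    merged = {
        key: dict(value) if isinstance(value, dict) else list(value)
        for key, value in memory.items()
    }
    for key, value in updates.items():
        if key not in merged:
            continue  # ignore keys outside the known schema
        if isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key].update(value)
        elif isinstance(merged[key], list) and isinstance(value, list):
            # Deduplicate by JSON form so unhashable items (dicts, lists) work too
            seen = {json.dumps(item, sort_keys=True) for item in merged[key]}
            for item in value:
                marker = json.dumps(item, sort_keys=True)
                if marker not in seen:
                    seen.add(marker)
                    merged[key].append(item)
    return merged


memory = {"preferences": {"lang": "en"}, "decisions": ["use Postgres"]}
updates = {"preferences": {"tone": "brief"}, "decisions": ["use Postgres", {"db": "pg"}]}
print(merge_memory(memory, updates))
```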

8.4 Session Persistence

Database Schema (PostgreSQL)

CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id TEXT NOT NULL,
    title TEXT,
    system_prompt TEXT NOT NULL,
    model TEXT NOT NULL DEFAULT 'claude-sonnet-4-6',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    last_updated TIMESTAMPTZ DEFAULT NOW(),
    metadata JSONB DEFAULT '{}',
    is_archived BOOLEAN DEFAULT FALSE
);

CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
    content TEXT NOT NULL,
    content_blocks JSONB,            -- multimodal content (optional)
    input_tokens INT DEFAULT 0,
    output_tokens INT DEFAULT 0,
    model_used TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE conversation_summaries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    summary TEXT NOT NULL,
    message_count_at_creation INT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_messages_conv_created ON messages(conversation_id, created_at);
CREATE INDEX idx_summaries_conversation ON conversation_summaries(conversation_id);

Persistence Layer (asyncpg)

import asyncpg
from uuid import UUID

class ConversationStore:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool: asyncpg.Pool | None = None

    async def initialize(self):
        self._pool = await asyncpg.create_pool(self.dsn)

    async def create_conversation(
        self, user_id: str, system_prompt: str, model: str, title: str = "New Conversation"
    ) -> str:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "INSERT INTO conversations (user_id, system_prompt, model, title) "
                "VALUES ($1, $2, $3, $4) RETURNING id",
                user_id, system_prompt, model, title
            )
            return str(row["id"])

    async def append_message(
        self, conversation_id: str, role: str, content: str,
        input_tokens: int = 0, output_tokens: int = 0, model_used: str | None = None
    ) -> str:
        async with self._pool.acquire() as conn:
            # One transaction: the insert and the last_updated bump succeed or fail together
            async with conn.transaction():
                row = await conn.fetchrow(
                    "INSERT INTO messages (conversation_id, role, content, "
                    "input_tokens, output_tokens, model_used) "
                    "VALUES ($1, $2, $3, $4, $5, $6) RETURNING id",
                    conversation_id, role, content, input_tokens, output_tokens, model_used
                )
                await conn.execute(
                    "UPDATE conversations SET last_updated = NOW() WHERE id = $1",
                    conversation_id
                )
            return str(row["id"])

    async def get_recent_messages(self, conversation_id: str, limit: int = 20) -> list[dict]:
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT role, content FROM messages "
                "WHERE conversation_id = $1 "
                "ORDER BY created_at DESC LIMIT $2",
                conversation_id, limit
            )
            return [{"role": r["role"], "content": r["content"]} for r in reversed(rows)]

    async def save_summary(self, conversation_id: str, summary: str, message_count: int):
        async with self._pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO conversation_summaries "
                "(conversation_id, summary, message_count_at_creation) VALUES ($1, $2, $3)",
                conversation_id, summary, message_count
            )

    async def get_latest_summary(self, conversation_id: str) -> str | None:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT summary FROM conversation_summaries "
                "WHERE conversation_id = $1 ORDER BY created_at DESC LIMIT 1",
                conversation_id
            )
            return row["summary"] if row else None
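
For unit tests it can help to have a store that needs no database. The sketch below is an in-memory stand-in that mirrors the method names and signatures of `ConversationStore` above; `InMemoryConversationStore` is an illustrative class, not part of asyncpg, and it keeps nothing across restarts.

```python
import asyncio
import uuid


class InMemoryConversationStore:
    """Test double with the same async methods as ConversationStore."""

    def __init__(self):
        self._conversations: dict[str, dict] = {}
        self._messages: dict[str, list[dict]] = {}
        self._summaries: dict[str, list[str]] = {}

    async def create_conversation(self, user_id, system_prompt, model,
                                  title="New Conversation") -> str:
        cid = str(uuid.uuid4())
        self._conversations[cid] = {
            "user_id": user_id, "system_prompt": system_prompt,
            "model": model, "title": title,
        }
        self._messages[cid] = []
        self._summaries[cid] = []
        return cid

    async def append_message(self, conversation_id, role, content,
                             input_tokens=0, output_tokens=0, model_used=None) -> str:
        mid = str(uuid.uuid4())
        self._messages[conversation_id].append(
            {"id": mid, "role": role, "content": content}
        )
        return mid

    async def get_recent_messages(self, conversation_id, limit=20) -> list[dict]:
        if limit <= 0:
            return []
        rows = self._messages[conversation_id][-limit:]  # oldest first
        return [{"role": r["role"], "content": r["content"]} for r in rows]

    async def save_summary(self, conversation_id, summary, message_count):
        self._summaries[conversation_id].append(summary)

    async def get_latest_summary(self, conversation_id):
        summaries = self._summaries[conversation_id]
        return summaries[-1] if summaries else None


async def demo():
    store = InMemoryConversationStore()
    cid = await store.create_conversation("user-1", "You are helpful.", "some-model")
    await store.append_message(cid, "user", "hi")
    await store.append_message(cid, "assistant", "hello")
    print(await store.get_recent_messages(cid))


asyncio.run(demo())
```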

8.5 Full Persistent Conversation Assistant

Combining all components:

import asyncio
import anthropic

class PersistentAssistant:
    """
    Production-grade persistent multi-turn conversation assistant.
    Features: automatic compression, database persistence, cost tracking.
    """

    def __init__(
        self,
        store: ConversationStore,
        model: str = "claude-sonnet-4-6",
        max_context_tokens: int = 10_000,
        summary_model: str = "claude-haiku-4-5-20251001",
    ):
        self.store = store
        self.client = anthropic.Anthropic()
        self.model = model
        self.max_context_tokens = max_context_tokens
        self.summary_model = summary_model

    async def start_session(self, user_id: str, system_prompt: str, **kwargs) -> str:
        return await self.store.create_conversation(
            user_id=user_id, system_prompt=system_prompt, model=self.model, **kwargs
        )

    async def chat(self, session_id: str, user_message: str, system_prompt: str) -> dict:
        # Load recent history
        recent = await self.store.get_recent_messages(session_id, limit=30)

        # Build context (with summary injection if needed)
        context = await self._build_context(session_id, recent)
        context.append({"role": "user", "content": user_message})

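        # Call API. The client is synchronous, so run it in a worker thread
        # to avoid blocking the event loop while waiting for the response.
        resp = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=1024,
            system=system_prompt,
            messages=context,
        )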

        reply = resp.content[0].text

        # Persist both turns
        await self.store.append_message(
            session_id, "user", user_message,
            input_tokens=resp.usage.input_tokens,
        )
        await self.store.append_message(
            session_id, "assistant", reply,
            output_tokens=resp.usage.output_tokens,
            model_used=resp.model,
        )

        # Trigger background summarization if history is getting long
        if len(recent) > 24:
            asyncio.create_task(self._maybe_summarize(session_id, recent))

        return {
            "reply": reply,
            "input_tokens": resp.usage.input_tokens,
            "output_tokens": resp.usage.output_tokens,
            "context_messages_sent": len(context),
        }

    async def _build_context(self, session_id: str, recent: list[dict]) -> list[dict]:
        estimated = sum(len(m.get("content", "")) // 4 for m in recent)

        if estimated <= self.max_context_tokens:
            return list(recent)

        summary = await self.store.get_latest_summary(session_id)
        trimmed = recent[-10:]

        if summary:
            return [
                {"role": "user",
                 "content": f"[Earlier conversation summary]\n{summary}\n\n"
                             f"(Please keep this background in mind as we continue.)"},
                {"role": "assistant",
                 "content": "Understood, I have the context from our earlier exchange."},
                *trimmed,
            ]
        return trimmed

    async def _maybe_summarize(self, session_id: str, messages: list[dict]):
        """Background task: generate and save a summary of older messages."""
        to_summarize = messages[: len(messages) * 2 // 3]
        if len(to_summarize) < 6:
            return

        history_text = "\n".join(
            f"{'User' if m['role'] == 'user' else 'Claude'}: {m['content'][:250]}"
            for m in to_summarize
        )

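        # The client is synchronous; run it in a worker thread so the
        # background summary does not block the event loop.
        resp = await asyncio.to_thread(
            self.client.messages.create,
            model=self.summary_model,
            max_tokens=400,
            messages=[{"role": "user", "content":
                f"Summarize the key points, decisions, and user background from "
                f"this conversation in 5–7 sentences:\n\n{history_text}"}]
        )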

        await self.store.save_summary(
            session_id, resp.content[0].text, len(to_summarize)
        )
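
One caveat with fire-and-forget background work: the event loop holds only weak references to tasks, so the asyncio documentation recommends keeping a reference to anything started with `asyncio.create_task`. A minimal sketch of that pattern follows; `BackgroundTasks`, `spawn`, and `drain` are illustrative names, not part of the assistant above.

```python
import asyncio


class BackgroundTasks:
    """Holds strong references to running tasks until they finish."""

    def __init__(self):
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task completes
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self):
        """Wait for all outstanding tasks, e.g. during graceful shutdown."""
        if self._tasks:
            await asyncio.gather(*list(self._tasks), return_exceptions=True)


async def demo():
    background = BackgroundTasks()
    results = []

    async def summarize():
        await asyncio.sleep(0)  # stand-in for the real summarization call
        results.append("summary saved")

    background.spawn(summarize())
    await background.drain()
    print(results)


asyncio.run(demo())
```

Calling `drain()` on shutdown also gives in-flight summaries a chance to be saved before the process exits.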

8.6 Cost Tracking and Budget Enforcement

from collections import defaultdict
from datetime import date

class CostTracker:
    # USD per million tokens. Illustrative values: check the current pricing page.
    PRICING = {
        "claude-opus-4-6":           {"input":  5.00, "output": 25.00},
        "claude-sonnet-4-6":         {"input":  3.00, "output": 15.00},
        "claude-haiku-4-5-20251001": {"input":  1.00, "output":  5.00},
    }

    def __init__(self):
        self._usage: dict[str, dict] = defaultdict(
            lambda: {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0, "requests": 0}
        )

    def record(self, user_id: str, model: str, input_tokens: int, output_tokens: int):
        prices = self.PRICING.get(model, {"input": 3.0, "output": 15.0})
        cost = (input_tokens * prices["input"] + output_tokens * prices["output"]) / 1_000_000
        key = f"{user_id}:{date.today()}"
        rec = self._usage[key]
        rec["input_tokens"] += input_tokens
        rec["output_tokens"] += output_tokens
        rec["cost_usd"] += cost
        rec["requests"] += 1

    def daily_cost(self, user_id: str) -> float:
        return self._usage[f"{user_id}:{date.today()}"]["cost_usd"]

    def within_budget(self, user_id: str, daily_limit_usd: float) -> bool:
        return self.daily_cost(user_id) < daily_limit_usd
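
Tracking alone does not stop an over-budget request from being sent. One way to enforce the limit, sketched here with hypothetical names (`BudgetExceeded`, `estimate_request_cost`, `check_budget`), is to compute a worst-case cost before the call and refuse if it would cross the daily limit. Prices are assumed to be in USD per million tokens, as in `PRICING`.

```python
class BudgetExceeded(Exception):
    """Raised when a request would push a user past the daily limit."""


def estimate_request_cost(input_tokens, max_output_tokens, input_price, output_price):
    """Worst-case USD cost, assuming the reply uses all of max_output_tokens."""
    return (input_tokens * input_price + max_output_tokens * output_price) / 1_000_000


def check_budget(spent_today_usd, daily_limit_usd, estimated_cost_usd):
    """Raise before the API call rather than discovering the overrun afterwards."""
    if spent_today_usd + estimated_cost_usd > daily_limit_usd:
        raise BudgetExceeded(
            f"spent {spent_today_usd:.4f} + estimated {estimated_cost_usd:.4f} "
            f"exceeds limit {daily_limit_usd:.2f}"
        )


cost = estimate_request_cost(6_600, 1_024, input_price=3.0, output_price=15.0)
check_budget(spent_today_usd=0.50, daily_limit_usd=1.00, estimated_cost_usd=cost)
print(f"request allowed, worst-case cost ${cost:.5f}")
```

The actual usage reported in the response would still be recorded afterwards, since the worst-case estimate usually overstates the output cost.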

8.7 Common Pitfalls in Multi-Turn Systems

Pitfall 1: Storing Context in Process Memory Only

If your conversation state lives only in process memory, a server restart loses all conversations. Always persist to a durable store before responding to the user.

Pitfall 2: Not Handling the Alternation Constraint

When loading history from a database, check for edge cases: a crash between saving the user message and the assistant message can leave you with an uneven sequence. Add a validation step before sending to the API.

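def validate_message_sequence(messages: list[dict]) -> list[dict]:
    """
    Ensure the outgoing list starts with a user message, strictly alternates
    user/assistant, and ends on a user message. Apply it to the complete
    list, including the new user message.
    """
    # The first message must come from the user: skip leading assistant messages
    start = 0
    while start < len(messages) and messages[start]["role"] != "user":
        start += 1
    if start == len(messages):
        return []

    # Collapse runs of same-role messages, keeping the most recent one so a
    # dangling user message from a crashed turn does not displace the new one
    cleaned = [messages[start]]
    for msg in messages[start + 1:]:
        if msg["role"] != cleaned[-1]["role"]:
            cleaned.append(msg)
        else:
            cleaned[-1] = msg

    # Must end on a user message: drop a trailing assistant message
    if cleaned[-1]["role"] == "assistant":
        cleaned = cleaned[:-1]

    return cleaned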

Pitfall 3: Summarizing Too Aggressively

Summarization loses detail. If you compress a 20-turn conversation into 3 sentences, nuanced user requirements stated in turn 3 may be lost by turn 15. Tune compression thresholds conservatively: compress only when the history exceeds a clear threshold, and keep a generous number of recent turns verbatim.

Pitfall 4: Not Accounting for System Prompt in Token Budget

The system prompt is billed on every request. A 3,000-token system prompt adds $0.009 per request with Sonnet (at $3 per million input tokens), which is $9 per 1,000 requests. At 100,000 daily requests, that's $900/day just for the system prompt. Include it when estimating your context token budget.
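
The arithmetic above can be checked with a one-line helper. This is a sketch that assumes a flat per-token price with no discounts; `system_prompt_cost_usd` is an illustrative name.

```python
def system_prompt_cost_usd(prompt_tokens, price_per_million, requests=1):
    """USD spent re-sending the system prompt across `requests` calls."""
    return prompt_tokens * price_per_million * requests / 1_000_000


for n in (1, 1_000, 100_000):
    print(n, system_prompt_cost_usd(3_000, 3.0, requests=n))
```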


Summary

Multi-turn conversation management is a core engineering concern, not an afterthought:

  1. The API is stateless: per-request input grows linearly with the number of turns, so cumulative input tokens grow roughly quadratically; unmanaged, this drives up cost and latency
  2. Three strategies for context control:
    • Sliding window: simplest; loses early context
    • Summarization compression: preserves semantic content; right for most production cases
    • Structured memory extraction: most precise; right for preference-tracking assistants
  3. Database persistence: always persist to durable storage; design for the case where a process dies mid-turn
  4. Cost tracking: record per-user, per-day token consumption; enforce budgets before they blow up
  5. Validation: always validate message alternation before sending history to the API

The conversation management architecture you choose will touch every other part of the system: rate limiting, cost monitoring, session recovery. Making the right choice in the first week saves significant refactoring later.
