Chapter 16

Adaptive Thinking: Opus 4.7 Adaptive Reasoning and Interleaved Thinking in Practice

Chapter 16: Adaptive Thinking: Dynamic Reasoning Depth and Task-Adaptive Strategies

16.1 What Is Adaptive Thinking?

Adaptive Thinking is an advanced usage pattern built on top of Extended Thinking. Rather than applying a single fixed thinking budget to every request, the system dynamically calibrates reasoning depth based on task complexity, cost constraints, and quality requirements.

From an engineering perspective, Adaptive Thinking is a design pattern that involves:

  1. Complexity assessment — quickly estimating how hard a problem is before committing resources
  2. Dynamic budget allocation — assigning appropriate budget_tokens based on the assessment
  3. Tiered model selection — routing tasks to the right model at the right tier
  4. Quality verification loops — detecting inadequate outputs and retrying at a higher thinking budget

This approach matters in production systems: applying the maximum thinking budget to every request wastes money, while using no thinking on hard problems degrades quality significantly.
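To make the cost argument concrete, here is a rough back-of-the-envelope sketch. It rests on assumptions: the traffic mix is hypothetical, the budgets are the ones used for the tiers later in this chapter, the rate of 75 USD per million tokens is the worst-case figure the chapter's router uses, and every request is treated as consuming its whole budget, so the numbers are upper bounds rather than measurements.

```python
# Worst-case thinking spend: flat maximum budget vs. tiered budgets.
# ASSUMPTIONS: the traffic mix is hypothetical; the rate is the worst-case
# figure used elsewhere in this chapter; budgets are treated as fully spent.
RATE_USD_PER_MTOK = 75

TIER_BUDGETS = {"trivial": 0, "simple": 0, "medium": 3000,
                "complex": 8000, "expert": 20000}

# Hypothetical split of 1,000 requests across the tiers
TRAFFIC = {"trivial": 400, "simple": 300, "medium": 200,
           "complex": 80, "expert": 20}


def thinking_cost_usd(budget_tokens: int, requests: int) -> float:
    """Upper bound on thinking cost if every request uses its whole budget."""
    return budget_tokens * requests * RATE_USD_PER_MTOK / 1e6


# Every request gets the largest budget
flat = thinking_cost_usd(max(TIER_BUDGETS.values()), sum(TRAFFIC.values()))
# Each request gets the budget of its own tier
tiered = sum(thinking_cost_usd(TIER_BUDGETS[t], n) for t, n in TRAFFIC.items())

print(f"flat max budget: ${flat:,.2f}")
print(f"tiered budgets:  ${tiered:,.2f}")
```

Under these assumptions the flat policy has a ceiling of 1,500 USD per thousand requests and the tiered policy 123 USD. The ratio depends entirely on the traffic mix, which is why the mix is worth measuring before tuning tiers.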

16.2 Building a Complexity Classifier

LLM-based complexity probe

import anthropic
from enum import Enum

client = anthropic.Anthropic()

class TaskComplexity(Enum):
    TRIVIAL = "trivial"    # Direct fact recall, no reasoning needed
    SIMPLE = "simple"      # Single-step reasoning
    MEDIUM = "medium"      # Multi-step analysis
    COMPLEX = "complex"    # Deep multi-angle evaluation
    EXPERT = "expert"      # Research-level, full reasoning power required

def classify_with_llm(task: str) -> TaskComplexity:
    """Use Haiku to estimate complexity — fast and cheap."""
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=10,
        system="""Rate task complexity with one word:
trivial (simple fact/definition)
simple (one-step reasoning)
medium (multi-step analysis)
complex (deep trade-off evaluation)
expert (research-level problem)""",
        messages=[{"role": "user", "content": task}]
    )

    raw = response.content[0].text.strip().lower()
    mapping = {
        "trivial": TaskComplexity.TRIVIAL,
        "simple": TaskComplexity.SIMPLE,
        "medium": TaskComplexity.MEDIUM,
        "complex": TaskComplexity.COMPLEX,
        "expert": TaskComplexity.EXPERT
    }
    return mapping.get(raw, TaskComplexity.MEDIUM)
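The exact-match lookup above falls back to MEDIUM whenever the reply is not a bare label, for example "Medium." or "complex (deep trade-off evaluation)". A more tolerant parser could look like the sketch below. `parse_complexity_label` is a hypothetical helper, not part of the Anthropic SDK, and the enum is repeated only to keep the example self-contained.

```python
import re
from enum import Enum


class TaskComplexity(Enum):  # mirrors the enum defined above
    TRIVIAL = "trivial"
    SIMPLE = "simple"
    MEDIUM = "medium"
    COMPLEX = "complex"
    EXPERT = "expert"


def parse_complexity_label(
    raw: str, default: TaskComplexity = TaskComplexity.MEDIUM
) -> TaskComplexity:
    """Return the first tier name found in the reply, or `default` if none."""
    labels = "|".join(c.value for c in TaskComplexity)
    # Whole-word match, so "complexity" does not count as "complex"
    match = re.search(rf"\b({labels})\b", raw.lower())
    return TaskComplexity(match.group(1)) if match else default


print(parse_complexity_label("Medium."))
print(parse_complexity_label("complex (deep trade-off evaluation)"))
print(parse_complexity_label("I cannot rate this"))
```

Because the first label found wins, a reply such as "not simple, complex" still resolves to SIMPLE. Keeping the system prompt strict about one-word output remains the main safeguard.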

Rule-based classifier (zero latency)

For latency-sensitive applications, use heuristics to avoid an extra LLM call:

import re

class ComplexityHeuristics:
    """Rule-based complexity estimation with no LLM overhead."""

    # Keywords must be lowercase: classify() matches them against task.lower()
    EXPERT_KEYWORDS = [
        "prove", "derive", "time complexity", "architecture", "distributed",
        "concurrent", "optimize algorithm", "formal proof", "np-hard",
        "np-complete", "cap theorem"
    ]
    COMPLEX_KEYWORDS = [
        "compare", "trade-off", "evaluate", "pros and cons", "design",
        "when to use", "best practice for", "benchmark"
    ]
    SIMPLE_KEYWORDS = [
        "what is", "define", "explain", "example of", "how does"
    ]

    @classmethod
    def classify(cls, task: str) -> TaskComplexity:
        low = task.lower()

        # Very short tasks are treated as trivial; this check runs before any keyword match
        if len(task) < 30:
            return TaskComplexity.TRIVIAL

        if any(kw in low for kw in cls.EXPERT_KEYWORDS):
            return TaskComplexity.EXPERT if len(task) > 100 else TaskComplexity.COMPLEX

        if any(kw in low for kw in cls.COMPLEX_KEYWORDS):
            return TaskComplexity.COMPLEX

        if any(kw in low for kw in cls.SIMPLE_KEYWORDS):
            return TaskComplexity.SIMPLE

        # Code fences or Big-O notation map to MEDIUM (the same tier as the default below)
        if "```" in task or re.search(r'O\([nk]', task):
            return TaskComplexity.MEDIUM

        return TaskComplexity.MEDIUM   # Safe default
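Heuristics like these drift as traffic changes, so it helps to score them against a small hand-labeled set. The sketch below is one way to do that. `evaluate_classifier`, `toy_classifier`, and the labeled examples are all hypothetical; in practice you would pass a wrapper around `ComplexityHeuristics.classify` that returns the tier name.

```python
from collections import Counter
from typing import Callable

ORDER = ["trivial", "simple", "medium", "complex", "expert"]


def evaluate_classifier(
    classify: Callable[[str], str], labeled: list[tuple[str, str]]
) -> dict:
    """Share of tasks classified exactly, too low ("under"), or too high ("over")."""
    outcomes = Counter()
    for task, expected in labeled:
        diff = ORDER.index(classify(task)) - ORDER.index(expected)
        outcomes["exact" if diff == 0 else "under" if diff < 0 else "over"] += 1
    total = len(labeled) or 1  # avoid dividing by zero on an empty set
    return {k: outcomes[k] / total for k in ("exact", "under", "over")}


# Hypothetical stand-in classifier and labels, for illustration only
def toy_classifier(task: str) -> str:
    return "trivial" if len(task) < 30 else "medium"


LABELED = [
    ("What is HTTP?", "trivial"),
    ("Prove that P != NP.", "expert"),
    ("Compare GraphQL vs REST for a public API.", "complex"),
    ("Summarize the trade-offs of eventual consistency.", "medium"),
]

report = evaluate_classifier(toy_classifier, LABELED)
print(report)
```

The "under" share is usually the one to watch: an under-classified task gets too little reasoning, whereas an over-classified one only costs more. Short but hard prompts are a typical source, since a length check that runs first never reaches the keyword lists.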

16.3 The Adaptive Thinking Router

from dataclasses import dataclass
from typing import Optional

@dataclass
class ThinkingConfig:
    model: str
    budget_tokens: Optional[int]
    max_tokens: int
    label: str

TIERS: dict[TaskComplexity, ThinkingConfig] = {
    TaskComplexity.TRIVIAL: ThinkingConfig(
        "claude-haiku-4-5-20251001", None, 256, "Direct answer"
    ),
    TaskComplexity.SIMPLE: ThinkingConfig(
        "claude-haiku-4-5-20251001", None, 512, "No extended thinking"
    ),
    TaskComplexity.MEDIUM: ThinkingConfig(
        "claude-sonnet-4-6", 3000, 6000, "Light thinking"
    ),
    TaskComplexity.COMPLEX: ThinkingConfig(
        "claude-sonnet-4-6", 8000, 12000, "Deep thinking"
    ),
    TaskComplexity.EXPERT: ThinkingConfig(
        "claude-opus-4-6", 20000, 28000, "Maximum reasoning"
    ),
}

class AdaptiveRouter:
    def __init__(
        self,
        client: anthropic.Anthropic,
        classifier: str = "heuristic",
        cost_ceiling_usd: float = 0.10
    ):
        self.client = client
        self.classifier = classifier
        self.cost_ceiling = cost_ceiling_usd

    def _classify(self, task: str) -> TaskComplexity:
        if self.classifier == "heuristic":
            return ComplexityHeuristics.classify(task)
        return classify_with_llm(task)

    def _safe_config(self, complexity: TaskComplexity) -> ThinkingConfig:
        cfg = TIERS[complexity]
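        if cfg.budget_tokens:
            # Worst-case estimate: bills the full thinking budget at 75 USD per
            # million tokens, whatever cfg.model is; adjust to your actual pricing
            estimated_cost = cfg.budget_tokens * 75 / 1e6
            if estimated_cost > self.cost_ceiling:
                # Downgrade one level only; the lower tier is not re-checked
                levels = list(TaskComplexity)
                idx = levels.index(complexity)
                if idx > 0:
                    return TIERS[levels[idx - 1]]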
        return cfg

    def solve(
        self,
        task: str,
        system: str = "",
        force_complexity: Optional[TaskComplexity] = None
    ) -> dict:
        complexity = force_complexity or self._classify(task)
        cfg = self._safe_config(complexity)

        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": [{"role": "user", "content": task}]
        }
        if system:
            kwargs["system"] = system
        if cfg.budget_tokens:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}

        response = self.client.messages.create(**kwargs)

        thinking_text = ""
        answer = ""
        # Concatenate blocks so nothing is lost if several of a kind are returned
        for block in response.content:
            if block.type == "thinking":
                thinking_text += block.thinking
            elif block.type == "text":
                answer += block.text

        return {
            "answer": answer,
            "complexity": complexity.value,
            "model": cfg.model,
            "tier_label": cfg.label,
            "thinking_used": bool(thinking_text),
            "thinking_chars": len(thinking_text),
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }

# Usage
router = AdaptiveRouter(client)

questions = [
    "What is HTTP?",
    "Explain REST API design principles.",
    "Compare GraphQL vs REST — when should you use each?",
    "Design a message system architecture supporting 100M active users."
]

for q in questions:
    r = router.solve(q)
    print(f"\nQ: {q[:60]}")
    print(f"  Tier: {r['complexity']} | Model: {r['model']} | "
          f"Thinking: {r['thinking_used']} ({r['thinking_chars']} chars)")
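The router's `_safe_config` steps down a single tier when the estimate exceeds the ceiling, so the tier it lands on can still be over budget. A variant that keeps stepping down until the estimate fits might look like this sketch. `highest_affordable_tier` and `LADDER` are hypothetical names; the budgets and the worst-case rate are copied from the code above.

```python
from typing import Optional

RATE_USD_PER_MTOK = 75  # worst-case rate, as in _safe_config above

# (tier name, budget_tokens) from lowest to highest; None means no thinking
LADDER: list[tuple[str, Optional[int]]] = [
    ("trivial", None), ("simple", None), ("medium", 3000),
    ("complex", 8000), ("expert", 20000),
]


def highest_affordable_tier(requested: str, cost_ceiling_usd: float) -> str:
    """Step down from `requested` until the worst-case estimate fits the ceiling."""
    names = [name for name, _ in LADDER]
    for idx in range(names.index(requested), -1, -1):
        budget = LADDER[idx][1]
        estimate = (budget or 0) * RATE_USD_PER_MTOK / 1e6
        if estimate <= cost_ceiling_usd:
            return names[idx]
    return names[0]  # nothing fits (negative ceiling): use the cheapest tier


print(highest_affordable_tier("expert", 1.0))
print(highest_affordable_tier("expert", 0.10))
```

With this estimate, a ceiling of 0.10 USD rules out every thinking tier, because even the 3,000-token budget is priced at 0.225 USD. A ceiling used with this formula therefore needs to be chosen against the budgets in the tier table.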

16.4 Quality-Gated Retry

For critical tasks, detect low-quality outputs and automatically retry at a higher thinking budget:

from typing import Callable

def solve_with_quality_gate(
    client: anthropic.Anthropic,
    task: str,
    quality_fn: Callable[[str], float],   # Returns 0.0–1.0
    min_quality: float = 0.8,
    max_attempts: int = 3
) -> dict:
    """Try with increasing reasoning depth until quality threshold is met."""

    budgets  = [0,    3000,   10000]
    models   = ["claude-haiku-4-5-20251001", "claude-sonnet-4-6", "claude-opus-4-6"]

    answer, quality = "", 0.0

    for attempt in range(min(max_attempts, len(budgets))):
        budget = budgets[attempt]
        model = models[attempt]

        kwargs = {
            "model": model,
            "max_tokens": (budget + 2048) if budget > 0 else 2048,
            "messages": [{"role": "user", "content": task}]
        }
        if budget > 0:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": budget}

        response = client.messages.create(**kwargs)
        answer = next((b.text for b in response.content if b.type == "text"), "")
        quality = quality_fn(answer)

        if quality >= min_quality:
            return {"answer": answer, "attempts": attempt + 1,
                    "model": model, "quality": quality}

        print(f"Attempt {attempt+1}: quality {quality:.2f} < {min_quality}, escalating...")

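    # Report the attempts actually made: max_attempts may exceed the number of tiers
    attempts_made = max(0, min(max_attempts, len(budgets)))
    return {"answer": answer, "attempts": attempts_made,
            "model": models[attempts_made - 1] if attempts_made else None,
            "quality": quality,
            "warning": "Quality threshold not met"}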

# LLM-based quality scorer
def llm_quality_score(answer: str) -> float:
    r = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=5,
        messages=[{
            "role": "user",
            "content": f"Rate this technical answer quality 0.0-1.0 (number only):\n\n{answer[:500]}"
        }]
    )
    try:
        return float(r.content[0].text.strip())
    except ValueError:
        return 0.5
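A grader model does not always reply with a bare number. It may answer "Score: 0.7." or "8/10", and `float()` raises on the first while a naive parse of the second would produce a value outside the 0.0 to 1.0 range. The sketch below shows one defensive way to parse the reply. `parse_score` is a hypothetical helper, and treating out-of-range numbers as unparseable is a design assumption rather than the only reasonable choice.

```python
import re


def parse_score(reply: str, fallback: float = 0.5) -> float:
    """Extract the first number in a grader reply; return `fallback` if unusable."""
    match = re.search(r"\d*\.\d+|\d+", reply)
    if match is None:
        return fallback
    value = float(match.group())
    # A number outside [0, 1] (for example the 8 in "8/10") is not trusted
    return value if 0.0 <= value <= 1.0 else fallback


print(parse_score("0.85"))
print(parse_score("Score: 0.7."))
print(parse_score("8/10"))
```

Returning the fallback for unusable replies keeps the retry loop running, but it also hides grader failures, so counting how often the fallback fires is worthwhile.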

16.5 Multi-Stage Reasoning Pipeline

For the hardest problems, a decompose → analyze → synthesize pipeline often outperforms a single large thinking request:

import json

def multi_stage_solve(client: anthropic.Anthropic, problem: str) -> dict:
    """
    Stage 1: Decompose complex problem into sub-problems
    Stage 2: Solve each sub-problem with focused thinking
    Stage 3: Synthesize into a coherent final answer
    """

    # --- Stage 1: Decomposition ---
    # max_tokens must exceed budget_tokens, and an assistant prefill cannot be
    # combined with extended thinking, so the JSON is parsed from the text block.
    r1 = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4000,
        thinking={"type": "enabled", "budget_tokens": 2000},
        messages=[{
            "role": "user",
            "content": f"""Break this problem into 3-5 independent sub-problems that can each be analyzed separately.

Problem: {problem}

Output only JSON: {{"subproblems": ["...", "..."]}}"""
        }]
    )

    try:
        raw = next((b.text for b in r1.content if b.type == "text"), "")
        # Tolerate prose or code fences around the JSON object
        start, end = raw.find("{"), raw.rfind("}")
        subproblems = json.loads(raw[start:end + 1]).get("subproblems") or [problem]
    except (json.JSONDecodeError, AttributeError):
        subproblems = [problem]

    # --- Stage 2: Sub-problem analysis (sequential: one request at a time) ---
    sub_analyses = []
    for i, sp in enumerate(subproblems):
        print(f"Analyzing sub-problem {i+1}/{len(subproblems)}: {sp[:50]}...")
        r = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=6000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            messages=[{"role": "user", "content": sp}]
        )
        analysis = next((b.text for b in r.content if b.type == "text"), "")
        sub_analyses.append({"sub": sp, "analysis": analysis})

    # --- Stage 3: Synthesis ---
    synthesis_prompt = f"Original problem:\n{problem}\n\n"
    for item in sub_analyses:
        synthesis_prompt += f"Sub-problem: {item['sub']}\nAnalysis: {item['analysis']}\n\n"
    synthesis_prompt += "Synthesize the above analyses into a coherent final answer."

    r_final = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=16000,
        thinking={"type": "enabled", "budget_tokens": 10000},
        messages=[{"role": "user", "content": synthesis_prompt}]
    )

    return {
        "subproblems": subproblems,
        "sub_analyses": sub_analyses,
        "final_answer": next((b.text for b in r_final.content if b.type == "text"), ""),
        "stages": 3
    }
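The sub-problems are meant to be independent, so Stage 2 is a natural candidate for concurrent execution. The sketch below shows the pattern with `asyncio.gather` and a semaphore that caps the number of requests in flight. `analyze_all` is a hypothetical helper, and `fake_analyze` is a stand-in for a real call, which would typically wrap `anthropic.AsyncAnthropic().messages.create(...)`.

```python
import asyncio
from typing import Awaitable, Callable


async def analyze_all(
    subproblems: list[str],
    analyze: Callable[[str], Awaitable[str]],
    max_concurrent: int = 3,
) -> list[dict]:
    """Run `analyze` on every sub-problem with at most `max_concurrent` in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def run_one(sp: str) -> dict:
        async with gate:
            return {"sub": sp, "analysis": await analyze(sp)}

    # gather returns results in the order of its arguments, not completion order
    return await asyncio.gather(*(run_one(sp) for sp in subproblems))


# Hypothetical stand-in for the real API call
async def fake_analyze(sp: str) -> str:
    await asyncio.sleep(0.01)
    return f"analysis of {sp}"


results = asyncio.run(
    analyze_all(["a", "b", "c", "d"], fake_analyze, max_concurrent=2)
)
print([r["sub"] for r in results])
```

The cap matters because each sub-problem request carries its own thinking budget. Firing all of them at once multiplies both the peak spend and the chance of hitting rate limits.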

16.6 Real-Time Adaptive Conversation Manager

from collections import defaultdict

class AdaptiveConversation:
    """Manage a multi-turn conversation with dynamic thinking depth per turn."""

    def __init__(self, client: anthropic.Anthropic, budget_usd: float = 1.0):
        self.client = client
        self.budget = budget_usd
        self.spent = 0.0
        self.history: list = []
        self.complexities: list[TaskComplexity] = []

    def chat(self, user_message: str) -> str:
        remaining = self.budget - self.spent

        if remaining < 0.001:
            complexity = TaskComplexity.TRIVIAL
        else:
            complexity = ComplexityHeuristics.classify(user_message)
            # Trend adjustment: if recent turns were complex, elevate default
            if len(self.complexities) >= 3:
                hard = sum(
                    1 for c in self.complexities[-3:]
                    if c in (TaskComplexity.COMPLEX, TaskComplexity.EXPERT)
                )
                if hard >= 2 and complexity == TaskComplexity.MEDIUM:
                    complexity = TaskComplexity.COMPLEX

        self.complexities.append(complexity)
        cfg = TIERS[complexity]

        self.history.append({"role": "user", "content": user_message})

        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": self.history
        }
        if cfg.budget_tokens:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}

        response = self.client.messages.create(**kwargs)

        # Track cost (approximate using Opus pricing as worst case)
        cost = (response.usage.input_tokens * 15 + response.usage.output_tokens * 75) / 1e6
        self.spent += cost

        answer = next((b.text for b in response.content if b.type == "text"), "")

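        # Keep text-only history to control context growth; skip empty answers,
        # since an empty assistant message would be rejected on the next request
        if answer:
            self.history.append({"role": "assistant", "content": answer})
        else:
            self.history.pop()  # drop the unanswered user message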

        return answer

    def stats(self) -> dict:
        dist = defaultdict(int)
        for c in self.complexities:
            dist[c.value] += 1
        return {
            "turns": len(self.complexities),
            "complexity_distribution": dict(dist),
            "spent_usd": round(self.spent, 5),
            "remaining_usd": round(self.budget - self.spent, 5)
        }
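Storing text-only history slows context growth but does not bound it, and every extra turn raises the input-token cost of the next request. One simple option is a sliding window over the most recent turns, sketched below. `trim_history` is a hypothetical helper and the window size is an arbitrary assumption. The function makes sure the trimmed list still opens with a user message.

```python
def trim_history(history: list[dict], max_turns: int) -> list[dict]:
    """Keep roughly the last `max_turns` exchanges, starting on a user message."""
    if max_turns <= 0:
        return []
    trimmed = history[-2 * max_turns:]
    # Drop a leading assistant message left over from cutting mid-exchange
    while trimmed and trimmed[0]["role"] != "user":
        trimmed = trimmed[1:]
    return trimmed


history = [
    {"role": "user", "content": "u1"},
    {"role": "assistant", "content": "a1"},
    {"role": "user", "content": "u2"},
    {"role": "assistant", "content": "a2"},
    {"role": "user", "content": "u3"},  # pending question
]
print([m["content"] for m in trim_history(history, 2)])
```

A window like this discards early context outright. Where early turns matter, summarizing them into a single message is a common alternative, at the price of one more model call.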

16.7 Async Adaptive Streaming

import asyncio
import anthropic

async def adaptive_stream_async(
    client: anthropic.AsyncAnthropic,
    task: str,
    show_thinking: bool = False
) -> str:
    complexity = ComplexityHeuristics.classify(task)
    cfg = TIERS[complexity]

    kwargs = {
        "model": cfg.model,
        "max_tokens": cfg.max_tokens,
        "messages": [{"role": "user", "content": task}]
    }
    if cfg.budget_tokens:
        kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}

    print(f"[{complexity.value} | {cfg.model}] ", end="")

    parts = []
    async with client.messages.stream(**kwargs) as stream:
        async for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "thinking_delta" and show_thinking:
                    print(event.delta.thinking, end="", flush=True)
                elif event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                    parts.append(event.delta.text)

    return "".join(parts)

async def parallel_adaptive(tasks: list[str]) -> list[str]:
    """Process multiple tasks concurrently, each with its own adaptive tier."""
    async_client = anthropic.AsyncAnthropic()
    return await asyncio.gather(*[adaptive_stream_async(async_client, t) for t in tasks])
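The event handling inside the stream loop can be separated from the network call and exercised offline. The sketch below accumulates thinking and text deltas from any iterable of events. `split_deltas` and `fake` are hypothetical helpers, and the `SimpleNamespace` objects only imitate the attributes the streaming code above reads (`type`, `delta.type`, `delta.thinking`, `delta.text`); they are not SDK classes.

```python
from types import SimpleNamespace
from typing import Iterable


def split_deltas(events: Iterable) -> tuple[str, str]:
    """Return (thinking, text) accumulated from content_block_delta events."""
    thinking, text = [], []
    for event in events:
        if event.type != "content_block_delta":
            continue  # ignore start/stop and other bookkeeping events
        if event.delta.type == "thinking_delta":
            thinking.append(event.delta.thinking)
        elif event.delta.type == "text_delta":
            text.append(event.delta.text)
    return "".join(thinking), "".join(text)


def fake(delta_type: str, **fields) -> SimpleNamespace:
    """Hypothetical stand-in for an SDK stream event (illustration only)."""
    return SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type=delta_type, **fields),
    )


events = [
    SimpleNamespace(type="message_start"),
    fake("thinking_delta", thinking="Consider "),
    fake("thinking_delta", thinking="both options."),
    fake("text_delta", text="Use "),
    fake("text_delta", text="REST."),
    SimpleNamespace(type="message_stop"),
]
thinking, answer = split_deltas(events)
print(answer)
```

Buffering per task in this way also avoids a side effect of `parallel_adaptive`: when several streams print as they arrive, their output interleaves on the terminal.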

16.8 Monitoring and Continuous Tuning

Metrics tracking

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
import time

@dataclass
class AdaptiveMetrics:
    requests: dict = field(default_factory=lambda: defaultdict(int))
    costs: dict = field(default_factory=lambda: defaultdict(float))
    latencies: dict = field(default_factory=lambda: defaultdict(list))
    quality_scores: list = field(default_factory=list)

    def record(self, tier: str, cost: float, latency: float, quality: Optional[float] = None):
        self.requests[tier] += 1
        self.costs[tier] += cost
        self.latencies[tier].append(latency)
        if quality is not None:
            self.quality_scores.append(quality)

    def report(self) -> dict:
        total_req = sum(self.requests.values())
        return {
            "total_requests": total_req,
            "total_cost_usd": round(sum(self.costs.values()), 4),
            "by_tier": {
                tier: {
                    "count": self.requests[tier],
                    "pct": f"{self.requests[tier]/total_req:.0%}",
                    "avg_cost_usd": round(self.costs[tier] / self.requests[tier], 6),
                    "avg_latency_s": round(
                        sum(self.latencies[tier]) / len(self.latencies[tier]), 2
                    )
                }
                for tier in self.requests
            },
            "avg_quality": round(
                sum(self.quality_scores) / len(self.quality_scores), 3
            ) if self.quality_scores else None
        }
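The report above uses average latency per tier, which can hide slow outliers; thinking-heavy tiers tend to have long tails. A percentile is often more informative. The sketch below uses the nearest-rank definition; `percentile` is a hypothetical helper and the latency values are made up for illustration.

```python
import math


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: the value at rank ceil(pct/100 * n) in sorted order."""
    if not values:
        raise ValueError("no values recorded")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# Made-up latencies in seconds: nine fast requests and one slow outlier
latencies = [0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 12.0]
print(f"mean: {sum(latencies) / len(latencies):.2f}s")
print(f"p50:  {percentile(latencies, 50)}s")
print(f"p95:  {percentile(latencies, 95)}s")
```

Here the median is 1.2 s while the 95th percentile is 12.0 s. The mean sits in between and describes neither the typical request nor the slow one.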

When to tune your classifier

Watch for these signals in your metrics:


Summary

Adaptive Thinking elevates Extended Thinking from a binary on/off feature to a fine-grained cost-quality optimization system. Core takeaways:

  1. Tier your reasoning — trivial queries need Haiku and zero thinking; expert problems need Opus and maximum budget
  2. Two classifier options — LLM classifier (accurate, ~200ms overhead) vs heuristic classifier (zero latency, requires maintenance)
  3. Quality-gated retry — start cheap, escalate automatically when quality is insufficient
  4. Multi-stage decomposition — break → analyze → synthesize outperforms a single giant thinking block for the hardest problems
  5. Budget tracking in conversations — prevent cost overruns by monitoring cumulative spend per session
  6. Continuous monitoring — track quality and cost per tier to tune your classifier thresholds over time

The goal of Adaptive Thinking: every token of reasoning capacity is spent exactly where it creates value.
