Adaptive Thinking: Opus 4.7 Adaptive Reasoning and Interleaved Thinking in Practice
Chapter 16: Adaptive Thinking: Dynamic Reasoning Depth and Task-Adaptive Strategies
16.1 What Is Adaptive Thinking?
Adaptive Thinking is an advanced usage pattern built on top of Extended Thinking. Rather than applying a single fixed thinking budget to every request, the system dynamically calibrates reasoning depth based on task complexity, cost constraints, and quality requirements.
From an engineering perspective, Adaptive Thinking is a design pattern that involves:
- Complexity assessment — quickly estimating how hard a problem is before committing resources
- Dynamic budget allocation — assigning an appropriate `budget_tokens` value based on the assessment
- Tiered model selection — routing tasks to the right model at the right tier
- Quality verification loops — detecting inadequate outputs and retrying at a higher thinking budget
This approach is critical in production systems where applying maximum thinking budgets to all requests wastes money, but using zero thinking on hard problems degrades quality significantly.
16.2 Building a Complexity Classifier
LLM-based complexity probe
import anthropic
from enum import Enum
# Shared module-level client: classify_with_llm and llm_quality_score call it
# directly, and the usage example passes it to AdaptiveRouter.
client = anthropic.Anthropic()
class TaskComplexity(Enum):
    """Five-level complexity scale, declared from least to most demanding.

    Declaration order matters: ``AdaptiveRouter._safe_config`` steps down one
    tier by indexing into ``list(TaskComplexity)``, so members must stay
    sorted from lowest to highest complexity.
    """

    TRIVIAL = "trivial"  # Direct fact recall, no reasoning needed
    SIMPLE = "simple"    # Single-step reasoning
    MEDIUM = "medium"    # Multi-step analysis
    COMPLEX = "complex"  # Deep multi-angle evaluation
    EXPERT = "expert"    # Research-level, full reasoning power required
def classify_with_llm(task: str) -> TaskComplexity:
    """Use Haiku to estimate complexity — fast and cheap.

    Sends ``task`` to the module-level ``client`` with a system prompt asking
    for a one-word rating, then maps the reply onto ``TaskComplexity``.

    Args:
        task: The user task whose difficulty should be rated.

    Returns:
        The first complexity label found in the reply, or
        ``TaskComplexity.MEDIUM`` when the reply contains no known label.
    """
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=10,
        system=(
            "Rate task complexity with one word:\n"
            "trivial (simple fact/definition)\n"
            "simple (one-step reasoning)\n"
            "medium (multi-step analysis)\n"
            "complex (deep trade-off evaluation)\n"
            "expert (research-level problem)"
        ),
        messages=[{"role": "user", "content": task}],
    )
    # Take the first text block rather than content[0] so an empty or
    # non-text reply falls through to the default instead of raising.
    raw = next((b.text for b in response.content if b.type == "text"), "")

    # The model does not always answer with the bare word ("Medium.",
    # "**complex**", "expert (research-level)"), so scan word by word and
    # strip surrounding punctuation before looking the label up by value.
    for word in raw.lower().split():
        label = word.strip(".,:;!?*\"'`()[]")
        try:
            return TaskComplexity(label)
        except ValueError:
            continue
    return TaskComplexity.MEDIUM
Rule-based classifier (zero latency)
For latency-sensitive applications, use heuristics to avoid an extra LLM call:
import re
class ComplexityHeuristics:
    """Rule-based complexity estimation with no LLM overhead."""

    EXPERT_KEYWORDS = [
        "prove", "derive", "time complexity", "architecture", "distributed",
        "concurrent", "optimize algorithm", "formal proof", "NP", "CAP theorem",
    ]
    COMPLEX_KEYWORDS = [
        "compare", "trade-off", "evaluate", "pros and cons", "design",
        "when to use", "best practice for", "benchmark",
    ]
    SIMPLE_KEYWORDS = [
        "what is", "define", "explain", "example of", "how does",
    ]

    @staticmethod
    def _matches(keywords, low: str) -> bool:
        """Return True when any keyword occurs in the lower-cased task text.

        All-lowercase keywords keep plain substring semantics. Keywords that
        contain capitals (acronyms such as "NP" or "CAP theorem") could never
        match lower-cased text as written, so they are lower-cased and matched
        as whole words; a bare substring test for "np" would fire on words
        like "input".
        """
        for kw in keywords:
            if kw.islower():
                if kw in low:
                    return True
            elif re.search(rf"\b{re.escape(kw.lower())}\b", low):
                return True
        return False

    @classmethod
    def classify(cls, task: str) -> TaskComplexity:
        """Estimate the complexity of ``task`` from its length and keywords.

        Checks run in priority order: the length cut-off, then expert,
        complex, and simple keywords, then the MEDIUM default.
        """
        low = task.lower()

        # Very short tasks are usually simple. This runs before any keyword
        # check, so short tasks are TRIVIAL regardless of their wording.
        if len(task) < 30:
            return TaskComplexity.TRIVIAL

        if cls._matches(cls.EXPERT_KEYWORDS, low):
            # Short prompts with expert vocabulary are capped at COMPLEX.
            return TaskComplexity.EXPERT if len(task) > 100 else TaskComplexity.COMPLEX

        if cls._matches(cls.COMPLEX_KEYWORDS, low):
            return TaskComplexity.COMPLEX

        if cls._matches(cls.SIMPLE_KEYWORDS, low):
            return TaskComplexity.SIMPLE

        # Code fences or big-O notation. This currently yields the same tier
        # as the default below; it is kept as a separate branch so the tier
        # can be tuned independently.
        if "```" in task or re.search(r"O\([nk]", task):
            return TaskComplexity.MEDIUM

        return TaskComplexity.MEDIUM  # Safe default
16.3 The Adaptive Thinking Router
from dataclasses import dataclass
from typing import Optional
@dataclass
class ThinkingConfig:
    """Model and token settings for one reasoning tier."""

    model: str  # Model ID passed as ``model`` to messages.create
    budget_tokens: Optional[int]  # Thinking budget; None means no thinking block is requested
    max_tokens: int  # Value passed as ``max_tokens`` to messages.create
    label: str  # Human-readable tier name reported back to callers
# Tier table: one ThinkingConfig per complexity level. The two lowest tiers
# carry no thinking budget; the remaining tiers pair a thinking budget with a
# larger max_tokens value.
TIERS: dict[TaskComplexity, ThinkingConfig] = {
    TaskComplexity.TRIVIAL: ThinkingConfig(
        model="claude-haiku-4-5-20251001",
        budget_tokens=None,
        max_tokens=256,
        label="Direct answer",
    ),
    TaskComplexity.SIMPLE: ThinkingConfig(
        model="claude-haiku-4-5-20251001",
        budget_tokens=None,
        max_tokens=512,
        label="No extended thinking",
    ),
    TaskComplexity.MEDIUM: ThinkingConfig(
        model="claude-sonnet-4-6",
        budget_tokens=3000,
        max_tokens=6000,
        label="Light thinking",
    ),
    TaskComplexity.COMPLEX: ThinkingConfig(
        model="claude-sonnet-4-6",
        budget_tokens=8000,
        max_tokens=12000,
        label="Deep thinking",
    ),
    TaskComplexity.EXPERT: ThinkingConfig(
        model="claude-opus-4-6",
        budget_tokens=20000,
        max_tokens=28000,
        label="Maximum reasoning",
    ),
}
class AdaptiveRouter:
    """Pick a model and thinking budget per task, then run the request.

    Args:
        client: Anthropic client used for the final request.
        classifier: ``"heuristic"`` selects ``ComplexityHeuristics``; any
            other value selects ``classify_with_llm``.
        cost_ceiling_usd: Maximum estimated thinking cost allowed before a
            tier is downgraded.
        output_price_per_mtok: USD price per million tokens used for the
            thinking-cost estimate. The default of 75 keeps the original
            hard-coded figure.
    """

    def __init__(
        self,
        client: anthropic.Anthropic,
        classifier: str = "heuristic",
        cost_ceiling_usd: float = 0.10,
        output_price_per_mtok: float = 75.0,
    ):
        self.client = client
        self.classifier = classifier
        self.cost_ceiling = cost_ceiling_usd
        self.output_price_per_mtok = output_price_per_mtok

    def _classify(self, task: str) -> TaskComplexity:
        """Classify ``task`` with the configured classifier."""
        if self.classifier == "heuristic":
            return ComplexityHeuristics.classify(task)
        return classify_with_llm(task)

    def _safe_config(self, complexity: TaskComplexity) -> ThinkingConfig:
        """Return the tier config, stepped down one level if over the ceiling.

        Only the thinking budget is priced. The downgrade is a single step:
        the lower tier is returned without being re-checked against the
        ceiling, so it may itself exceed it.
        """
        cfg = TIERS[complexity]
        if not cfg.budget_tokens:
            return cfg

        estimated_cost = cfg.budget_tokens * self.output_price_per_mtok / 1e6
        if estimated_cost <= self.cost_ceiling:
            return cfg

        # Downgrade one level, relying on TaskComplexity declaration order.
        levels = list(TaskComplexity)
        idx = levels.index(complexity)
        return TIERS[levels[idx - 1]] if idx > 0 else cfg

    def solve(
        self,
        task: str,
        system: str = "",
        force_complexity: Optional[TaskComplexity] = None,
    ) -> dict:
        """Classify ``task``, call the selected model, and summarize the result.

        Args:
            task: User prompt sent as the single user message.
            system: Optional system prompt; omitted from the request if empty.
            force_complexity: Skip classification and use this tier instead.

        Returns:
            A dict with the answer text, the classified complexity, the model
            and tier label actually used (these reflect any downgrade, while
            ``complexity`` does not), thinking statistics, and token usage.
        """
        complexity = force_complexity or self._classify(task)
        cfg = self._safe_config(complexity)

        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": [{"role": "user", "content": task}],
        }
        if system:
            kwargs["system"] = system
        if cfg.budget_tokens:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}

        response = self.client.messages.create(**kwargs)

        # Accumulate every block of each kind. Assigning inside the loop would
        # keep only the last thinking/text block and silently drop the rest.
        thinking_parts = []
        answer_parts = []
        for block in response.content:
            if block.type == "thinking":
                thinking_parts.append(block.thinking)
            elif block.type == "text":
                answer_parts.append(block.text)
        thinking_text = "".join(thinking_parts)
        answer = "".join(answer_parts)

        return {
            "answer": answer,
            "complexity": complexity.value,
            "model": cfg.model,
            "tier_label": cfg.label,
            "thinking_used": bool(thinking_text),
            "thinking_chars": len(thinking_text),
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        }
# Usage: route four sample questions through the heuristic router and print
# which tier and model each one was sent to.
router = AdaptiveRouter(client)
questions = [
    "What is HTTP?",
    "Explain REST API design principles.",
    # The dash in this prompt was mis-encoded in the original text.
    "Compare GraphQL vs REST — when should you use each?",
    "Design a message system architecture supporting 100M active users.",
]
for q in questions:
    r = router.solve(q)
    print(f"\nQ: {q[:60]}")
    print(
        f" Tier: {r['complexity']} | Model: {r['model']} | "
        f"Thinking: {r['thinking_used']} ({r['thinking_chars']} chars)"
    )
16.4 Quality-Gated Retry
For critical tasks, detect low-quality outputs and automatically retry at a higher thinking budget:
from typing import Callable
def solve_with_quality_gate(
    client: "anthropic.Anthropic",
    task: str,
    quality_fn: Callable[[str], float],  # Returns 0.0–1.0
    min_quality: float = 0.8,
    max_attempts: int = 3,
) -> dict:
    """Try with increasing reasoning depth until the quality threshold is met.

    Args:
        client: Anthropic client used for every attempt.
        task: User prompt sent as the single user message.
        quality_fn: Scores an answer; the first score ``>= min_quality`` wins.
        min_quality: Score required to stop escalating.
        max_attempts: Upper bound on attempts. The escalation ladder has three
            rungs, so larger values are capped at three.

    Returns:
        On success: ``answer``, ``attempts``, ``model`` and ``quality``.
        On failure: the same keys for the last attempt made, plus ``warning``.
        ``attempts`` is the number of requests actually sent, and ``model`` is
        ``None`` if no attempt was made.
    """
    # (model, thinking budget) pairs, cheapest first. A budget of 0 means the
    # request is sent without a thinking block.
    ladder = [
        ("claude-haiku-4-5-20251001", 0),
        ("claude-sonnet-4-6", 3000),
        ("claude-opus-4-6", 10000),
    ]
    steps = ladder[:max(max_attempts, 0)]

    answer, quality, model = "", 0.0, None
    for attempt, (model, budget) in enumerate(steps, start=1):
        kwargs = {
            "model": model,
            # Leave 2048 tokens for the answer on top of any thinking budget.
            "max_tokens": budget + 2048,
            "messages": [{"role": "user", "content": task}],
        }
        if budget > 0:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": budget}

        response = client.messages.create(**kwargs)
        answer = next((b.text for b in response.content if b.type == "text"), "")
        quality = quality_fn(answer)

        if quality >= min_quality:
            return {"answer": answer, "attempts": attempt,
                    "model": model, "quality": quality}

        # Only announce an escalation when another rung will actually run.
        if attempt < len(steps):
            print(f"Attempt {attempt}: quality {quality:.2f} < {min_quality}, escalating...")

    # Report what really happened. Indexing the ladder with max_attempts
    # would raise IndexError whenever max_attempts exceeds its length.
    return {"answer": answer, "attempts": len(steps),
            "model": model, "quality": quality,
            "warning": "Quality threshold not met"}
# LLM-based quality scorer
def llm_quality_score(answer: str) -> float:
    """Ask Haiku to rate ``answer`` and return the score as a float.

    Only the first 500 characters of ``answer`` are sent. The request goes
    through the module-level ``client``.

    Returns:
        The parsed score when the reply is a number within [0.0, 1.0];
        otherwise the neutral fallback 0.5.
    """
    fallback = 0.5
    r = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=5,
        messages=[{
            "role": "user",
            "content": f"Rate this technical answer quality 0.0-1.0 (number only):\n\n{answer[:500]}"
        }]
    )
    # First text block, so an empty or non-text reply cannot raise IndexError.
    reply = next((b.text for b in r.content if b.type == "text"), "").strip()
    if not reply:
        return fallback

    # Tolerate trailing punctuation such as "0.8." or "0.8,".
    candidate = reply.split()[0].rstrip(".,;:")
    try:
        score = float(candidate)
    except ValueError:
        return fallback

    # Reject anything outside the requested scale. A reply such as "8" or
    # "nan" parses as a float but would distort the quality gate; the chained
    # comparison is False for NaN as well.
    if not 0.0 <= score <= 1.0:
        return fallback
    return score
16.5 Multi-Stage Reasoning Pipeline
For the hardest problems, a decompose → analyze → synthesize pipeline often outperforms a single large thinking request:
import json
def _extract_subproblems(reply_text: str, problem: str) -> list:
    """Return the "subproblems" list found in ``reply_text``, else ``[problem]``."""
    # The model may wrap the JSON in prose or a code fence, so parse only the
    # span from the first "{" to the last "}".
    start, end = reply_text.find("{"), reply_text.rfind("}")
    if start == -1 or end <= start:
        return [problem]
    try:
        payload = json.loads(reply_text[start:end + 1])
    except json.JSONDecodeError:
        return [problem]

    items = payload.get("subproblems") if isinstance(payload, dict) else None
    if not isinstance(items, list):
        return [problem]
    # Drop non-string and blank entries. An empty result would make stage 2 a
    # no-op, so fall back to treating the whole problem as one sub-problem.
    cleaned = [s.strip() for s in items if isinstance(s, str) and s.strip()]
    return cleaned or [problem]


def multi_stage_solve(client: "anthropic.Anthropic", problem: str) -> dict:
    """
    Stage 1: Decompose complex problem into sub-problems
    Stage 2: Solve each sub-problem with focused thinking
    Stage 3: Synthesize into a coherent final answer

    Args:
        client: Anthropic client used for all three stages.
        problem: The original problem statement.

    Returns:
        A dict with ``subproblems``, ``sub_analyses`` (one
        ``{"sub", "analysis"}`` entry per sub-problem), ``final_answer`` and
        ``stages`` (always 3).
    """
    # --- Stage 1: Decomposition ---
    # Two request constraints apply while thinking is enabled: budget_tokens
    # must be smaller than max_tokens, and the assistant turn cannot be
    # prefilled. The JSON is therefore requested in the prompt and recovered
    # from the reply text instead of being forced with a "{" prefill.
    r1 = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=4000,
        thinking={"type": "enabled", "budget_tokens": 2000},
        messages=[{
            "role": "user",
            "content": (
                "Break this problem into 3-5 independent sub-problems "
                "that can each be analyzed separately.\n"
                f"Problem: {problem}\n"
                'Output JSON: {"subproblems": ["...", "..."]}\n'
                "Respond with the JSON object only."
            ),
        }],
    )
    decomposition = next((b.text for b in r1.content if b.type == "text"), "")
    subproblems = _extract_subproblems(decomposition, problem)

    # --- Stage 2: Sub-problem analysis (one request at a time) ---
    sub_analyses = []
    for i, sp in enumerate(subproblems):
        print(f"Analyzing sub-problem {i+1}/{len(subproblems)}: {sp[:50]}...")
        r = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=6000,
            thinking={"type": "enabled", "budget_tokens": 4000},
            messages=[{"role": "user", "content": sp}],
        )
        analysis = next((b.text for b in r.content if b.type == "text"), "")
        sub_analyses.append({"sub": sp, "analysis": analysis})

    # --- Stage 3: Synthesis ---
    sections = [f"Original problem:\n{problem}\n\n"]
    sections.extend(
        f"Sub-problem: {item['sub']}\nAnalysis: {item['analysis']}\n\n"
        for item in sub_analyses
    )
    sections.append("Synthesize the above analyses into a coherent final answer.")
    synthesis_prompt = "".join(sections)

    r_final = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=16000,
        thinking={"type": "enabled", "budget_tokens": 10000},
        messages=[{"role": "user", "content": synthesis_prompt}],
    )
    return {
        "subproblems": subproblems,
        "sub_analyses": sub_analyses,
        "final_answer": next((b.text for b in r_final.content if b.type == "text"), ""),
        "stages": 3,
    }
16.6 Real-Time Adaptive Conversation Manager
from collections import defaultdict
class AdaptiveConversation:
    """Manage a multi-turn conversation with dynamic thinking depth per turn.

    Args:
        client: Anthropic client used for every turn.
        budget_usd: Session budget. Once less than 0.001 USD remains, every
            turn is forced to the TRIVIAL tier.
    """

    def __init__(self, client: "anthropic.Anthropic", budget_usd: float = 1.0):
        self.client = client
        self.budget = budget_usd
        self.spent = 0.0
        self.history: list = []
        self.complexities: list[TaskComplexity] = []

    def chat(self, user_message: str) -> str:
        """Send one user turn and return the assistant's text reply.

        The tier is chosen from the heuristic classifier, adjusted by the
        recent trend, and capped by the remaining budget. If the API call
        raises, the exception propagates and the conversation state is left
        exactly as it was before the call.
        """
        remaining = self.budget - self.spent
        if remaining < 0.001:
            complexity = TaskComplexity.TRIVIAL
        else:
            complexity = ComplexityHeuristics.classify(user_message)
            # Trend adjustment: if recent turns were complex, elevate default
            if len(self.complexities) >= 3:
                hard = sum(
                    1 for c in self.complexities[-3:]
                    if c in (TaskComplexity.COMPLEX, TaskComplexity.EXPERT)
                )
                if hard >= 2 and complexity == TaskComplexity.MEDIUM:
                    complexity = TaskComplexity.COMPLEX

        cfg = TIERS[complexity]
        self.history.append({"role": "user", "content": user_message})
        kwargs = {
            "model": cfg.model,
            "max_tokens": cfg.max_tokens,
            "messages": self.history,
        }
        if cfg.budget_tokens:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": cfg.budget_tokens}

        try:
            response = self.client.messages.create(**kwargs)
        except Exception:
            # Roll back the user turn so a failed request does not leave an
            # unanswered message in the history sent on the next turn.
            self.history.pop()
            raise

        # Record the tier only for turns that completed, so stats() and the
        # trend adjustment are not skewed by failed requests.
        self.complexities.append(complexity)

        # Track cost (approximate using Opus pricing as worst case)
        cost = (response.usage.input_tokens * 15 + response.usage.output_tokens * 75) / 1e6
        self.spent += cost

        answer = next((b.text for b in response.content if b.type == "text"), "")
        # Keep text-only history to control context growth
        self.history.append({"role": "assistant", "content": answer})
        return answer

    def stats(self) -> dict:
        """Return turn count, per-tier distribution, and spend so far."""
        dist = defaultdict(int)
        for c in self.complexities:
            dist[c.value] += 1
        return {
            "turns": len(self.complexities),
            "complexity_distribution": dict(dist),
            "spent_usd": round(self.spent, 5),
            "remaining_usd": round(self.budget - self.spent, 5),
        }
16.7 Async Adaptive Streaming
import asyncio, anthropic
async def adaptive_stream_async(
    client: anthropic.AsyncAnthropic,
    task: str,
    show_thinking: bool = False
) -> str:
    """Stream an answer for ``task`` at its heuristic tier and return the text.

    Text deltas are echoed to stdout as they arrive; thinking deltas are
    echoed only when ``show_thinking`` is true. The returned string is the
    concatenation of the text deltas.
    """
    tier = ComplexityHeuristics.classify(task)
    settings = TIERS[tier]

    request = {
        "model": settings.model,
        "max_tokens": settings.max_tokens,
        "messages": [{"role": "user", "content": task}],
    }
    if settings.budget_tokens:
        request["thinking"] = {"type": "enabled", "budget_tokens": settings.budget_tokens}

    print(f"[{tier.value} | {settings.model}] ", end="")

    chunks = []
    async with client.messages.stream(**request) as stream:
        async for event in stream:
            # Only content deltas carry printable output.
            if event.type != "content_block_delta":
                continue
            delta = event.delta
            if delta.type == "thinking_delta" and show_thinking:
                print(delta.thinking, end="", flush=True)
            elif delta.type == "text_delta":
                print(delta.text, end="", flush=True)
                chunks.append(delta.text)
    return "".join(chunks)
async def parallel_adaptive(tasks: list[str]) -> list[str]:
    """Process multiple tasks concurrently, each with its own adaptive tier.

    Returns the answers in the same order as ``tasks``. Each task streams to
    stdout while it runs, so printed output from different tasks may
    interleave.
    """
    # The context manager closes the client once every task has finished.
    async with anthropic.AsyncAnthropic() as async_client:
        return await asyncio.gather(
            *(adaptive_stream_async(async_client, t) for t in tasks)
        )
16.8 Monitoring and Continuous Tuning
Metrics tracking
from dataclasses import dataclass, field
import time
@dataclass
class AdaptiveMetrics:
    """Accumulate per-tier request counts, costs, latencies, and quality scores."""

    requests: dict = field(default_factory=lambda: defaultdict(int))    # tier -> request count
    costs: dict = field(default_factory=lambda: defaultdict(float))     # tier -> summed cost
    latencies: dict = field(default_factory=lambda: defaultdict(list))  # tier -> latency samples
    quality_scores: list = field(default_factory=list)                  # scores across all tiers

    def record(
        self,
        tier: str,
        cost: float,
        latency: float,
        quality: Optional[float] = None,
    ) -> None:
        """Record one completed request; ``quality`` is stored only if given."""
        self.requests[tier] += 1
        self.costs[tier] += cost
        self.latencies[tier].append(latency)
        if quality is not None:
            self.quality_scores.append(quality)

    def report(self) -> dict:
        """Summarize totals, per-tier averages, and the overall mean quality.

        ``avg_quality`` is ``None`` when no quality score has been recorded.
        With no requests recorded, ``by_tier`` is empty.
        """
        total_req = sum(self.requests.values())
        by_tier = {}
        for tier, count in self.requests.items():
            samples = self.latencies[tier]
            by_tier[tier] = {
                "count": count,
                "pct": f"{count/total_req:.0%}",
                "avg_cost_usd": round(self.costs[tier] / count, 6),
                "avg_latency_s": round(sum(samples) / len(samples), 2),
            }

        avg_quality = None
        if self.quality_scores:
            avg_quality = round(sum(self.quality_scores) / len(self.quality_scores), 3)

        return {
            "total_requests": total_req,
            "total_cost_usd": round(sum(self.costs.values()), 4),
            "by_tier": by_tier,
            "avg_quality": avg_quality,
        }
When to tune your classifier
Watch for these signals in your metrics:
- Medium-tier quality below target: your heuristics are under-classifying — raise the complexity estimate for those query patterns
- Expert-tier dominating requests: your classifier may be over-classifying — review EXPERT_KEYWORDS for over-broad terms
- High costs without quality improvements: adjust budget ceilings per tier
- Low latency with high quality: you may have headroom to further reduce budgets at lower tiers
Summary
Adaptive Thinking elevates Extended Thinking from a binary on/off feature to a fine-grained cost-quality optimization system. Core takeaways:
- Tier your reasoning — trivial queries need Haiku and zero thinking; expert problems need Opus and maximum budget
- Two classifier options — LLM classifier (accurate, ~200ms overhead) vs heuristic classifier (zero latency, requires maintenance)
- Quality-gated retry — start cheap, escalate automatically when quality is insufficient
- Multi-stage decomposition — break → analyze → synthesize outperforms a single giant thinking block for the hardest problems
- Budget tracking in conversations — prevent cost overruns by monitoring cumulative spend per session
- Continuous monitoring — track quality and cost per tier to tune your classifier thresholds over time
The goal of Adaptive Thinking: every token of reasoning capacity is spent exactly where it creates value.