Chapter 61: Batch Processing and Concurrent Inference Optimization
Single-request inference is like a taxi: one passenger at a time, with the driver mostly waiting. Batch processing is like a bus: multiple passengers served simultaneously, dramatically lowering per-unit operating costs. Mastering batch processing is what makes a GPU earn its keep.
61.1 Batch Processing vs. Single Inference: Throughput Comparison
Why Batching Is More Efficient
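GPU compute units (CUDA cores and Tensor Cores) are parallel architectures: they thrive on processing large amounts of data simultaneously, not on serial single-request handling. During token-by-token decoding, every step has to read the full model weights from memory, so a single request leaves most arithmetic units idle, while a batch amortizes that read across many sequences. In single-request mode, GPU utilization often sits at 20–40%. With batch processing, utilization can reach 80–95%.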
Benchmark Results
Example: Hermes 3 70B (4× A100):
| Batch Size | Throughput (tokens/sec) | Per-Request Latency (s) | GPU Utilization |
|---|---|---|---|
| 1 | 45 | 33.3 | 22% |
| 4 | 142 | 42.5 | 54% |
| 8 | 248 | 51.6 | 72% |
| 16 | 398 | 64.5 | 85% |
| 32 | 587 | 87.4 | 91% |
| 64 | 743 | 110.2 | 94% |
| 128 | 821 | 198.5 | 96% |
Key insights:
- At Batch=32, throughput is 13× that of Batch=1, but latency increases only 2.6×
- For offline batch tasks, the optimal batch size is typically 64–128
- For online services, the batch size has to balance latency against throughput
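The ratios quoted above follow directly from the benchmark table. The short sketch below recomputes them and adds a hypothetical helper, `largest_batch_within`, that picks the largest batch size whose measured latency fits a latency budget. The helper name and the 60-second budget are illustrative assumptions, not part of the benchmark.

```python
# (batch size, throughput in tokens/sec, per-request latency in s), copied from the table
BENCHMARK = [
    (1, 45, 33.3), (4, 142, 42.5), (8, 248, 51.6), (16, 398, 64.5),
    (32, 587, 87.4), (64, 743, 110.2), (128, 821, 198.5),
]


def relative_gains(rows):
    """Return {batch: (throughput gain, latency ratio)} relative to the first row."""
    _, base_tp, base_lat = rows[0]
    return {
        batch: (round(tp / base_tp, 1), round(lat / base_lat, 1))
        for batch, tp, lat in rows
    }


def largest_batch_within(rows, max_latency_s):
    """Hypothetical helper: largest batch size whose latency fits the budget."""
    eligible = [batch for batch, _, lat in rows if lat <= max_latency_s]
    return max(eligible) if eligible else None


gains = relative_gains(BENCHMARK)
for batch, (tp_gain, lat_ratio) in gains.items():
    print(f"batch={batch:>3}  throughput x{tp_gain}  latency x{lat_ratio}")

print("largest batch under a 60 s budget:", largest_batch_within(BENCHMARK, 60.0))
```

For an online service, a lookup like this is only a starting point: the figures are specific to this model and hardware, so the table would need to be re-measured on the target deployment.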
import time
from dataclasses import dataclass
from typing import List


@dataclass
class InferenceRequest:
    request_id: str
    prompt: str
    max_tokens: int = 1024
    priority: int = 0


@dataclass
class InferenceResult:
    request_id: str
    output: str
    latency_ms: float
    tokens_generated: int  # Approximated by a whitespace word count, not a tokenizer


class BatchInferenceRunner:
    """Runs requests one at a time or in fixed-size (static) batches.

    `llm_client` is expected to provide `complete()` and `batch_complete()`.
    """

    def __init__(self, llm_client, batch_size: int = 16):
        self.llm = llm_client
        self.batch_size = batch_size

    def run_single(self, request: InferenceRequest) -> InferenceResult:
        start = time.perf_counter()
        output = self.llm.complete(request.prompt, max_tokens=request.max_tokens)
        latency = (time.perf_counter() - start) * 1000
        return InferenceResult(
            request_id=request.request_id, output=output,
            latency_ms=latency, tokens_generated=len(output.split())
        )

    def run_batch(self, requests: List[InferenceRequest]) -> List[InferenceResult]:
        # Latency is measured from the start of the whole job, so results from
        # later batches include the time spent waiting for earlier batches.
        start = time.perf_counter()
        batches = [requests[i:i + self.batch_size]
                   for i in range(0, len(requests), self.batch_size)]
        results = []
        for batch in batches:
            # Note: per-request max_tokens is not forwarded to batch_complete().
            outputs = self.llm.batch_complete([r.prompt for r in batch])
            latency = (time.perf_counter() - start) * 1000
            for req, output in zip(batch, outputs):
                results.append(InferenceResult(
                    request_id=req.request_id, output=output,
                    latency_ms=latency, tokens_generated=len(output.split())
                ))
        return results
61.2 Continuous Batching Explained
The Static Batching Problem
Traditional static batching requires all requests in a batch to start and finish together. Since generation lengths vary wildly, short requests must wait for long ones, leaving the GPU idle:
Static Batching (Batch=4):
Request A ████████████████████████████████ (long)
Request B ████████ (short → waits)
Request C ████████████████ (medium)
Request D ████ (short → waits)
GPU utilization: ~55% (white areas = wasted)
How Continuous Batching Works
Continuous batching admits a new request into the running batch as soon as any sequence completes, instead of waiting for the whole batch to drain:
Continuous Batching:
t=0: [A, B, C, D] → processing
t=T1: B completes → insert E → batch: [A, C, D, E]
t=T2: D completes → insert F → batch: [A, C, E, F]
t=T3: C completes → insert G → batch: [A, E, F, G]
GPU stays saturated → utilization: 85–95%
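The difference between the two schedules can be checked with a toy simulation. The sketch below is not vLLM's scheduler. It assumes every request needs a known number of decode steps, that each batch slot advances one token per step, and that a freed slot is refilled at the start of the next step. The function names and the request lengths are illustrative.

```python
from collections import deque
from typing import List


def static_batching_steps(lengths: List[int], slots: int) -> int:
    """Each group of `slots` requests runs until its longest member finishes."""
    return sum(max(lengths[i:i + slots]) for i in range(0, len(lengths), slots))


def continuous_batching_steps(lengths: List[int], slots: int) -> int:
    """A waiting request takes over a slot as soon as one becomes free."""
    waiting = deque(lengths)
    active: List[int] = []  # remaining decode steps per occupied slot
    steps = 0
    while waiting or active:
        while waiting and len(active) < slots:
            active.append(waiting.popleft())
        steps += 1
        # Advance every sequence by one token and drop the finished ones.
        active = [remaining - 1 for remaining in active if remaining > 1]
    return steps


def slot_utilization(lengths: List[int], slots: int, steps: int) -> float:
    """Fraction of slot-steps that did useful work."""
    return sum(lengths) / (slots * steps)


lengths = [32, 8, 16, 4, 8, 16, 4, 32]  # decode steps per request (made up)
slots = 4

static_steps = static_batching_steps(lengths, slots)
continuous_steps = continuous_batching_steps(lengths, slots)
print(f"static:     {static_steps} steps, "
      f"{slot_utilization(lengths, slots, static_steps):.0%} slot utilization")
print(f"continuous: {continuous_steps} steps, "
      f"{slot_utilization(lengths, slots, continuous_steps):.0%} slot utilization")
```

With a workload this small, the tail of the queue still leaves slots empty under continuous batching. A steady stream of incoming requests keeps the slots filled, which is the situation the utilization figures above describe.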
Concurrent Requests with vLLM
import asyncio
import random
import time
from typing import List

from openai import AsyncOpenAI


class ContinuousBatchingClient:
    """Sends concurrent requests to a vLLM OpenAI-compatible endpoint.

    Batching happens on the server: the client only has to keep enough
    requests in flight for the server to fill its running batch.
    """

    def __init__(self, base_url: str = "http://localhost:8000/v1"):
        # Placeholder key for a local server
        self.client = AsyncOpenAI(base_url=base_url, api_key="dummy")
        self.model = "NousResearch/Hermes-3-Llama-3.1-70B"

    async def complete_single(self, prompt: str, request_id: str = "") -> dict:
        start = time.perf_counter()
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
        )
        latency = (time.perf_counter() - start) * 1000
        return {
            "request_id": request_id,
            "output": response.choices[0].message.content,
            "latency_ms": latency,
            "tokens": response.usage.completion_tokens
        }

    async def concurrent_complete(self, requests: List[dict], concurrency: int = 32) -> List[dict]:
        # The semaphore caps the number of requests in flight at any moment.
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded(req):
            async with semaphore:
                return await self.complete_single(req["prompt"], req.get("id", ""))

        results = await asyncio.gather(*[bounded(r) for r in requests], return_exceptions=True)
        # Failed requests are silently dropped from the returned list.
        return [r for r in results if not isinstance(r, Exception)]

    async def benchmark_concurrent(self, num_requests: int = 100) -> dict:
        requests = [
            {"id": f"req_{i}", "prompt": f"Task {i}: " + "x" * random.randint(50, 200)}
            for i in range(num_requests)
        ]
        results = {}
        for concurrency in [1, 4, 8, 16, 32]:
            start = time.perf_counter()
            completed = await self.concurrent_complete(requests, concurrency)
            elapsed = time.perf_counter() - start
            total_tokens = sum(r["tokens"] for r in completed)
            results[f"concurrency_{concurrency}"] = {
                "requests_per_sec": round(len(completed) / elapsed, 2),
                "tokens_per_sec": round(total_tokens / elapsed, 1),
            }
        return results
61.3 Request Queue Design
import asyncio
import heapq
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Callable, Optional


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass(order=True)
class QueuedRequest:
    priority: int      # Stored negated so heapq (a min-heap) pops the highest priority first
    timestamp: float   # Tie-breaker: older requests first within a priority level
    request: dict = field(compare=False)
    future: Optional[asyncio.Future] = field(compare=False, default=None)


class PriorityRequestQueue:
    def __init__(self, max_size: int = 1000, batch_size: int = 16,
                 batch_timeout_ms: float = 50.0):
        self.heap: list = []
        self.max_size = max_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_ms / 1000
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        self.total_enqueued = 0
        self.total_processed = 0
        self.wait_times: list = []

    async def enqueue(self, request: dict, priority: Priority = Priority.NORMAL) -> asyncio.Future:
        async with self._lock:
            if len(self.heap) >= self.max_size:
                raise RuntimeError(f"Queue full: {len(self.heap)}/{self.max_size}")
            future = asyncio.get_running_loop().create_future()
            item = QueuedRequest(
                priority=-int(priority),  # Negate for max-heap behavior
                timestamp=time.time(), request=request, future=future
            )
            heapq.heappush(self.heap, item)
            self.total_enqueued += 1
            self._not_empty.set()
        return future

    async def dequeue_batch(self) -> list:
        await self._not_empty.wait()
        # Give a partially filled queue up to batch_timeout to collect more requests.
        if len(self.heap) < self.batch_size:
            await asyncio.sleep(self.batch_timeout)
        batch = []
        async with self._lock:
            while len(batch) < self.batch_size and self.heap:
                item = heapq.heappop(self.heap)
                self.wait_times.append(time.time() - item.timestamp)
                batch.append(item)
            if not self.heap:
                self._not_empty.clear()
        return batch

    async def processor(self, inference_func: Callable):
        """Run forever: pull batches and resolve each request's future."""
        while True:
            batch = await self.dequeue_batch()
            if not batch:
                await asyncio.sleep(0.01)
                continue
            prompts = [item.request["prompt"] for item in batch]
            try:
                outputs = await inference_func(prompts)
                for item, output in zip(batch, outputs):
                    if not item.future.done():
                        item.future.set_result(output)
                self.total_processed += len(batch)
            except Exception as e:
                # Propagate the failure to every caller waiting on this batch.
                for item in batch:
                    if not item.future.done():
                        item.future.set_exception(e)

    def get_stats(self) -> dict:
        recent = self.wait_times[-100:]
        return {
            "queue_size": len(self.heap),
            "total_processed": self.total_processed,
            "avg_wait_ms": round(sum(recent) / len(recent) * 1000, 2) if recent else 0
        }
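The ordering trick used by the queue is easy to get backwards, so here is a minimal standalone sketch of it. `heapq` is a min-heap, so the priority is stored negated and a second field breaks ties in arrival order. This sketch uses a hypothetical `drain_order` helper and a plain sequence number instead of `time.time()` so that the result is deterministic.

```python
import heapq
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, Tuple


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass(order=True)
class Entry:
    sort_key: int                      # negated priority: smaller pops first
    seq: int                           # arrival order, used as the tie-breaker
    name: str = field(compare=False)   # payload, excluded from comparisons


def drain_order(submissions: List[Tuple[str, Priority]]) -> List[str]:
    """Push every submission, then pop until empty and return the names in pop order."""
    heap: List[Entry] = []
    for seq, (name, priority) in enumerate(submissions):
        heapq.heappush(heap, Entry(-int(priority), seq, name))
    return [heapq.heappop(heap).name for _ in range(len(heap))]


order = drain_order([
    ("batch-1", Priority.LOW),
    ("user-1", Priority.HIGH),
    ("batch-2", Priority.LOW),
    ("user-2", Priority.HIGH),
    ("alert", Priority.CRITICAL),
])
print(order)
```

Higher priorities come out first, and requests at the same priority keep their arrival order. Without the tie-breaker field, two entries with equal priority would fall through to comparing payloads, which is why the payload is excluded with `compare=False`.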
61.4 Resource Contention in Concurrent Agent Instances
Contention Sources
| Resource | Contention Scenario | Impact | Mitigation |
|---|---|---|---|
| GPU VRAM | Multiple KV Caches growing simultaneously | OOM crash | Per-instance VRAM quota |
| GPU Compute | Many simultaneous inference requests | Latency spikes | Concurrent request limit |
| CPU RAM | Large conversation histories | Swap-induced latency | History summarization |
| Disk I/O | Many file tool calls | I/O blocking | SSD + async I/O |
| Network | Concurrent API calls | Timeout increase | Connection pool + rate limiting |
VRAM Quota Management
import asyncio
from contextlib import asynccontextmanager


class GPUMemoryManager:
    """Bookkeeping for declared VRAM quotas.

    This tracks what each session says it needs; it does not measure or
    enforce actual GPU memory use.
    """

    def __init__(self, total_vram_gb: float, reserved_gb: float = 2.0):
        self.total_vram = total_vram_gb
        self.available = total_vram_gb - reserved_gb
        self.allocations: dict = {}
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def allocate(self, session_id: str, required_gb: float):
        async with self._lock:
            if session_id in self.allocations:
                # A second entry would overwrite the first and corrupt the totals.
                raise ValueError(f"Session {session_id} already holds an allocation")
            current_usage = sum(self.allocations.values())
            if current_usage + required_gb > self.available:
                raise MemoryError(
                    f"Insufficient VRAM: need {required_gb}GB, "
                    f"available {self.available - current_usage:.1f}GB"
                )
            self.allocations[session_id] = required_gb
        try:
            yield
        finally:
            # Release the quota even if the task raised.
            async with self._lock:
                self.allocations.pop(session_id, None)

    def get_usage(self) -> dict:
        used = sum(self.allocations.values())
        return {
            "total_gb": self.total_vram,
            "allocated_gb": round(used, 2),
            "free_gb": round(self.available - used, 2),
            "active_sessions": len(self.allocations)
        }


class AgentScheduler:
    def __init__(self, max_concurrent: int = 8, total_vram_gb: float = 80.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.memory_manager = GPUMemoryManager(total_vram_gb=total_vram_gb)

    async def run_agent(self, session_id: str, task: str, vram_needed_gb: float = 8.0):
        # Take a concurrency slot first, then reserve the VRAM quota.
        async with self.semaphore:
            async with self.memory_manager.allocate(session_id, vram_needed_gb):
                return await self._execute_task(session_id, task)

    async def _execute_task(self, session_id: str, task: str) -> str:
        await asyncio.sleep(0.1)  # Simulate inference
        return f"[{session_id}] Task completed: {task[:50]}"
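A per-session quota such as `vram_needed_gb` is easier to choose with a rough estimate of KV cache growth. The sketch below uses the standard sizing rule: one key and one value vector per layer, per KV head, per token. The architecture figures (80 layers, 8 KV heads, head dimension 128, 2 bytes per element for FP16) are assumptions for a Llama 3.1 70B class model and should be checked against the model's configuration file. The function names are illustrative.

```python
def kv_cache_bytes_per_token(num_layers: int, num_kv_heads: int,
                             head_dim: int, bytes_per_elem: int = 2) -> int:
    """Bytes of KV cache added per generated or prompt token (keys + values)."""
    return 2 * num_layers * num_kv_heads * head_dim * bytes_per_elem


def kv_cache_gib(context_tokens: int, num_layers: int, num_kv_heads: int,
                 head_dim: int, bytes_per_elem: int = 2) -> float:
    """KV cache size in GiB for one sequence of the given length."""
    per_token = kv_cache_bytes_per_token(num_layers, num_kv_heads,
                                         head_dim, bytes_per_elem)
    return context_tokens * per_token / 2**30


# Assumed architecture figures; verify against the deployed model.
ARCH = dict(num_layers=80, num_kv_heads=8, head_dim=128, bytes_per_elem=2)

per_token = kv_cache_bytes_per_token(**ARCH)
print(f"{per_token / 1024:.0f} KiB per token")
for tokens in (8_192, 32_768, 131_072):
    print(f"{tokens:>7} tokens -> {kv_cache_gib(tokens, **ARCH):.1f} GiB")
```

Under these assumptions a single long-context session can account for tens of GiB on its own, which is why several agents growing their histories at once can exhaust VRAM. With tensor parallelism the cache is split across the participating GPUs, so the per-GPU figure is correspondingly smaller.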
61.5 GPU Memory Fragmentation
Fragmentation Mechanics
Initial state (80GB): [              free(80G)               ]
Allocate 3 sessions (20GB each):
[A(20G)][B(20G)][C(20G)][ free(20G) ]
Session B ends:
[A(20G)][free(20G)][C(20G)][free(20G)]
Try to allocate new 25GB session → FAILS
Total free = 40GB, but largest contiguous block = 20GB
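The scenario above can be reproduced with a toy first-fit allocator over one contiguous address range. This is a sketch of the mechanism only, not of how CUDA or any inference server manages memory. The class name `ContiguousPool` and the fragmentation metric (one minus the largest free block divided by total free space) are assumptions made for illustration.

```python
from typing import List, Optional


class ContiguousPool:
    """First-fit allocator over a single contiguous range, sizes in GB."""

    def __init__(self, total_gb: int):
        # Each block is [owner, size]; owner None means the block is free.
        self.blocks: List[list] = [[None, total_gb]]

    def allocate(self, owner: str, size_gb: int) -> bool:
        for i, (blk_owner, blk_size) in enumerate(self.blocks):
            if blk_owner is None and blk_size >= size_gb:
                self.blocks[i] = [owner, size_gb]
                if blk_size > size_gb:
                    # Keep the unused remainder as a free block right after it.
                    self.blocks.insert(i + 1, [None, blk_size - size_gb])
                return True
        return False  # no single free block is large enough

    def free(self, owner: str) -> None:
        for blk in self.blocks:
            if blk[0] == owner:
                blk[0] = None
        self._merge_adjacent_free()

    def _merge_adjacent_free(self) -> None:
        merged: List[list] = []
        for owner, size in self.blocks:
            if owner is None and merged and merged[-1][0] is None:
                merged[-1][1] += size
            else:
                merged.append([owner, size])
        self.blocks = merged

    def free_sizes(self) -> List[int]:
        return [size for owner, size in self.blocks if owner is None]

    def fragmentation_rate(self) -> float:
        """Assumed metric: 0.0 when all free space is one block."""
        free = self.free_sizes()
        return 1 - max(free) / sum(free) if free else 0.0


pool = ContiguousPool(80)
for session in ("A", "B", "C"):
    pool.allocate(session, 20)
pool.free("B")

fits = pool.allocate("D", 25)
print("25GB request fits:", fits)
print("total free:", sum(pool.free_sizes()), "GB; largest block:", max(pool.free_sizes()), "GB")
print(f"fragmentation rate: {pool.fragmentation_rate():.0%}")
```

Freeing session C as well would merge three neighbouring free blocks into one 60GB block, after which the 25GB request succeeds. That is the effect a defragmentation pass aims for.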
Mitigation Strategies
import logging


class FragmentationAwareAllocator:
    # Strategy 1: Fixed-size buckets to prevent non-standard block sizes
    ALLOCATION_BUCKETS = [4, 8, 12, 16, 24, 32, 48, 64]  # GB

    @staticmethod
    def round_up_allocation(required_gb: float) -> float:
        for bucket in FragmentationAwareAllocator.ALLOCATION_BUCKETS:
            if required_gb <= bucket:
                return float(bucket)
        # Larger than the biggest bucket: allocate the exact size
        return required_gb

    # Strategy 2: Flag the pool for defragmentation once fragmentation is too high.
    # This only logs and signals; the caller performs the actual defragmentation.
    @staticmethod
    async def defragment_if_needed(fragmentation_rate: float, threshold: float = 0.3) -> bool:
        if fragmentation_rate > threshold:
            logging.getLogger('agent').warning(
                f"Fragmentation {fragmentation_rate:.1%} exceeds threshold. "
                f"Scheduling defragmentation after all requests drain."
            )
            return True
        return False
61.6 Architecture Evolution: Single Node to Multi-Node
Phase 1: Single GPU (≤1,000 calls/day)
┌─────────────────────────┐
│ Hermes Agent API        │
│ vLLM (1× A100 80GB)     │
└─────────────────────────┘
Simple, low cost, good for dev/test
Phase 2: Single Node, Multi-GPU (≤10,000 calls/day)
┌───────────────────────────────────────┐
│          Nginx Load Balancer          │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ vLLM (2×A100)  │ │ vLLM (2×A100)  │ │
│ └────────────────┘ └────────────────┘ │
└───────────────────────────────────────┘
Horizontal scaling within one machine
Phase 3: Multi-Node Distributed (>10,000 calls/day)
┌──────────────────────────────────────────────────┐
│            API Gateway / Rate Limiter            │
├──────────────────────────────────────────────────┤
│          Request Queue (Redis Streams)           │
├────────────────┬────────────────┬────────────────┤
│ Worker 1       │ Worker 2       │ Worker N       │
│ vLLM+4×H100    │ vLLM+4×H100    │ vLLM+4×H100    │
└────────────────┴────────────────┴────────────────┘
Near-linear scaling by adding workers; requires consistent hash routing
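Consistent hash routing keeps each session on the same worker while limiting how many sessions move when a worker is added or removed. The sketch below is a minimal hash ring with virtual nodes. The class name, the worker names, and the choice of 100 virtual nodes per worker are illustrative assumptions, and a production router would also need health checks and load awareness.

```python
import bisect
import hashlib
from typing import Dict, List


def _hash(key: str) -> int:
    """Stable 64-bit hash (Python's built-in hash() is salted per process)."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")


class ConsistentHashRing:
    def __init__(self, nodes: List[str], vnodes: int = 100):
        self.vnodes = vnodes
        self._points: List[int] = []       # sorted positions on the ring
        self._owners: Dict[int, str] = {}  # ring position -> worker name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        # Several virtual nodes per worker even out the share of the ring.
        for i in range(self.vnodes):
            point = _hash(f"{node}#{i}")
            self._owners[point] = node
            bisect.insort(self._points, point)

    def remove_node(self, node: str) -> None:
        self._points = [p for p in self._points if self._owners[p] != node]
        self._owners = {p: n for p, n in self._owners.items() if n != node}

    def route(self, session_id: str) -> str:
        """Return the worker owning the first ring position after the key's hash."""
        idx = bisect.bisect_right(self._points, _hash(session_id)) % len(self._points)
        return self._owners[self._points[idx]]


ring = ConsistentHashRing(["worker-1", "worker-2", "worker-3"])
sessions = [f"user-{i}" for i in range(1000)]
before = {s: ring.route(s) for s in sessions}

ring.remove_node("worker-2")
after = {s: ring.route(s) for s in sessions}

moved = sum(1 for s in sessions if before[s] != after[s])
print(f"{moved} of {len(sessions)} sessions moved after removing worker-2")
```

Only the sessions that were on the removed worker are reassigned; every other session keeps its worker, so cached prefixes and conversation state on the surviving workers stay useful.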
Ray Serve for Multi-Node Deployment
import ray
from ray import serve


@serve.deployment(
    # No fixed num_replicas here: Ray Serve rejects a fixed replica count
    # combined with autoscaling_config.
    ray_actor_options={"num_gpus": 4, "num_cpus": 16},
    autoscaling_config={
        "min_replicas": 2, "max_replicas": 8,
        # Newer Ray releases name this option "target_ongoing_requests".
        "target_num_ongoing_requests_per_replica": 20,
        "upscale_delay_s": 30, "downscale_delay_s": 120,
    },
)
class HermesRayServe:
    def __init__(self):
        from vllm import LLM, SamplingParams
        self.llm = LLM(
            model="NousResearch/Hermes-3-Llama-3.1-70B",
            tensor_parallel_size=4,
            enable_prefix_caching=True,
        )
        self.sampling_params = SamplingParams(temperature=0.7, max_tokens=2048)

    async def __call__(self, request) -> dict:
        data = await request.json()
        # LLM.generate() is synchronous and blocks this replica while it runs,
        # so requests are served one at a time per replica. Serving many
        # requests concurrently inside one replica calls for vLLM's async engine.
        outputs = self.llm.generate([data["prompt"]], self.sampling_params)
        return {"output": outputs[0].outputs[0].text}


serve.run(HermesRayServe.bind())
print("Hermes Ray Serve started with auto-scaling enabled")
Chapter Summary
Batch processing and concurrent optimization are essential for scaling Hermes Agent to production:
- Batching gains: From Batch=1 to Batch=32, throughput increases ~13×, while latency increases only 2.6×
- Continuous batching: vLLM's core optimization, which raises GPU utilization from 20–40% to 85–95%
- Priority queues: Ensure high-priority (real-time user) requests are not blocked by batch tasks
- Resource contention management: VRAM quotas + concurrency limits + fragmentation-aware allocation prevent OOM crashes
- Architecture evolution: Single GPU → Multi-GPU → Multi-Node, each stage with distinct technology choices and cost structures
Review Questions
- When one request in a batch needs to pause for an external tool call, should the other requests continue generating? How would you handle such "mixed-pace" batches?
- Different users have different SLAs. How would you design multi-tier QoS to guarantee latency for high-value users?
- In continuous batching, an extremely long request (e.g., 100K token output) occupies a batch slot for a long time. How do you prevent it from "starving" shorter requests?
- In a multi-node distributed setup, how do you ensure the same user's requests route to the same replica (session affinity) without compromising load balance?