Chapter 61

Batch Processing and Concurrent Inference Optimization

Single-request inference is like a taxi: one passenger at a time, with the driver mostly waiting. Batch processing is like a bus: multiple passengers served simultaneously, dramatically lowering per-unit operating costs. Mastering batch processing is what makes a GPU earn its keep.


61.1 Batch Processing vs. Single Inference: Throughput Comparison

Why Batching Is More Efficient

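GPU compute units (CUDA Cores and Tensor Cores) are massively parallel: they are built to process large amounts of data at once, not to serve requests one at a time. During autoregressive decoding, every generated token requires reading the full set of model weights from GPU memory, so a lone request spends most of each step waiting on memory bandwidth while the compute units sit idle. A batch shares that single pass over the weights among all of its sequences. In single-request mode, GPU utilization often sits at 20–40%; with batch processing it can reach 80–95%.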
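A back-of-the-envelope roofline estimate makes this argument concrete. The sketch below assumes decoding is limited either by weight reads (memory bandwidth) or by matrix math (about 2 FLOPs per parameter per token), ignores KV-cache reads and interconnect overhead, and plugs in nominal A100 figures (roughly 2 TB/s of HBM bandwidth and 312 dense BF16 TFLOPS per GPU). All of these inputs are assumptions, not measurements, and `decode_throughput_bound` is a hypothetical helper.

```python
def decode_throughput_bound(
    params: float,           # model parameters
    bytes_per_param: float,  # 2.0 for FP16/BF16 weights
    bandwidth_bps: float,    # aggregate memory bandwidth, bytes/s
    peak_flops: float,       # aggregate dense compute, FLOP/s
    batch_size: int,
) -> float:
    """Idealized tokens/sec ceiling when one decode step is shared by a batch."""
    # Memory-bound regime: one full weight read per step, shared by the whole batch
    step_time_s = params * bytes_per_param / bandwidth_bps
    memory_bound = batch_size / step_time_s
    # Compute-bound regime: ~2 FLOPs per parameter per generated token
    compute_bound = peak_flops / (2 * params)
    return min(memory_bound, compute_bound)


# Assumed nominal figures: 4x A100 serving a 70B model with BF16 weights
GPUS = 4
for b in (1, 8, 32, 128, 512):
    tps = decode_throughput_bound(70e9, 2.0, GPUS * 2.0e12, GPUS * 312e12, b)
    print(f"batch={b:>3}: <= {tps:,.0f} tokens/sec")
```

Under these assumptions the ceiling rises linearly with batch size until roughly batch 150, where compute takes over. The benchmark below measures 45 tokens/sec at batch 1, close to the ~57 tokens/sec memory-bound ceiling; at larger batches the measured figures fall further below the ceiling, plausibly because KV-cache reads and scheduling overhead grow with the batch.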

Benchmark Results

Example: Hermes 3 70B on 4× A100

| Batch Size | Throughput (tokens/sec) | Per-Request Latency (s) | GPU Utilization |
|---|---|---|---|
| 1 | 45 | 33.3 | 22% |
| 4 | 142 | 42.5 | 54% |
| 8 | 248 | 51.6 | 72% |
| 16 | 398 | 64.5 | 85% |
| 32 | 587 | 87.4 | 91% |
| 64 | 743 | 110.2 | 94% |
| 128 | 821 | 198.5 | 96% |
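Recomputing the ratios from the table makes the diminishing returns explicit. The sketch below copies the measured rows into Python and expresses each batch size's throughput and latency as multiples of the batch-1 baseline; `scaling_table` is a hypothetical helper name.

```python
# (batch size, throughput in tokens/sec, per-request latency in s), from the table above
ROWS = [(1, 45, 33.3), (4, 142, 42.5), (8, 248, 51.6), (16, 398, 64.5),
        (32, 587, 87.4), (64, 743, 110.2), (128, 821, 198.5)]

def scaling_table(rows):
    """Throughput and latency multiples relative to the batch-1 baseline."""
    _, base_tps, base_lat = rows[0]
    return [(b, tps / base_tps, lat / base_lat) for b, tps, lat in rows]

for b, tput_x, lat_x in scaling_table(ROWS):
    print(f"batch={b:>3}  throughput x{tput_x:5.2f}  latency x{lat_x:4.2f}")
```

Batch 32 buys about 13× the throughput for about 2.6× the latency, while going from 64 to 128 adds only about 10% throughput for roughly 80% more latency.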

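Key insights:

  1. From batch 1 to batch 32, throughput rises about 13× (45 → 587 tokens/sec) while per-request latency rises only about 2.6× (33.3 → 87.4 s).
  2. Returns diminish past batch 64: doubling to 128 adds about 10% throughput (743 → 821 tokens/sec) but nearly doubles latency (110.2 → 198.5 s).
  3. Batch sizes of 16–32 reach 85–91% utilization without the latency jump seen at 128, which makes them a sensible starting point for interactive workloads.

The runner below contrasts one-at-a-time and batched calls; llm_client stands for any client object with complete() and batch_complete() methods.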

import time
from typing import List
from dataclasses import dataclass

@dataclass
class InferenceRequest:
    request_id: str
    prompt: str
    max_tokens: int = 1024
    priority: int = 0

@dataclass
class InferenceResult:
    request_id: str
    output: str
    latency_ms: float
    tokens_generated: int

class BatchInferenceRunner:
    def __init__(self, llm_client, batch_size: int = 16):
        self.llm = llm_client
        self.batch_size = batch_size

    def run_single(self, request: InferenceRequest) -> InferenceResult:
        start = time.perf_counter()
        output = self.llm.complete(request.prompt, max_tokens=request.max_tokens)
        latency = (time.perf_counter() - start) * 1000
        return InferenceResult(
            request_id=request.request_id, output=output,
            # Whitespace word count: a rough proxy, not a tokenizer count
            latency_ms=latency, tokens_generated=len(output.split())
        )

    def run_batch(self, requests: List[InferenceRequest]) -> List[InferenceResult]:
        start = time.perf_counter()
        results = []
        for i in range(0, len(requests), self.batch_size):
            batch = requests[i:i + self.batch_size]
            outputs = self.llm.batch_complete([r.prompt for r in batch])
            # Measured from the start of the whole job, so requests in later
            # batches also count the time spent waiting for earlier batches
            latency = (time.perf_counter() - start) * 1000
            for req, output in zip(batch, outputs):
                results.append(InferenceResult(
                    request_id=req.request_id, output=output,
                    latency_ms=latency, tokens_generated=len(output.split())
                ))
        return results

61.2 Continuous Batching Explained

The Static Batching Problem

Traditional static batching fixes a batch's membership until every request in it has finished. Because generation lengths vary widely, the slots freed by short requests sit idle until the longest request completes:

Static Batching (Batch=4):
Request A ████████████████████████████████  (long)
Request B ████████                          (short; slot then idles)
Request C ████████████████                  (medium)
Request D ████                              (short; slot then idles)
Busy slot-steps: 60 of 128 ≈ 47% (blank areas = wasted)

How Continuous Batching Works

Continuous batching (also called iteration-level scheduling) admits new requests at every decoding step: as soon as a sequence completes, the scheduler hands its slot to the next waiting request:

Continuous Batching:
t=0:  [A, B, C, D] → processing
t=T1: B completes → insert E → batch: [A, C, D, E]
t=T2: D completes → insert F → batch: [A, C, E, F]
t=T3: C completes → insert G → batch: [A, E, F, G]

GPU stays saturated → utilization: 85–95%
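A toy simulation shows the gap between the two schedulers. It assumes every request generates exactly its listed number of tokens at one token per step, and it counts how many of the batch's slot-steps do useful work; real engines also juggle prefill, memory limits, and preemption, which this sketch ignores. Both function names are hypothetical.

```python
from collections import deque
from typing import List

def static_utilization(lengths: List[int], batch_size: int) -> float:
    """Fixed batches: each batch runs until its longest request finishes."""
    steps = 0
    for i in range(0, len(lengths), batch_size):
        steps += max(lengths[i:i + batch_size])
    return sum(lengths) / (batch_size * steps)

def continuous_utilization(lengths: List[int], batch_size: int) -> float:
    """Iteration-level scheduling: a freed slot is refilled on the next step."""
    waiting = deque(lengths)
    active: List[int] = []  # remaining tokens for each occupied slot
    steps = 0
    while waiting or active:
        while waiting and len(active) < batch_size:
            active.append(waiting.popleft())
        # One decode step for every occupied slot; drop sequences that finish
        active = [r - 1 for r in active if r > 1]
        steps += 1
    return sum(lengths) / (batch_size * steps)

lengths = [32, 8, 16, 4, 8, 16, 4, 12]
print(f"static:     {static_utilization(lengths, 4):.0%}")      # 52%
print(f"continuous: {continuous_utilization(lengths, 4):.0%}")  # 78%
```

With eight requests, the static scheduler needs 48 steps (two batches of 32 and 16), while the continuous one finishes in 32 steps, the length of the longest request.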

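Concurrent Requests with vLLM

vLLM's OpenAI-compatible server performs continuous batching internally, so the client's only job is to keep enough requests in flight; an asyncio semaphore caps how many.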

import asyncio
import random
import time
from typing import List

from openai import AsyncOpenAI

class ContinuousBatchingClient:
    def __init__(self, base_url: str = "http://localhost:8000/v1",
                 model: str = "NousResearch/Hermes-3-Llama-3.1-70B"):
        # vLLM ignores the key unless the server was started with --api-key
        self.client = AsyncOpenAI(base_url=base_url, api_key="dummy")
        self.model = model

    async def complete_single(self, prompt: str, request_id: str = "") -> dict:
        start = time.perf_counter()
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
        )
        latency = (time.perf_counter() - start) * 1000
        return {
            "request_id": request_id,
            "output": response.choices[0].message.content,
            "latency_ms": latency,
            "tokens": response.usage.completion_tokens,
        }

    async def concurrent_complete(self, requests: List[dict], concurrency: int = 32) -> List[dict]:
        # The semaphore caps requests in flight; vLLM batches whatever arrives
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded(req):
            async with semaphore:
                return await self.complete_single(req["prompt"], req.get("id", ""))

        results = await asyncio.gather(*[bounded(r) for r in requests], return_exceptions=True)
        # Failed requests are dropped here; log or count them in production
        return [r for r in results if not isinstance(r, Exception)]

    async def benchmark_concurrent(self, num_requests: int = 100) -> dict:
        requests = [
            {"id": f"req_{i}", "prompt": f"Task {i}: " + "x" * random.randint(50, 200)}
            for i in range(num_requests)
        ]
        results = {}
        for concurrency in [1, 4, 8, 16, 32]:
            start = time.perf_counter()
            completed = await self.concurrent_complete(requests, concurrency)
            elapsed = time.perf_counter() - start
            total_tokens = sum(r["tokens"] for r in completed)
            results[f"concurrency_{concurrency}"] = {
                "requests_per_sec": round(len(completed) / elapsed, 2),
                "tokens_per_sec": round(total_tokens / elapsed, 1),
            }
        return results
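The semaphore is what sets the batch size the server actually sees. The sketch below swaps the vLLM endpoint for a hypothetical `fake_backend` coroutine that only sleeps, and records the peak number of requests in flight, so the effect of the concurrency limit can be checked without a GPU. `run_with_limit` is likewise a hypothetical name.

```python
import asyncio
from typing import List

async def run_with_limit(prompts: List[str], concurrency: int, delay_s: float = 0.01):
    """Returns (outputs, peak_in_flight) for a fake backend behind a semaphore."""
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def fake_backend(prompt: str) -> str:
        # Hypothetical stand-in for a chat.completions.create() call
        await asyncio.sleep(delay_s)
        return prompt.upper()

    async def bounded(prompt: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_backend(prompt)
            finally:
                in_flight -= 1

    outputs = await asyncio.gather(*(bounded(p) for p in prompts))
    return outputs, peak

outputs, peak = asyncio.run(run_with_limit([f"task {i}" for i in range(20)], concurrency=4))
print(len(outputs), peak)  # 20 4
```

Setting concurrency far above the server's --max-num-seqs limit only moves the queue from the client into vLLM; sweeping concurrency, as benchmark_concurrent does, is a practical way to find where requests_per_sec stops improving.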

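61.3 Request Queue Design

A priority heap lets real-time requests overtake bulk jobs: higher Priority values are dequeued first, and requests of equal priority leave in arrival order. A processor loop then hands the inference function up to batch_size prompts at a time.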

import asyncio
import heapq
import itertools
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Awaitable, Callable, List, Optional

class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3

@dataclass(order=True)
class QueuedRequest:
    priority: int          # Negated Priority, because heapq is a min-heap
    seq: int               # Arrival counter: FIFO tie-break within a priority level
    enqueued_at: float = field(compare=False)
    request: dict = field(compare=False)
    future: Optional[asyncio.Future] = field(compare=False, default=None)

class PriorityRequestQueue:
    def __init__(self, max_size: int = 1000, batch_size: int = 16,
                 batch_timeout_ms: float = 50.0):
        self.heap: list = []
        self.max_size = max_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_ms / 1000
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        self._seq = itertools.count()
        self.total_enqueued = 0
        self.total_processed = 0
        self.wait_times: deque = deque(maxlen=1000)  # Bounded, unlike a plain list

    async def enqueue(self, request: dict, priority: Priority = Priority.NORMAL) -> asyncio.Future:
        async with self._lock:
            if len(self.heap) >= self.max_size:
                raise RuntimeError(f"Queue full: {len(self.heap)}/{self.max_size}")
            future = asyncio.get_running_loop().create_future()
            heapq.heappush(self.heap, QueuedRequest(
                priority=-int(priority), seq=next(self._seq),
                enqueued_at=time.monotonic(), request=request, future=future,
            ))
            self.total_enqueued += 1
            self._not_empty.set()
            return future

    async def dequeue_batch(self) -> list:
        await self._not_empty.wait()
        # Micro-batching: wait up to batch_timeout for the batch to fill
        deadline = time.monotonic() + self.batch_timeout
        while len(self.heap) < self.batch_size and time.monotonic() < deadline:
            await asyncio.sleep(0.005)
        batch = []
        async with self._lock:
            while len(batch) < self.batch_size and self.heap:
                item = heapq.heappop(self.heap)
                self.wait_times.append(time.monotonic() - item.enqueued_at)
                batch.append(item)
            if not self.heap:
                self._not_empty.clear()
        return batch

    async def processor(self, inference_func: Callable[[List[str]], Awaitable[List[str]]]):
        while True:
            batch = await self.dequeue_batch()
            if not batch:
                await asyncio.sleep(0.01)
                continue
            prompts = [item.request["prompt"] for item in batch]
            try:
                outputs = await inference_func(prompts)
                if len(outputs) != len(batch):
                    # zip() would silently leave some futures unresolved
                    raise ValueError(f"Expected {len(batch)} outputs, got {len(outputs)}")
                for item, output in zip(batch, outputs):
                    if not item.future.done():
                        item.future.set_result(output)
                self.total_processed += len(batch)
            except Exception as e:
                for item in batch:
                    if not item.future.done():
                        item.future.set_exception(e)

    def get_stats(self) -> dict:
        recent = list(self.wait_times)[-100:]  # deque does not support slicing
        return {
            "queue_size": len(self.heap),
            "total_processed": self.total_processed,
            "avg_wait_ms": round(sum(recent) / len(recent) * 1000, 2) if recent else 0,
        }
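The negation trick is the heart of the queue: heapq always pops the smallest item, so storing -priority makes CRITICAL requests come out first, and a second sort key keeps arrivals FIFO within a level. The stand-alone sketch below isolates that ordering. It uses an itertools.count sequence number as the tie-breaker, which stays deterministic even when two time.time() readings are equal; `Entry` and `drain` are hypothetical names.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from enum import IntEnum

class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3

@dataclass(order=True)
class Entry:
    neg_priority: int                 # -Priority, so the min-heap pops high priority first
    seq: int                          # Arrival order breaks ties (FIFO per level)
    name: str = field(compare=False)  # Payload never takes part in comparisons

def drain(arrivals):
    heap, counter = [], itertools.count()
    for name, prio in arrivals:
        heapq.heappush(heap, Entry(-int(prio), next(counter), name))
    return [heapq.heappop(heap).name for _ in range(len(heap))]

order = drain([("a", Priority.NORMAL), ("b", Priority.HIGH), ("c", Priority.NORMAL),
               ("d", Priority.CRITICAL), ("e", Priority.LOW)])
print(order)  # ['d', 'b', 'a', 'c', 'e']
```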

61.4 Resource Contention in Concurrent Agent Instances

Contention Sources

| Resource | Contention Scenario | Impact | Mitigation |
|---|---|---|---|
| GPU VRAM | Multiple KV caches growing simultaneously | OOM crash | Per-instance VRAM quota |
| GPU compute | Many simultaneous inference requests | Latency spikes | Concurrent request limit |
| CPU RAM | Large conversation histories | Swap-induced latency | History summarization |
| Disk I/O | Many file tool calls | I/O blocking | SSD + async I/O |
| Network | Concurrent API calls | More timeouts | Connection pool + rate limiting |

VRAM Quota Management

import asyncio
from contextlib import asynccontextmanager

class GPUMemoryManager:
    """Admission control for per-session VRAM budgets.

    This is bookkeeping only: it does not reserve memory on the device, so
    each session must actually stay within the budget it declares.
    """
    def __init__(self, total_vram_gb: float, reserved_gb: float = 2.0):
        self.total_vram = total_vram_gb
        self.available = total_vram_gb - reserved_gb  # reserved_gb is held back as headroom
        self.allocations: dict = {}
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def allocate(self, session_id: str, required_gb: float):
        async with self._lock:
            if session_id in self.allocations:
                raise ValueError(f"Session {session_id} already holds a VRAM budget")
            current_usage = sum(self.allocations.values())
            if current_usage + required_gb > self.available:
                raise MemoryError(
                    f"Insufficient VRAM: need {required_gb}GB, "
                    f"available {self.available - current_usage:.1f}GB"
                )
            self.allocations[session_id] = required_gb
        try:
            yield
        finally:
            async with self._lock:
                self.allocations.pop(session_id, None)
    
    def get_usage(self) -> dict:
        used = sum(self.allocations.values())
        return {
            "total_gb": self.total_vram,
            "allocated_gb": round(used, 2),
            "free_gb": round(self.available - used, 2),
            "active_sessions": len(self.allocations)
        }

class AgentScheduler:
    def __init__(self, max_concurrent: int = 8, total_vram_gb: float = 80.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.memory_manager = GPUMemoryManager(total_vram_gb=total_vram_gb)
    
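    async def run_agent(self, session_id: str, task: str, vram_needed_gb: float = 8.0):
        async with self.semaphore:
            # Raises MemoryError at once if the VRAM budget is exhausted,
            # instead of waiting for another session to release memory
            async with self.memory_manager.allocate(session_id, vram_needed_gb):
                return await self._execute_task(session_id, task)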
    
    async def _execute_task(self, session_id: str, task: str) -> str:
        await asyncio.sleep(0.1)  # Simulate inference
        return f"[{session_id}] Task completed: {task[:50]}"
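A reasonable vram_needed_gb can be derived from the KV-cache size each session will reach. The standard per-token estimate is 2 (keys and values) × layers × KV heads × head dimension × bytes per element. The sketch assumes Llama 3.1 70B shapes for Hermes 3 70B (80 layers, 8 grouped-query KV heads, head dimension 128) and a 16-bit cache, and it ignores allocator overhead; `kv_cache_bytes` is a hypothetical helper.

```python
def kv_cache_bytes(num_layers: int, num_kv_heads: int, head_dim: int,
                   num_tokens: int, bytes_per_elem: int = 2) -> int:
    """Bytes held by the key/value cache for one sequence."""
    # Factor 2: one tensor for keys and one for values in every layer
    return 2 * num_layers * num_kv_heads * head_dim * num_tokens * bytes_per_elem

# Assumed Llama-3.1-70B shapes with 16-bit (FP16/BF16) cache entries
per_token = kv_cache_bytes(80, 8, 128, num_tokens=1)
per_session_gib = kv_cache_bytes(80, 8, 128, num_tokens=8192) / 2**30
print(per_token)        # 327680 bytes, i.e. 320 KiB per token
print(per_session_gib)  # 2.5 GiB for an 8K-token context
```

At that rate, eight concurrent sessions with 8K-token contexts need about 20 GiB for their caches alone, which is exactly the kind of simultaneous growth the VRAM quota guards against.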

61.5 GPU Memory Fragmentation

Fragmentation Mechanics

Initial state (80GB): [████████████████████████████████████████]

Allocate 3 sessions (20GB each):
[A(20G)][B(20G)][C(20G)][   free(20G)   ]

Session B ends:
[A(20G)][free(20G)][C(20G)][free(20G)]

Try to allocate new 25GB session → FAILS
Total free = 40GB, but largest contiguous block = 20GB
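The failure above comes down to two numbers: total free memory and the largest contiguous free block. The sketch below models memory as an ordered list of (owner, size_gb) segments, with None marking free space, and reports both along with one common fragmentation measure, 1 - largest_free / total_free. That definition is an assumption (allocators and tools define fragmentation differently), and the function names are hypothetical.

```python
from typing import List, Optional, Tuple

Segment = Tuple[Optional[str], float]  # (owner, size in GB); owner None = free

def free_runs(layout: List[Segment]) -> List[float]:
    """Sizes of contiguous free regions, merging adjacent free segments."""
    runs, current = [], 0.0
    for owner, size in layout:
        if owner is None:
            current += size
        elif current:
            runs.append(current)
            current = 0.0
    if current:
        runs.append(current)
    return runs

def fragmentation_report(layout: List[Segment], request_gb: float) -> dict:
    runs = free_runs(layout)
    total_free = sum(runs)
    largest = max(runs, default=0.0)
    return {
        "total_free_gb": total_free,
        "largest_block_gb": largest,
        "fragmentation_rate": 1 - largest / total_free if total_free else 0.0,
        "fits": request_gb <= largest,  # A contiguous allocator needs one block
    }

# The scenario above: session B has ended, leaving two separate 20 GB holes
layout = [("A", 20), (None, 20), ("C", 20), (None, 20)]
print(fragmentation_report(layout, request_gb=25))
# {'total_free_gb': 40.0, 'largest_block_gb': 20.0, 'fragmentation_rate': 0.5, 'fits': False}
```

A rate of 0.5 is above the 0.3 default threshold that defragment_if_needed uses in the mitigation code below.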

Mitigation Strategies

import logging

logger = logging.getLogger("agent")

class FragmentationAwareAllocator:
    # Strategy 1: round requests up to fixed-size buckets so freed blocks are reusable
    ALLOCATION_BUCKETS = [4, 8, 12, 16, 24, 32, 48, 64]  # GB

    @staticmethod
    def round_up_allocation(required_gb: float) -> float:
        for bucket in FragmentationAwareAllocator.ALLOCATION_BUCKETS:
            if required_gb <= bucket:
                return float(bucket)
        return required_gb  # Larger than every bucket: allocate as requested

    # Strategy 2: schedule defragmentation once fragmentation crosses a threshold
    @staticmethod
    def defragment_if_needed(fragmentation_rate: float, threshold: float = 0.3) -> bool:
        """fragmentation_rate is assumed to be the share of free memory outside
        the largest contiguous free block: 1 - largest_free / total_free."""
        if fragmentation_rate > threshold:
            logger.warning(
                "Fragmentation %.1f%% exceeds threshold; scheduling "
                "defragmentation after all requests drain.",
                fragmentation_rate * 100,
            )
            return True
        return False
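Bucketing and deferred defragmentation work around the need for contiguous memory; paging removes it. vLLM's PagedAttention applies this idea to the KV cache, storing each sequence in fixed-size blocks that need not be adjacent. The toy allocator below shows the principle at GB granularity, which is a simplification (vLLM's blocks hold a small number of tokens, not gigabytes), and its class and method names are hypothetical.

```python
import math
from typing import Dict, List

class PagedAllocator:
    """Hands out fixed-size blocks from a free list; blocks need not be adjacent."""

    def __init__(self, total_gb: float, block_gb: float = 1.0):
        self.block_gb = block_gb
        self.free_blocks: List[int] = list(range(int(total_gb // block_gb)))
        self.block_tables: Dict[str, List[int]] = {}  # session -> physical block ids

    def allocate(self, session_id: str, required_gb: float) -> bool:
        needed = math.ceil(required_gb / self.block_gb)
        if needed > len(self.free_blocks):
            return False
        self.block_tables[session_id] = [self.free_blocks.pop() for _ in range(needed)]
        return True

    def release(self, session_id: str) -> None:
        self.free_blocks.extend(self.block_tables.pop(session_id, []))

    @property
    def free_gb(self) -> float:
        return len(self.free_blocks) * self.block_gb

# Replay the fragmentation scenario: A, B, C take 20 GB each, then B ends
alloc = PagedAllocator(total_gb=80)
for s in ("A", "B", "C"):
    alloc.allocate(s, 20)
alloc.release("B")
print(alloc.allocate("D", 25), alloc.free_gb)  # True 15.0
```

The 25 GB request that failed under contiguous allocation now succeeds, because only the total number of free blocks matters.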

61.6 Architecture Evolution: Single Node to Multi-Node

Phase 1: Single GPU (≤1,000 calls/day)
┌──────────────────────────┐
│  Hermes Agent API        │
│  vLLM (1× A100 80GB)     │
└──────────────────────────┘
Simple, low cost, good for dev/test

Phase 2: Single Node, Multi-GPU (≤10,000 calls/day)
┌──────────────────────────────────────────┐
│  Nginx Load Balancer                     │
│  ┌────────────────┐  ┌────────────────┐  │
│  │ vLLM (2×A100)  │  │ vLLM (2×A100)  │  │
│  └────────────────┘  └────────────────┘  │
└──────────────────────────────────────────┘
Horizontal scaling within one machine

Phase 3: Multi-Node Distributed (>10,000 calls/day)
┌──────────────────────────────────────────────────┐
│            API Gateway / Rate Limiter            │
├──────────────────────────────────────────────────┤
│          Request Queue (Redis Streams)           │
├────────────────┬────────────────┬────────────────┤
│ Worker 1       │ Worker 2       │ Worker N       │
│ vLLM + 4×H100  │ vLLM + 4×H100  │ vLLM + 4×H100  │
└────────────────┴────────────────┴────────────────┘
Near-linear scaling; consistent-hash routing keeps each session on the same worker
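Phase 3 relies on consistent-hash routing. A minimal hash ring with virtual nodes shows why it works: each worker owns many points on the ring, a session key goes to the first point at or after its hash, and removing a worker remaps only the keys that worker owned. The worker names, the vnode count, and the `ConsistentHashRing` class are illustrative assumptions; MD5 is used here to spread keys, not for security.

```python
import bisect
import hashlib
from typing import Iterable, List, Tuple

class ConsistentHashRing:
    """Hash ring with virtual nodes for session-affinity routing."""

    def __init__(self, workers: Iterable[str], vnodes: int = 100):
        self.vnodes = vnodes
        self._ring: List[Tuple[int, str]] = []  # Sorted (point, worker) pairs
        for worker in workers:
            self.add(worker)

    @staticmethod
    def _hash(key: str) -> int:
        # First 8 bytes of MD5 as an integer: spreads keys, not a security measure
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def add(self, worker: str) -> None:
        for i in range(self.vnodes):
            bisect.insort(self._ring, (self._hash(f"{worker}#{i}"), worker))

    def remove(self, worker: str) -> None:
        self._ring = [(p, w) for p, w in self._ring if w != worker]

    def route(self, key: str) -> str:
        """Returns the worker owning the first ring point at or after hash(key)."""
        if not self._ring:
            raise LookupError("no workers on the ring")
        idx = bisect.bisect_left(self._ring, (self._hash(key), ""))
        return self._ring[idx % len(self._ring)][1]  # Wrap around past the end

ring = ConsistentHashRing(["worker-1", "worker-2", "worker-3"])
print(ring.route("session-42"))  # The same session ID always maps to the same worker
```

Routing on a user or session ID gives session affinity, while the virtual nodes keep load roughly even across workers.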

Ray Serve for Multi-Node Deployment

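import uuid

from ray import serve

@serve.deployment(
    ray_actor_options={"num_gpus": 4, "num_cpus": 16},
    # Ray Serve rejects a fixed num_replicas alongside autoscaling_config;
    # initial_replicas sets the starting count instead
    autoscaling_config={
        "min_replicas": 2, "initial_replicas": 4, "max_replicas": 8,
        # Named target_num_ongoing_requests_per_replica in older Ray releases
        "target_ongoing_requests": 20,
        "upscale_delay_s": 30, "downscale_delay_s": 120,
    },
)
class HermesRayServe:
    def __init__(self):
        # Imported inside the replica so the driver process doesn't need vLLM
        from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams
        # The async engine keeps continuous batching working across concurrent
        # HTTP requests; the blocking LLM.generate() would stall the event loop
        self.engine = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(
            model="NousResearch/Hermes-3-Llama-3.1-70B",
            tensor_parallel_size=4,  # Multi-GPU placement may need extra Ray settings
            enable_prefix_caching=True,
        ))
        self.sampling_params = SamplingParams(temperature=0.7, max_tokens=2048)

    async def __call__(self, request) -> dict:
        data = await request.json()
        final = None
        async for output in self.engine.generate(
            data["prompt"], self.sampling_params, request_id=str(uuid.uuid4())
        ):
            final = output  # Each update carries the text generated so far
        return {"output": final.outputs[0].text}

serve.run(HermesRayServe.bind())
print("Hermes Ray Serve started with auto-scaling enabled")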

Chapter Summary

Batch processing and concurrent optimization are essential for scaling Hermes Agent to production:

  1. Batching gains: From Batch=1 to Batch=32, throughput increases ~13×, while latency increases only 2.6×
  2. Continuous batching: one of vLLM's core optimizations; it raises GPU utilization from 20–40% to 85–95%
  3. Priority queues: Ensure high-priority (real-time user) requests are not blocked by batch tasks
  4. Resource contention management: VRAM quotas + concurrency limits + fragmentation-aware allocation prevent OOM crashes
  5. Architecture evolution: Single GPU → Multi-GPU → Multi-Node, each stage with distinct technology choices and cost structures

Review Questions

  1. When one request in a batch needs to pause for an external tool call, should the other requests continue generating? How would you handle such "mixed-pace" batches?
  2. Different users have different SLAs. How would you design multi-tier QoS to guarantee latency for high-value users?
  3. In continuous batching, an extremely long request (e.g., 100K token output) occupies a batch slot for a long time. How do you prevent it from "starving" shorter requests?
  4. In a multi-node distributed setup, how do you ensure the same user's requests route to the same replica (session affinity) without compromising load balance?