Chapter 61

Batching and Concurrent Inference Optimization


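Single-request inference is like a taxi: it serves one passenger at a time, and the driver spends most of the time waiting. Batching is like a bus: it serves many passengers at once, so the operating cost per passenger drops sharply. Mastering batching is how you get full value out of a GPU.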


61.1 Batching vs. Single-Request Inference: Throughput Comparison

Why batching is more efficient

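A GPU's compute units (CUDA Cores / Tensor Cores) form a massively parallel architecture. They work best on large amounts of data at once, not on requests handled one after another.

During autoregressive decoding, every step must stream all model weights from GPU memory. With a single request, that read yields one token. A batch of B requests reuses the same read for B tokens.

As a result, GPU utilization is often only 20-40% for single-request inference but can reach 80-95% with batching.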
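A rough upper bound makes the weight-reuse argument concrete. The sketch below makes several simplifying assumptions:
- Decoding is purely limited by memory bandwidth.
- Weights are stored in FP16 (2 bytes per parameter).
- Each A100 80GB provides about 2.0 TB/s of HBM bandwidth.

It ignores KV-cache reads, compute limits and inter-GPU communication, so real systems land below it. The function name is illustrative.

```python
def decode_tokens_per_s_upper_bound(
    params_billion: float,
    bytes_per_param: float,
    num_gpus: int,
    hbm_tb_per_s_per_gpu: float,
    batch_size: int,
) -> float:
    """Bandwidth-only upper bound on aggregate decode throughput (tokens/s).

    Each decode step streams every weight from GPU memory once and emits one
    token per sequence in the batch, so one weight read serves batch_size tokens.
    """
    weight_bytes = params_billion * 1e9 * bytes_per_param
    total_bandwidth = num_gpus * hbm_tb_per_s_per_gpu * 1e12  # bytes per second
    steps_per_s = total_bandwidth / weight_bytes              # decode steps per second
    return steps_per_s * batch_size


# 70B model, FP16, 4 GPUs at ~2.0 TB/s each (assumed figures)
for b in (1, 8, 32):
    print(b, round(decode_tokens_per_s_upper_bound(70, 2, 4, 2.0, b), 1))
```

At batch size 1 the bound is about 57 tokens/s, the same order as the 45 tokens/s in the benchmark table for batch size 1. The bound grows linearly with batch size, while real throughput flattens once compute and KV-cache traffic become the limit.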

Benchmark comparison

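Using Hermes 3 70B (4× A100) as an example. Throughput is the aggregate over all requests in the batch; latency is the end-to-end time of one request.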

Batch size   Throughput (tokens/s)   Per-request latency (s)   GPU utilization
1            45                      33.3                      22%
4            142                     42.5                      54%
8            248                     51.6                      72%
16           398                     64.5                      85%
32           587                     87.4                      91%
64           743                     110.2                     94%
128          821                     198.5                     96%
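One practical way to read the table is to fix a per-request latency budget and pick the largest batch that still meets it. The sketch below hard-codes the benchmark rows. `pick_batch_size` and `gain_vs_batch1` are hypothetical helpers, and the rule ignores queueing delay before a request joins a batch.

```python
from typing import List, Optional, Tuple

# (batch size, aggregate throughput in tokens/s, per-request latency in s) from the table
BENCHMARK: List[Tuple[int, float, float]] = [
    (1, 45, 33.3), (4, 142, 42.5), (8, 248, 51.6), (16, 398, 64.5),
    (32, 587, 87.4), (64, 743, 110.2), (128, 821, 198.5),
]


def pick_batch_size(latency_budget_s: float, rows=BENCHMARK) -> Optional[int]:
    """Largest batch size whose per-request latency fits the budget, or None."""
    fitting = [b for b, _, lat in rows if lat <= latency_budget_s]
    return max(fitting) if fitting else None


def gain_vs_batch1(batch: int, rows=BENCHMARK) -> Tuple[float, float]:
    """(throughput multiple, latency multiple) relative to batch size 1."""
    by_batch = {b: (tput, lat) for b, tput, lat in rows}
    base_tput, base_lat = by_batch[1]
    tput, lat = by_batch[batch]
    return tput / base_tput, lat / base_lat


print(pick_batch_size(90))  # → 32
print(gain_vs_batch1(32))
```

With a 90-second budget this picks batch size 32. There, throughput is about 13× that of batch size 1, for about 2.6× the latency.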

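Key insight

Up to a point, throughput grows much faster than latency:
- From batch size 1 to 32, throughput rises about 13× (587 vs. 45 tokens/s), while per-request latency grows only about 2.6× (87.4 s vs. 33.3 s).
- Beyond batch size 64 the returns diminish: doubling to 128 adds only about 10% throughput but raises latency about 1.8×.

The runner below compares the two modes on your own workload: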

import random
import time
from dataclasses import dataclass
from typing import List

@dataclass
class InferenceRequest:
    request_id: str
    prompt: str
    max_tokens: int = 1024
    priority: int = 0  # 0 = low, 1 = medium, 2 = high

@dataclass
class InferenceResult:
    request_id: str
    output: str
    latency_ms: float
    tokens_generated: int

class BatchInferenceRunner:
    """
    Batch inference runner that demonstrates how batching raises throughput.

    llm_client is any object with these two methods (an assumed interface):
      complete(prompt, max_tokens) -> str
      batch_complete(prompts) -> List[str]
    """

    def __init__(self, llm_client, batch_size: int = 16):
        self.llm = llm_client
        self.batch_size = batch_size

    @staticmethod
    def _count_tokens(text: str) -> int:
        # Whitespace word count is only a rough proxy for tokens;
        # use the model's tokenizer for real measurements
        return len(text.split())

    def run_single(self, request: InferenceRequest) -> InferenceResult:
        """Single-request inference (the baseline)."""
        start = time.perf_counter()
        output = self.llm.complete(request.prompt, max_tokens=request.max_tokens)
        latency = (time.perf_counter() - start) * 1000

        return InferenceResult(
            request_id=request.request_id,
            output=output,
            latency_ms=latency,
            tokens_generated=self._count_tokens(output)
        )

    def run_batch(self, requests: List[InferenceRequest]) -> List[InferenceResult]:
        """Batched inference."""
        start = time.perf_counter()

        # Split the requests into chunks of batch_size
        batches = [
            requests[i:i + self.batch_size]
            for i in range(0, len(requests), self.batch_size)
        ]

        results = []
        for batch in batches:
            prompts = [r.prompt for r in batch]
            outputs = self.llm.batch_complete(prompts)

            # All requests were submitted at `start`, so each request's latency runs
            # until its batch returns, including time spent waiting for earlier batches
            batch_latency = (time.perf_counter() - start) * 1000

            for req, output in zip(batch, outputs):
                results.append(InferenceResult(
                    request_id=req.request_id,
                    output=output,
                    latency_ms=batch_latency,
                    tokens_generated=self._count_tokens(output)
                ))

        return results

    def benchmark(self, num_requests: int = 10) -> dict:
        """Compare single-request and batched inference on the same requests."""
        requests = [
            InferenceRequest(
                request_id=f"req_{i}",
                prompt=f"Analyze task {i}: " + "x" * random.randint(100, 500)
            )
            for i in range(num_requests)
        ]

        # Single-request pass: one call per request, strictly sequential
        start = time.perf_counter()
        single_results = [self.run_single(r) for r in requests]
        single_time = time.perf_counter() - start
        single_throughput = sum(r.tokens_generated for r in single_results) / single_time

        # Batched pass over the same requests
        start = time.perf_counter()
        batch_results = self.run_batch(requests)
        batch_time = time.perf_counter() - start
        batch_throughput = sum(r.tokens_generated for r in batch_results) / batch_time

        def summarize(results, elapsed, throughput):
            return {
                "total_time_s": round(elapsed, 2),
                "throughput_tokens_per_sec": round(throughput, 1),
                # Mean per-request latency, not total time divided by request count
                "avg_latency_ms": round(sum(r.latency_ms for r in results) / len(results), 1)
            }

        return {
            "single_inference": summarize(single_results, single_time, single_throughput),
            "batch_inference": summarize(batch_results, batch_time, batch_throughput),
            "speedup": round(batch_throughput / single_throughput, 2)
        }

61.2 How Continuous Batching Works

The problem with static batching

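Traditional static batching requires every request in a batch to start together and finish together. Output lengths vary widely between requests, so short requests that finish early must wait for the longest one, and their slots leave the GPU idle: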

Static batching (batch = 4):
┌─────────────────────────────────────────┐
│ Req A ████████████████████████████████  │ (long)
│ Req B ████████                          │ (short, idles after finishing)
│ Req C ████████████████                  │ (medium)
│ Req D ████                              │ (short, idles after finishing)
└─────────────────────────────────────────┘
GPU utilization: about 55% (the blank space is wasted)

How continuous batching works

Continuous batching lets a new request join the running batch at any step, as soon as one of the sequences in it finishes:

Continuous batching:
t0: [A, B, C, D] → decoded as one batch
t1: B finishes → insert E; batch becomes [A, C, D, E]
t2: D finishes → insert F; batch becomes [A, C, E, F]
t3: C finishes → insert G; batch becomes [A, E, F, G]

The GPU stays close to full load, and utilization can rise to 85-95%.
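A toy simulation shows where the savings come from. It is a simplified model, not vLLM's scheduler:
- Every output length (at least 1 token) is known in advance.
- Each decode step emits one token per active sequence.
- Prefill is ignored.
- A freed slot is refilled before the next step.

vLLM's real scheduler also budgets by KV-cache blocks and token counts.

```python
from collections import deque
from typing import List


def static_batching_steps(lengths: List[int], batch_size: int) -> int:
    """Decode steps when each batch runs until its longest request finishes."""
    return sum(
        max(lengths[i:i + batch_size])
        for i in range(0, len(lengths), batch_size)
    )


def continuous_batching_steps(lengths: List[int], batch_size: int) -> int:
    """Decode steps when a freed slot is refilled before the next step."""
    waiting = deque(lengths)
    running: List[int] = []  # tokens still to generate for each active sequence
    steps = 0
    while waiting or running:
        # Admit waiting requests into any free slots
        while waiting and len(running) < batch_size:
            running.append(waiting.popleft())
        steps += 1
        # One decode step: every active sequence emits a token; finished ones leave
        running = [r - 1 for r in running if r > 1]
    return steps


def slot_utilization(lengths: List[int], batch_size: int, steps: int) -> float:
    """Share of batch slots that produced a token."""
    return sum(lengths) / (steps * batch_size)


# Output lengths (in tokens) of eight requests, served with 4 slots
lengths = [32, 8, 16, 4, 8, 12, 4, 16]
for name, fn in [("static", static_batching_steps), ("continuous", continuous_batching_steps)]:
    steps = fn(lengths, 4)
    print(f"{name}: {steps} steps, utilization {slot_utilization(lengths, 4, steps):.0%}")
```

With these lengths, the two schedules compare as follows:
- Static: 48 steps, about 52% of slots busy.
- Continuous: 32 steps, about 78% of slots busy.

The remaining idle slots come from the tail, when no requests are left waiting.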

Using vLLM's continuous batching

# vLLM performs continuous batching on the server; clients use it through the OpenAI-compatible API
import asyncio
import random
import time
from typing import List

from openai import AsyncOpenAI

class ContinuousBatchingClient:
    """
    Client that takes advantage of vLLM continuous batching.
    Sending many requests concurrently keeps the server's batch full.
    """

    def __init__(self, base_url: str = "http://localhost:8000/v1"):
        # A local vLLM server only checks the API key if one was configured at startup
        self.client = AsyncOpenAI(base_url=base_url, api_key="dummy")
        self.model = "NousResearch/Hermes-3-Llama-3.1-70B"

    async def complete_single(self, prompt: str, request_id: str = "") -> dict:
        """Send one asynchronous request."""
        start = time.perf_counter()

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
            temperature=0.7
        )

        latency = (time.perf_counter() - start) * 1000

        return {
            "request_id": request_id,
            "output": response.choices[0].message.content,
            "latency_ms": latency,
            "tokens": response.usage.completion_tokens
        }

    async def concurrent_complete(
        self,
        requests: List[dict],
        concurrency: int = 32
    ) -> List[dict]:
        """
        Send many requests concurrently, with at most `concurrency` in flight.
        The vLLM server batches them continuously on its own.
        """
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded_complete(req):
            async with semaphore:
                return await self.complete_single(req["prompt"], req.get("id", ""))

        tasks = [bounded_complete(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Drop failed requests (timeouts, server errors); log them in production
        return [r for r in results if not isinstance(r, Exception)]

    async def benchmark_concurrent(self, num_requests: int = 100) -> dict:
        """Measure throughput at different concurrency levels."""
        requests = [
            {"id": f"req_{i}", "prompt": f"Task {i}: " + "x" * random.randint(50, 200)}
            for i in range(num_requests)
        ]

        results = {}
        for concurrency in [1, 4, 8, 16, 32]:
            start = time.perf_counter()
            completed = await self.concurrent_complete(requests, concurrency=concurrency)
            elapsed = time.perf_counter() - start

            if not completed:  # every request failed; avoid dividing by zero
                results[f"concurrency_{concurrency}"] = {"error": "all requests failed"}
                continue

            total_tokens = sum(r["tokens"] for r in completed)
            results[f"concurrency_{concurrency}"] = {
                "requests_per_sec": round(len(completed) / elapsed, 2),
                "tokens_per_sec": round(total_tokens / elapsed, 1),
                "avg_latency_ms": round(sum(r["latency_ms"] for r in completed) / len(completed), 1)
            }

        return results
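How much concurrency is enough? In steady state, Little's law ties together the numbers the benchmark reports: requests in flight = throughput (requests/s) × average latency (s).

The helpers below sketch how to size the `concurrency` argument. They assume a stable load and ignore queueing inside the client, and their names are illustrative.

```python
import math


def required_concurrency(target_rps: float, avg_latency_s: float) -> int:
    """Requests that must be in flight to sustain target_rps (Little's law: L = λ·W)."""
    return math.ceil(target_rps * avg_latency_s)


def implied_rps(concurrency: int, avg_latency_s: float) -> float:
    """Throughput implied by a fixed number of in-flight requests."""
    return concurrency / avg_latency_s


# Serving 2 requests/s when each takes about 30 s needs about 60 requests in flight
print(required_concurrency(2.0, 30.0))
```

Latency itself rises with concurrency, as the 61.1 benchmark table shows. Re-check the estimate against the latency measured at the chosen level.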

61.3 Request Queue Design

Priority queue implementation

import asyncio
import heapq
import itertools
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Awaitable, Callable, List, Optional

class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3

@dataclass(order=True)
class QueuedRequest:
    # heapq is a min-heap, so the priority is stored negated: higher priority pops first
    sort_priority: int
    # Monotonic sequence number: FIFO order among requests of equal priority
    seq: int
    timestamp: float = field(compare=False)
    request: dict = field(compare=False)
    future: Optional[asyncio.Future] = field(compare=False, default=None)

class PriorityRequestQueue:
    """
    Request queue with priorities.
    Higher-priority requests jump ahead of lower-priority ones.
    """

    def __init__(
        self,
        max_size: int = 1000,
        batch_size: int = 16,
        batch_timeout_ms: float = 50.0  # longest wait for a batch to fill up
    ):
        self.heap: List[QueuedRequest] = []
        self.max_size = max_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_ms / 1000
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        self._seq = itertools.count()

        # Metrics
        self.total_enqueued = 0
        self.total_processed = 0
        self.wait_times = deque(maxlen=1000)  # keep only recent samples

    async def enqueue(
        self,
        request: dict,
        priority: Priority = Priority.NORMAL
    ) -> asyncio.Future:
        """Add a request to the queue and return a Future that resolves to its result."""
        async with self._lock:
            if len(self.heap) >= self.max_size:
                raise RuntimeError(f"Queue full: {len(self.heap)}/{self.max_size}")

            future = asyncio.get_running_loop().create_future()

            item = QueuedRequest(
                sort_priority=-int(priority),
                seq=next(self._seq),
                timestamp=time.monotonic(),
                request=request,
                future=future
            )

            heapq.heappush(self.heap, item)
            self.total_enqueued += 1
            self._not_empty.set()

            return future

    async def dequeue_batch(self) -> List[QueuedRequest]:
        """
        Take one batch of requests off the queue.
        Waits until batch_size requests are queued or batch_timeout elapses,
        whichever comes first.
        """
        await self._not_empty.wait()

        # Give more requests a chance to arrive so the batch can fill up
        deadline = time.monotonic() + self.batch_timeout
        while len(self.heap) < self.batch_size and time.monotonic() < deadline:
            await asyncio.sleep(0.005)

        batch = []
        async with self._lock:
            while len(batch) < self.batch_size and self.heap:
                item = heapq.heappop(self.heap)

                # Record how long the request waited in the queue
                self.wait_times.append(time.monotonic() - item.timestamp)

                batch.append(item)

            if not self.heap:
                self._not_empty.clear()

        return batch

    async def processor(self, inference_func: Callable[[List[str]], Awaitable[List[str]]]):
        """
        Process queued requests forever; run this as a background task.
        """
        while True:
            batch = await self.dequeue_batch()

            if not batch:
                await asyncio.sleep(0.01)
                continue

            # Extract the prompts
            prompts = [item.request["prompt"] for item in batch]

            try:
                # One batched inference call for the whole batch
                outputs = await inference_func(prompts)

                # Hand each result to the Future its caller is awaiting
                for item, output in zip(batch, outputs):
                    if not item.future.done():
                        item.future.set_result(output)

                self.total_processed += len(batch)

            except Exception as e:
                # On failure, propagate the error to every waiter in the batch
                for item in batch:
                    if not item.future.done():
                        item.future.set_exception(e)

    def get_stats(self) -> dict:
        recent = list(self.wait_times)[-100:]
        avg_wait = sum(recent) / len(recent) if recent else 0.0
        return {
            "queue_size": len(self.heap),
            "total_enqueued": self.total_enqueued,
            "total_processed": self.total_processed,
            "avg_wait_ms": round(avg_wait * 1000, 2)
        }

# Usage example
async def fake_inference(prompts: List[str]) -> List[str]:
    """Placeholder for a real batched model call (for example a vLLM endpoint)."""
    await asyncio.sleep(0.05)
    return [f"result for: {p}" for p in prompts]

async def main():
    queue = PriorityRequestQueue(batch_size=16, batch_timeout_ms=50)

    # The processor must be running, otherwise the Futures never resolve
    processor_task = asyncio.create_task(queue.processor(fake_inference))

    # High-priority user request
    future1 = await queue.enqueue(
        {"prompt": "Urgent task: analyze system alert"},
        priority=Priority.CRITICAL
    )

    # Ordinary batch jobs
    futures = [
        await queue.enqueue({"prompt": f"Analyze document {i}"}, priority=Priority.NORMAL)
        for i in range(20)
    ]

    # Wait for results
    result = await future1
    print(f"High-priority task finished: {result}")
    await asyncio.gather(*futures)
    print(f"Queue stats: {queue.get_stats()}")

    processor_task.cancel()

asyncio.run(main())

61.4 Resource Contention Among Concurrent Agent Instances

Sources of contention

When several Hermes Agent instances run concurrently, they mainly compete for the following resources:

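Resource            Contention scenario                   Impact                  Mitigation
GPU memory          KV caches grow at the same time       OOM crash               Cap GPU memory per instance
GPU compute         Many requests run inference at once   Higher latency          Limit concurrent inference calls
CPU memory          Large conversation histories          Swapping adds latency   Summarize/compress history
Disk I/O            Many file-tool calls                  I/O wait                SSD + async I/O
Network bandwidth   Concurrent API calls                  More timeouts           Connection pooling + rate limiting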
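The first row is usually the one that bites, because a session's KV cache grows linearly with its context length. The sketch below estimates that growth under two assumptions:
- The published Llama 3.1 70B architecture that Hermes 3 70B builds on: 80 layers, 8 KV heads with grouped-query attention, head dimension 128.
- An FP16 cache.

An estimate like this is one way to choose the `vram_needed_gb` value passed to the scheduler below.

```python
def kv_cache_bytes_per_token(num_layers: int, num_kv_heads: int,
                             head_dim: int, bytes_per_value: int = 2) -> int:
    """Bytes of KV cache per token: one key and one value vector per layer and KV head."""
    return 2 * num_layers * num_kv_heads * head_dim * bytes_per_value


def kv_cache_gib(context_tokens: int, **arch) -> float:
    """KV cache size in GiB for a single sequence of context_tokens tokens."""
    return context_tokens * kv_cache_bytes_per_token(**arch) / 2**30


llama31_70b = dict(num_layers=80, num_kv_heads=8, head_dim=128)  # assumed config
print(kv_cache_bytes_per_token(**llama31_70b))  # → 327680 (320 KiB per token)
print(kv_cache_gib(8192, **llama31_70b))        # → 2.5
```

Under these assumptions the KV cache comes on top of the model weights:
- 8K tokens of context: about 2.5 GiB.
- 32K tokens of context: about 10 GiB.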

Managing GPU memory contention

import asyncio
from contextlib import asynccontextmanager

class ResourceError(Exception):
    """Raised when a session's GPU memory quota cannot be granted."""

class GPUMemoryManager:
    """
    GPU memory manager that keeps multiple Agent instances from exhausting GPU memory.
    Note: this is quota bookkeeping only. It does not reserve CUDA memory itself,
    so every session must stay within the amount it declared.
    """

    def __init__(self, total_vram_gb: float, reserved_gb: float = 2.0):
        self.total_vram = total_vram_gb
        self.reserved = reserved_gb  # held back for the system/runtime
        self.available = total_vram_gb - reserved_gb
        self.allocations: dict = {}  # {session_id: allocated_gb}
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def allocate(self, session_id: str, required_gb: float):
        """
        Grant an Agent session a GPU memory quota for the duration of the block.
        Raises ResourceError immediately if the quota would exceed available memory.
        """
        async with self._lock:
            if session_id in self.allocations:
                raise ResourceError(f"Session {session_id} already holds a GPU memory quota")

            current_usage = sum(self.allocations.values())

            if current_usage + required_gb > self.available:
                raise ResourceError(
                    f"GPU memory insufficient: need {required_gb}GB, "
                    f"available {self.available - current_usage:.1f}GB"
                )

            self.allocations[session_id] = required_gb

        try:
            yield
        finally:
            # Always release the quota, even if the task raised
            async with self._lock:
                self.allocations.pop(session_id, None)

    def get_usage(self) -> dict:
        current_usage = sum(self.allocations.values())
        return {
            "total_vram_gb": self.total_vram,
            "reserved_gb": self.reserved,
            "allocated_gb": round(current_usage, 2),
            "free_gb": round(self.available - current_usage, 2),
            "usage_pct": f"{current_usage / self.available * 100:.1f}%",
            "active_sessions": len(self.allocations)
        }

# Integrate with an Agent scheduler
class AgentScheduler:
    def __init__(self, max_concurrent: int = 8):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.memory_manager = GPUMemoryManager(total_vram_gb=80.0)  # one 80 GB GPU
        self.active_agents = {}

    async def run_agent(self, session_id: str, task: str, vram_needed_gb: float = 8.0):
        async with self.semaphore:  # cap the number of concurrent agents
            async with self.memory_manager.allocate(session_id, vram_needed_gb):
                # Run the Agent task while the quota is held
                return await self._execute_task(session_id, task)

    async def _execute_task(self, session_id: str, task: str) -> str:
        # Real task logic goes here
        await asyncio.sleep(0.1)  # simulate inference latency
        return f"[{session_id}] Task completed: {task[:50]}"

61.5 GPU Memory Fragmentation

What causes fragmentation

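Initial state (80 GB):
[████████████████████████████████████████] 80 GB free

Allocate 3 sessions (20 GB each):
[A(20G)][B(20G)][C(20G)][    free(20G)    ]

Session B ends and releases its memory:
[A(20G)][  free(20G)  ][C(20G)][  free(20G)  ]

Try to allocate a new 25 GB session → fails!
Total free = 40 GB, but the largest contiguous block is only 20 GB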
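The fragmentation rate used by `defragment_if_needed` in the next listing can be computed from a simple memory map. The sketch below models GPU memory as an ordered list of segments and assumes each session needs one contiguous region, as in the diagram. The segment representation is illustrative, not how CUDA or vLLM track memory. Its result can serve as the `fragmentation_rate` entry that `defragment_if_needed` reads.

```python
from typing import List, Optional, Tuple

# A memory map as an ordered list of (owner, size_gb); owner None means free
Segment = Tuple[Optional[str], float]


def free_blocks(segments: List[Segment]) -> List[float]:
    """Sizes of contiguous free blocks, merging adjacent free segments."""
    blocks, run = [], 0.0
    for owner, size in segments:
        if owner is None:
            run += size
        elif run:
            blocks.append(run)
            run = 0.0
    if run:
        blocks.append(run)
    return blocks


def fragmentation_rate(segments: List[Segment]) -> float:
    """(total free - largest contiguous free) / total free; 0.0 when nothing is free."""
    blocks = free_blocks(segments)
    total = sum(blocks)
    return (total - max(blocks)) / total if total else 0.0


def can_allocate(segments: List[Segment], size_gb: float) -> bool:
    """A contiguous allocation succeeds only if a single free block is large enough."""
    return any(b >= size_gb for b in free_blocks(segments))


# The state after session B is released, as in the diagram
memory = [("A", 20), (None, 20), ("C", 20), (None, 20)]
print(sum(free_blocks(memory)), fragmentation_rate(memory), can_allocate(memory, 25))
```

For the diagram's state this yields 40 GB free and a fragmentation rate of 0.5, and it rejects the 25 GB request.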

Mitigating fragmentation

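import logging

class FragmentationAwareAllocator:
    """
    Fragmentation-aware allocator.
    Reduces fragmentation through its allocation policy.
    """

    # Strategy 1: fixed-size buckets (avoid odd-sized blocks)
    ALLOCATION_BUCKETS = [4, 8, 12, 16, 24, 32, 48, 64]  # GB

    @staticmethod
    def round_up_allocation(required_gb: float) -> float:
        """Round the requested size up to the nearest bucket size."""
        for bucket in FragmentationAwareAllocator.ALLOCATION_BUCKETS:
            if required_gb <= bucket:
                return float(bucket)
        return float(required_gb)  # larger than the biggest bucket: pass through unchanged

    # Strategy 2: periodic defragmentation (requires restarting the vLLM process)
    @staticmethod
    async def defragment_if_needed(usage_report: dict, threshold: float = 0.3) -> bool:
        """
        Trigger defragmentation when the fragmentation rate exceeds the threshold.
        fragmentation rate = (total free - largest contiguous free) / total free
        usage_report must supply "fragmentation_rate"; GPUMemoryManager.get_usage() does not compute it.
        """
        fragmentation_rate = usage_report.get("fragmentation_rate", 0)

        if fragmentation_rate > threshold:
            logging.getLogger("agent").warning(
                f"GPU memory fragmentation rate {fragmentation_rate:.1%} "
                f"exceeds threshold {threshold:.1%}. Scheduling defragmentation."
            )
            # In practice: wait for in-flight requests to finish, then restart the inference process
            return True
        return False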

61.6 Evolving a High-Concurrency Architecture: Single Machine → Multiple Machines

Evolution path

Stage 1: single machine, single GPU (≤ 1,000 requests/day)
┌────────────────────────────┐
│  Hermes Agent API          │
│  ┌──────────────────────┐  │
│  │  vLLM (1× A100 80G)  │  │
│  └──────────────────────┘  │
└────────────────────────────┘
Traits: simple and low-cost; suited to development/testing

Stage 2: single machine, multiple GPUs (≤ 10,000 requests/day)
┌───────────────────────────────────────────────┐
│  Nginx load balancer                          │
│  ┌───────────────────┐ ┌───────────────────┐  │
│  │  vLLM instance 1  │ │  vLLM instance 2  │  │
│  │  2× A100          │ │  2× A100          │  │
│  └───────────────────┘ └───────────────────┘  │
└───────────────────────────────────────────────┘
Traits: horizontal scaling, capped by one machine's resources

Stage 3: multi-machine distributed (> 10,000 requests/day)
┌──────────────────────────────────────────────────────┐
│              API Gateway / Rate Limiter              │
├──────────────────────────────────────────────────────┤
│            Request Queue (Redis Streams)             │
├─────────────────┬──────────────────┬─────────────────┤
│  Worker 1       │  Worker 2        │  Worker N       │
│  vLLM           │  vLLM            │  vLLM           │
│  4× H100        │  4× H100         │  4× H100        │
└─────────────────┴──────────────────┴─────────────────┘
Traits: linear scaling; requires consistent-hash routing
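Consistent-hash routing keeps a session on the same worker while letting workers come and go. The sketch below is one common way to implement it, a hash ring with virtual nodes:
- The worker names and the `ConsistentHashRing` class are hypothetical.
- MD5 is used only to spread keys evenly, not for security.
- Gateway integration (for example, in front of Redis Streams) is out of scope.

```python
import bisect
import hashlib
from typing import Dict, List


class ConsistentHashRing:
    """Maps session IDs to workers; removing a worker only remaps that worker's sessions."""

    def __init__(self, workers: List[str], vnodes: int = 100):
        self.vnodes = vnodes
        self._ring: List[int] = []        # sorted hash positions of virtual nodes
        self._owner: Dict[int, str] = {}  # hash position -> worker name
        for w in workers:
            self.add_worker(w)

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)

    def add_worker(self, worker: str) -> None:
        # Several virtual nodes per worker spread its share around the ring
        for i in range(self.vnodes):
            h = self._hash(f"{worker}#{i}")
            if h in self._owner:  # astronomically unlikely collision: skip it
                continue
            bisect.insort(self._ring, h)
            self._owner[h] = worker

    def remove_worker(self, worker: str) -> None:
        self._ring = [h for h in self._ring if self._owner[h] != worker]
        self._owner = {h: w for h, w in self._owner.items() if w != worker}

    def route(self, session_id: str) -> str:
        if not self._ring:
            raise RuntimeError("no workers available")
        # First virtual node clockwise from the key's position, wrapping around
        idx = bisect.bisect_right(self._ring, self._hash(session_id)) % len(self._ring)
        return self._owner[self._ring[idx]]


ring = ConsistentHashRing(["worker-1", "worker-2", "worker-3"])
print(ring.route("user-42"))  # the same session ID always routes to the same worker
```

When a worker is removed, only the sessions that lived on it move; every other session keeps its worker. This is what separates the scheme from plain `hash(session_id) % N`, where changing N remaps almost everything.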

Multi-machine inference with Ray Serve

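from ray import serve
from starlette.requests import Request

@serve.deployment(
    ray_actor_options={
        "num_gpus": 4,           # each replica uses 4 GPUs
        "num_cpus": 16,
    },
    # With autoscaling enabled, the replica count comes from autoscaling_config;
    # Ray Serve rejects a fixed num_replicas alongside it
    autoscaling_config={
        "min_replicas": 2,
        "initial_replicas": 4,
        "max_replicas": 8,
        # Older Ray releases name this key target_num_ongoing_requests_per_replica
        "target_ongoing_requests": 20,
        "upscale_delay_s": 30,
        "downscale_delay_s": 120,
    },
)
class HermesRayServe:
    def __init__(self):
        # Imported inside the replica so only the GPU workers need vLLM installed
        from vllm import LLM, SamplingParams
        self.llm = LLM(
            model="NousResearch/Hermes-3-Llama-3.1-70B",
            tensor_parallel_size=4,
            enable_prefix_caching=True,
        )
        self.sampling_params = SamplingParams(
            temperature=0.7,
            max_tokens=2048,
        )

    async def __call__(self, request: Request) -> dict:
        data = await request.json()
        prompt = data["prompt"]

        # LLM.generate is synchronous and blocks this replica until the prompt finishes,
        # so concurrent requests on one replica are not batched together. For
        # cross-request continuous batching, use vLLM's async engine or OpenAI-compatible server.
        outputs = self.llm.generate([prompt], self.sampling_params)
        return {"output": outputs[0].outputs[0].text}

# Deploy
serve.run(HermesRayServe.bind())
print("Hermes Ray Serve is running with autoscaling enabled")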

Chapter Summary

Batching and concurrency optimization are key to running Hermes Agent in production at scale:

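  1. The payoff of batching: going from batch size 1 to 32 raises throughput about 13× while latency grows only about 2.6×
  2. Continuous batching: vLLM's core optimization; by this chapter's figures it lifts GPU utilization from the 20-40% typical of single-request inference to 85-95%
  3. Priority queues: keep high-priority requests (real-time users) from being blocked by batch jobs
  4. Resource contention management: memory quotas + concurrency limits + fragmentation-aware allocation prevent OOM crashes
  5. Architectural evolution: single GPU → multiple GPUs → multiple machines, each stage with its own technology choices and cost structure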

Discussion Questions

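  1. When one request in a batch must wait on an external system for a tool call, should the other requests keep generating? How would you handle such a "mixed-pace" batch?
  2. Different users have different SLAs (service level agreements). How would you design multi-tier QoS that guarantees latency for high-value users?
  3. In continuous batching, one extremely long request (say, a 100K-token output) holds a batch slot for a long time. How do you keep it from starving other, shorter requests?
  4. In a multi-machine deployment, how do you route repeated requests from the same user to the same replica (session affinity) without hurting load balancing?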