Chapter 61
Batching and Concurrent Inference Optimization
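Single-request inference is like a taxi: it carries one passenger at a time, and the driver spends most of the time waiting. Batching is like a bus: it carries many passengers per trip, so the cost per passenger drops sharply. Batching is what lets you get full use out of a GPU.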
61.1 Batching vs. Single-Request Inference: Throughput Comparison
Why batching is more efficient
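A GPU's compute units (CUDA Cores / Tensor Cores) form a massively parallel architecture: they are built to process large amounts of data at once, not to work through requests one at a time. During autoregressive decoding, every generated token requires reading all of the model weights from GPU memory. With a single request, that memory traffic yields only one token per step, so the compute units sit mostly idle; a batch reuses the same weight reads for every sequence in it. As a result, GPU utilization is often only 20-40% for single-request inference, while batched inference can reach 80-95%.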
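A rough way to see why batch size matters is arithmetic intensity: FLOPs performed per byte moved from memory. The sketch below is a back-of-the-envelope model, not a profiler. It assumes a single fp16 linear layer with a d × d weight matrix and ignores the KV cache and other overheads. The hidden size d = 8192 is an assumed value for a 70B-class model, and the function name is illustrative.

```python
def arithmetic_intensity(batch: int, d: int, bytes_per_elem: int = 2) -> float:
    """FLOPs per byte for one decode step of a d x d linear layer (simplified model)."""
    flops = 2 * batch * d * d                   # one multiply-add per weight per sequence
    weight_bytes = bytes_per_elem * d * d       # weights are read once per step, shared by the whole batch
    act_bytes = bytes_per_elem * 2 * batch * d  # input + output activations for each sequence
    return flops / (weight_bytes + act_bytes)


# d = 8192 is an assumed hidden size for a 70B-class model
for b in (1, 8, 32, 128):
    print(f"batch={b:>3}  intensity={arithmetic_intensity(b, 8192):.1f} FLOPs/byte")
```

The intensity is about 1 FLOP/byte at batch size 1 and grows roughly linearly with the batch. Data-center GPUs can execute on the order of a hundred or more FLOPs per byte of memory bandwidth, so small batches leave most of the compute idle.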
Benchmark comparison
Example: Hermes 3 70B on 4× A100 GPUs:
| Batch size | Throughput (tokens/s) | Per-request latency (s) | GPU utilization |
|---|---|---|---|
| 1 | 45 | 33.3 | 22% |
| 4 | 142 | 42.5 | 54% |
| 8 | 248 | 51.6 | 72% |
| 16 | 398 | 64.5 | 85% |
| 32 | 587 | 87.4 | 91% |
| 64 | 743 | 110.2 | 94% |
| 128 | 821 | 198.5 | 96% |
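Key takeaways:
- At batch size 32, throughput is about 13× that of batch size 1 (587 vs. 45 tokens/s), while per-request latency grows only about 2.6× (87.4 s vs. 33.3 s)
- For offline bulk jobs, the best batch size is usually in the 64-128 range
- For online services, you need to find a balance point between latency and throughput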
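To make the online trade-off concrete, the sketch below takes the numbers from the table above. It picks the batch size with the highest throughput whose measured latency still fits a latency target (SLO). The function names are illustrative; in practice you would feed in your own measurements.

```python
from typing import Optional, Tuple

# (batch size, throughput in tokens/s, per-request latency in s), copied from the table above
BENCHMARK = [
    (1, 45, 33.3), (4, 142, 42.5), (8, 248, 51.6), (16, 398, 64.5),
    (32, 587, 87.4), (64, 743, 110.2), (128, 821, 198.5),
]


def pick_batch_size(latency_slo_s: float) -> Optional[int]:
    """Highest-throughput batch size whose measured latency meets the SLO (None if none do)."""
    ok = [(tput, bs) for bs, tput, lat in BENCHMARK if lat <= latency_slo_s]
    return max(ok)[1] if ok else None


def relative_to_batch1(bs: int) -> Tuple[float, float]:
    """(throughput gain, latency increase) of batch size `bs` relative to batch size 1."""
    _, base_tput, base_lat = BENCHMARK[0]
    for b, tput, lat in BENCHMARK:
        if b == bs:
            return tput / base_tput, lat / base_lat
    raise KeyError(bs)


print(pick_batch_size(60.0))   # latency budget of 60 s per request
print(relative_to_batch1(32))
```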
import random
import time
from dataclasses import dataclass
from typing import List


@dataclass
class InferenceRequest:
    request_id: str
    prompt: str
    max_tokens: int = 1024
    priority: int = 0  # 0 = low, 1 = medium, 2 = high


@dataclass
class InferenceResult:
    request_id: str
    output: str
    latency_ms: float
    tokens_generated: int


class BatchInferenceRunner:
    """
    Batch inference runner that demonstrates how batching raises throughput.

    llm_client is assumed to provide two methods:
      complete(prompt, max_tokens=...) -> str
      batch_complete(prompts)          -> List[str]  (one output per prompt, same order)
    """

    def __init__(self, llm_client, batch_size: int = 16):
        self.llm = llm_client
        self.batch_size = batch_size

    def run_single(self, request: InferenceRequest) -> InferenceResult:
        """Single-request inference (the baseline)."""
        start = time.perf_counter()
        output = self.llm.complete(request.prompt, max_tokens=request.max_tokens)
        latency = (time.perf_counter() - start) * 1000
        return InferenceResult(
            request_id=request.request_id,
            output=output,
            latency_ms=latency,
            # Whitespace word count: only a rough proxy for the real token count
            tokens_generated=len(output.split()),
        )

    def run_batch(self, requests: List[InferenceRequest]) -> List[InferenceResult]:
        """Batched inference."""
        start = time.perf_counter()
        # Split the requests into chunks of batch_size
        batches = [
            requests[i:i + self.batch_size]
            for i in range(0, len(requests), self.batch_size)
        ]
        results = []
        for batch in batches:
            prompts = [r.prompt for r in batch]
            outputs = self.llm.batch_complete(prompts)
            # All requests are treated as arriving at `start`, so a request's latency is
            # the time until its batch finishes (later batches therefore wait longer)
            batch_latency = (time.perf_counter() - start) * 1000
            for req, output in zip(batch, outputs):
                results.append(InferenceResult(
                    request_id=req.request_id,
                    output=output,
                    latency_ms=batch_latency,
                    tokens_generated=len(output.split()),
                ))
        return results

    def benchmark(self, num_requests: int = 100, sample_size: int = 10) -> dict:
        """Compare single-request and batched performance on the same sample of requests."""
        requests = [
            InferenceRequest(
                request_id=f"req_{i}",
                prompt=f"Analyze task {i}: " + "x" * random.randint(100, 500),
            )
            for i in range(num_requests)
        ]
        sample = requests[:sample_size]  # only the first sample_size requests are timed

        # Single-request run
        start = time.perf_counter()
        single_results = [self.run_single(r) for r in sample]
        single_time = time.perf_counter() - start
        single_throughput = sum(r.tokens_generated for r in single_results) / single_time

        # Batched run
        start = time.perf_counter()
        batch_results = self.run_batch(sample)
        batch_time = time.perf_counter() - start
        batch_throughput = sum(r.tokens_generated for r in batch_results) / batch_time

        def avg_latency_ms(results: List[InferenceResult]) -> float:
            # Mean of the per-request latencies, not total time / request count
            return round(sum(r.latency_ms for r in results) / len(results), 1)

        return {
            "single_inference": {
                "total_time_s": round(single_time, 2),
                "throughput_tokens_per_sec": round(single_throughput, 1),
                "avg_latency_ms": avg_latency_ms(single_results),
            },
            "batch_inference": {
                "total_time_s": round(batch_time, 2),
                "throughput_tokens_per_sec": round(batch_throughput, 1),
                "avg_latency_ms": avg_latency_ms(batch_results),
            },
            "speedup": round(batch_throughput / single_throughput, 2) if single_throughput else None,
        }
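61.2 How Continuous Batching Works
The problem with static batching
Traditional static batching requires every request in a batch to start together and finish together. Because generation lengths vary widely between requests, requests that finish early must wait for the longest one, and their share of the GPU sits idle:
Static batching (batch = 4):
┌────────────────────────────────────────────┐
│ Request A ████████████████████████████████ │ (long)
│ Request B ████████                         │ (short, idle once finished)
│ Request C ████████████████                 │ (medium)
│ Request D ████                             │ (short, idle once finished)
└────────────────────────────────────────────┘
GPU utilization: only about half in this example (the blank area is wasted)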
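The waste in the diagram can be quantified with a tiny simulation. This sketch assumes each request needs a known number of decode steps; the bar lengths in the diagram are 32, 8, 16 and 4. It counts how many slot-steps actually produce tokens. The function name is illustrative.

```python
from typing import List, Tuple


def static_batching(lengths: List[int], batch_size: int) -> Tuple[int, float]:
    """Simulate static batching.

    lengths[i] is the number of decode steps request i needs (assumed known).
    Each batch runs until its longest request finishes; slots of finished
    requests stay idle until then.
    Returns (total decode steps, fraction of slot-steps doing useful work).
    """
    steps = busy = 0
    for i in range(0, len(lengths), batch_size):
        batch = lengths[i:i + batch_size]
        steps += max(batch)   # the whole batch waits for its slowest member
        busy += sum(batch)    # slot-steps that actually produced a token
    return steps, busy / (steps * batch_size)


print(static_batching([32, 8, 16, 4], batch_size=4))
```

Calling `static_batching([32, 8, 16, 4], 4)` reproduces the diagram: one batch of 32 steps in which the three shorter requests spend most of their time idle.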
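How continuous batching works
Continuous batching (also called iteration-level scheduling) lets the server admit new requests into the batch at any decode step. As soon as one sequence finishes, a waiting request takes over its slot:
Continuous batching:
Time 0:  [A, B, C, D] → running as one batch
Time T1: B finishes → E is inserted, batch becomes [A, C, D, E]
Time T2: D finishes → F is inserted, batch becomes [A, C, E, F]
Time T3: C finishes → G is inserted, batch becomes [A, E, F, G]
As long as requests are waiting in the queue, every slot stays occupied, and GPU utilization rises to 85-95%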
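The scheduling policy can be expressed as a small discrete-step simulation. The sketch makes two simplifying assumptions: every active sequence emits exactly one token per step, and request lengths are known up front (a real server only learns a length when the sequence ends). It models the policy, not vLLM's actual scheduler, and the function name is illustrative.

```python
from collections import deque
from typing import List, Tuple


def simulate_continuous(lengths: List[int], slots: int) -> Tuple[int, float, List[int]]:
    """Iteration-level scheduling: refill free slots from the queue before every decode step.

    Returns (total steps, fraction of slot-steps doing useful work, finish order of request ids).
    """
    queue = deque(enumerate(lengths))   # (request id, remaining tokens), in arrival order
    active = {}                         # request id -> remaining tokens
    steps = busy = 0
    finished: List[int] = []
    while queue or active:
        # Admit waiting requests into any free slots
        while queue and len(active) < slots:
            rid, remaining = queue.popleft()
            active[rid] = remaining
        # One decode step: every active sequence produces one token
        steps += 1
        busy += len(active)
        for rid in list(active):
            active[rid] -= 1
            if active[rid] == 0:        # sequence done: its slot frees up for the next step
                del active[rid]
                finished.append(rid)
    return steps, busy / (steps * slots), finished


# Requests A-G; A-D use the bar lengths from the static-batching diagram
print(simulate_continuous([32, 8, 16, 4, 8, 8, 8], slots=4))
```

With 4 slots, all seven requests finish in 32 steps, because E, F and G slip into slots freed by the short requests. Static batching of the same workload, run as [A, B, C, D] followed by [E, F, G], needs 32 + 8 = 40 steps. Utilization still drops at the tail, once the queue is empty and only A is left running.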
Taking advantage of continuous batching in vLLM
# vLLM performs continuous batching inside the server; clients reach it through the
# OpenAI-compatible API. Sending many requests concurrently is what keeps the batch full.
import asyncio
import random
from typing import List

from openai import AsyncOpenAI


class ContinuousBatchingClient:
    """
    Client that exploits vLLM's continuous batching
    by sending many requests concurrently.
    """

    def __init__(self, base_url: str = "http://localhost:8000/v1"):
        # vLLM only checks the API key if the server was started with one;
        # otherwise any placeholder string works
        self.client = AsyncOpenAI(base_url=base_url, api_key="dummy")
        # Must match the model name the vLLM server is serving
        self.model = "NousResearch/Hermes-3-Llama-3.1-70B"

    async def complete_single(self, prompt: str, request_id: str = "") -> dict:
        """Send one asynchronous request."""
        loop = asyncio.get_running_loop()
        start = loop.time()
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
            temperature=0.7,
        )
        latency = (loop.time() - start) * 1000
        return {
            "request_id": request_id,
            "output": response.choices[0].message.content,
            "latency_ms": latency,
            "tokens": response.usage.completion_tokens,
        }

    async def concurrent_complete(
        self,
        requests: List[dict],
        concurrency: int = 32,
    ) -> List[dict]:
        """
        Send many requests with at most `concurrency` in flight at once;
        the vLLM server batches them continuously on its side.
        """
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded_complete(req):
            async with semaphore:
                return await self.complete_single(req["prompt"], req.get("id", ""))

        tasks = [bounded_complete(req) for req in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Drop failed requests (production code should log or retry them instead)
        return [r for r in results if not isinstance(r, Exception)]

    async def benchmark_concurrent(self, num_requests: int = 100) -> dict:
        """Measure throughput at different concurrency levels."""
        requests = [
            {"id": f"req_{i}", "prompt": f"Task {i}: " + "x" * random.randint(50, 200)}
            for i in range(num_requests)
        ]
        results = {}
        loop = asyncio.get_running_loop()
        for concurrency in [1, 4, 8, 16, 32]:
            start = loop.time()
            completed = await self.concurrent_complete(requests, concurrency=concurrency)
            elapsed = loop.time() - start
            if not completed:  # every request failed; avoid dividing by zero
                results[f"concurrency_{concurrency}"] = {"error": "all requests failed"}
                continue
            total_tokens = sum(r["tokens"] for r in completed)
            results[f"concurrency_{concurrency}"] = {
                "requests_per_sec": round(len(completed) / elapsed, 2),
                "tokens_per_sec": round(total_tokens / elapsed, 1),
                "avg_latency_ms": round(sum(r["latency_ms"] for r in completed) / len(completed), 1),
            }
        return results
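A useful rule of thumb for choosing the `concurrency` argument is Little's law. It states that the average number of requests in flight equals throughput times average latency (L = λW). The helpers below are a sketch of that arithmetic; the function names are hypothetical and the numbers are illustrative, not measurements.

```python
import math


def required_concurrency(target_rps: float, avg_latency_s: float) -> int:
    """Little's law: in-flight requests L = arrival rate (lambda) x time in system (W)."""
    return math.ceil(target_rps * avg_latency_s)


def max_throughput(concurrency: int, avg_latency_s: float) -> float:
    """Inverse form: the request rate a fixed concurrency sustains at a given average latency."""
    return concurrency / avg_latency_s


# e.g. sustaining 10 requests/s at 8 s average latency needs about 80 requests in flight
print(required_concurrency(10, 8.0))
```

Raising concurrency only helps while latency grows more slowly than the number of in-flight requests. Once extra requests mostly add queueing delay, throughput flattens out.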
61.3 Request Queue Design
Priority queue implementation
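import asyncio
import heapq
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Awaitable, Callable, List, Optional


class Priority(IntEnum):
    LOW = 0
    NORMAL = 1
    HIGH = 2
    CRITICAL = 3


@dataclass(order=True)
class QueuedRequest:
    priority: int
    timestamp: float
    request: dict = field(compare=False)
    future: Optional[asyncio.Future] = field(compare=False, default=None)

    def __post_init__(self):
        # Higher priority first; within one priority, earlier timestamp first.
        # heapq is a min-heap, so storing the negated priority makes it pop the highest first.
        self.priority = -self.priority


class PriorityRequestQueue:
    """
    Request queue with priorities:
    higher-priority requests jump ahead of lower-priority ones.
    """

    def __init__(
        self,
        max_size: int = 1000,
        batch_size: int = 16,
        batch_timeout_ms: float = 50.0,  # max time to wait for a batch to fill
    ):
        self.heap: list = []
        self.max_size = max_size
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout_ms / 1000
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        # Metrics
        self.total_enqueued = 0
        self.total_processed = 0
        self.wait_times = deque(maxlen=100)  # last 100 queue wait times (seconds), bounded

    async def enqueue(
        self,
        request: dict,
        priority: Priority = Priority.NORMAL,
    ) -> asyncio.Future:
        """Add a request to the queue; returns a Future that resolves to its result."""
        async with self._lock:
            if len(self.heap) >= self.max_size:
                raise RuntimeError(f"Queue full: {len(self.heap)}/{self.max_size}")
            future = asyncio.get_running_loop().create_future()
            item = QueuedRequest(
                priority=int(priority),
                timestamp=time.monotonic(),
                request=request,
                future=future,
            )
            heapq.heappush(self.heap, item)
            self.total_enqueued += 1
            self._not_empty.set()
        return future

    async def dequeue_batch(self) -> list:
        """
        Take one batch of requests off the queue.
        Waits until at least one request is queued, then gives the batch up to
        batch_timeout to fill to batch_size before returning whatever is there.
        """
        await self._not_empty.wait()
        deadline = time.monotonic() + self.batch_timeout
        while len(self.heap) < self.batch_size and time.monotonic() < deadline:
            await asyncio.sleep(0.005)  # poll briefly so new requests can join this batch
        batch = []
        async with self._lock:
            while len(batch) < self.batch_size and self.heap:
                item = heapq.heappop(self.heap)
                # Record how long the request waited in the queue
                self.wait_times.append(time.monotonic() - item.timestamp)
                batch.append(item)
            if not self.heap:
                self._not_empty.clear()
        return batch

    async def processor(self, inference_func: Callable[[List[str]], Awaitable[List[str]]]):
        """
        Continuously pull batches from the queue and run them.
        """
        while True:
            batch = await self.dequeue_batch()
            if not batch:
                await asyncio.sleep(0.01)
                continue
            # Extract the prompts
            prompts = [item.request["prompt"] for item in batch]
            try:
                # Batched inference
                outputs = await inference_func(prompts)
                if len(outputs) != len(batch):
                    raise RuntimeError(f"Expected {len(batch)} outputs, got {len(outputs)}")
                # Hand each result to the Future its caller is awaiting
                for item, output in zip(batch, outputs):
                    if not item.future.done():
                        item.future.set_result(output)
                self.total_processed += len(batch)
            except Exception as e:
                # On failure, propagate the error to every waiting caller
                for item in batch:
                    if not item.future.done():
                        item.future.set_exception(e)

    def get_stats(self) -> dict:
        avg_wait = sum(self.wait_times) / len(self.wait_times) if self.wait_times else 0
        return {
            "queue_size": len(self.heap),
            "total_enqueued": self.total_enqueued,
            "total_processed": self.total_processed,
            "avg_wait_ms": round(avg_wait * 1000, 2),
        }


# Usage example
async def fake_inference(prompts: List[str]) -> List[str]:
    """Stand-in for a real batched inference call (hypothetical)."""
    await asyncio.sleep(0.05)
    return [f"done: {p}" for p in prompts]


async def main():
    queue = PriorityRequestQueue(batch_size=16, batch_timeout_ms=50)
    # Start the consumer; without it nothing would ever resolve the Futures
    worker = asyncio.create_task(queue.processor(fake_inference))
    # High-priority user request
    future1 = await queue.enqueue(
        {"prompt": "Urgent: analyze the system alert"},
        priority=Priority.CRITICAL,
    )
    # Ordinary batch jobs
    futures = [
        await queue.enqueue({"prompt": f"Analyze document {i}"}, priority=Priority.NORMAL)
        for i in range(20)
    ]
    # Wait for the results
    result = await future1
    print(f"High-priority task finished: {result}")
    await asyncio.gather(*futures)
    print(f"Queue stats: {queue.get_stats()}")
    worker.cancel()


if __name__ == "__main__":
    asyncio.run(main())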
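The ordering rule the queue relies on can be checked in isolation: negate the priority because `heapq` is a min-heap, and break ties by arrival order. This sketch uses an `itertools.count` sequence number as the tie-breaker instead of a timestamp. That guarantees strict first-in, first-out order within a priority level even when two requests arrive in the same clock tick. The request names are made up.

```python
import heapq
import itertools

_seq = itertools.count()   # strictly increasing arrival counter
heap: list = []


def push(priority: int, name: str) -> None:
    # Negated priority: the smallest tuple (highest priority) pops first;
    # the sequence number keeps FIFO order within one priority level
    heapq.heappush(heap, (-priority, next(_seq), name))


def drain() -> list:
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


push(1, "doc-1")
push(3, "alert")
push(1, "doc-2")
push(2, "user-query")
print(drain())  # ['alert', 'user-query', 'doc-1', 'doc-2']
```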
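61.4 Resource Contention Between Concurrent Agent Instances
Sources of contention
When several Hermes Agent instances run concurrently, they mainly compete for the following resources:
| Resource | Contention scenario | Impact | Mitigation |
|---|---|---|---|
| GPU memory | KV caches of many sessions grow at the same time | OOM crash | Per-instance memory quota |
| GPU compute | Many requests run inference at once | Higher latency | Cap the number of concurrent inference calls |
| CPU memory | Large conversation histories | Swapping adds latency | Summarize/compress history |
| Disk I/O | Many file-tool calls | I/O wait | SSDs + asynchronous I/O |
| Network bandwidth | Concurrent API calls | More timeouts | Connection pooling + rate limiting |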
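To see why the KV cache dominates GPU-memory contention, it helps to estimate its size. For every token, each layer stores one key vector and one value vector per KV head:

bytes per token = 2 × layers × KV heads × head dimension × bytes per element

The sketch assumes the published Llama 3.1 70B configuration that Hermes 3 70B is built on (80 layers, 8 KV heads via grouped-query attention, head dimension 128) and fp16/bf16 storage. The function names are illustrative.

```python
def kv_cache_bytes_per_token(layers: int, kv_heads: int, head_dim: int,
                             bytes_per_elem: int = 2) -> int:
    # Factor 2: one key and one value vector per KV head per layer
    return 2 * layers * kv_heads * head_dim * bytes_per_elem


def kv_cache_gib(tokens: int, **model: int) -> float:
    """KV cache size in GiB for one sequence of `tokens` tokens."""
    return tokens * kv_cache_bytes_per_token(**model) / 2**30


LLAMA_31_70B = dict(layers=80, kv_heads=8, head_dim=128)  # assumed model shape

per_token = kv_cache_bytes_per_token(**LLAMA_31_70B)
print(per_token // 1024, "KiB per token")
print(kv_cache_gib(8192, **LLAMA_31_70B), "GiB for one 8K-token sequence")
```

At 320 KiB per token, an 8K-token context needs 2.5 GiB. Thirty-two such sessions would need about 80 GiB of KV cache before counting the model weights, which is why per-instance quotas matter.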
Managing GPU memory contention
import asyncio
from contextlib import asynccontextmanager


class ResourceError(Exception):
    """Raised when a GPU memory quota cannot be granted."""


class GPUMemoryManager:
    """
    GPU memory manager: keeps concurrent Agent instances from exhausting VRAM.

    This is application-level bookkeeping (an admission quota). It neither
    allocates nor measures real GPU memory, so required_gb must come from your
    own per-session estimate.
    """

    def __init__(self, total_vram_gb: float, reserved_gb: float = 2.0):
        self.total_vram = total_vram_gb
        self.reserved = reserved_gb  # held back for the system / CUDA runtime
        self.available = total_vram_gb - reserved_gb
        self.allocations: dict = {}  # {session_id: allocated_gb}
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def allocate(self, session_id: str, required_gb: float):
        """
        Reserve a memory quota for an Agent session while the block runs.
        Raises ResourceError immediately if the quota cannot be granted.
        """
        async with self._lock:
            if session_id in self.allocations:
                # Reusing an ID would let one holder's release free the other's quota
                raise ResourceError(f"Session {session_id} already holds a quota")
            current_usage = sum(self.allocations.values())
            if current_usage + required_gb > self.available:
                raise ResourceError(
                    f"GPU memory insufficient: need {required_gb}GB, "
                    f"available {self.available - current_usage:.1f}GB"
                )
            self.allocations[session_id] = required_gb
        try:
            yield
        finally:
            async with self._lock:
                self.allocations.pop(session_id, None)

    def get_usage(self) -> dict:
        current_usage = sum(self.allocations.values())
        return {
            "total_vram_gb": self.total_vram,
            "reserved_gb": self.reserved,
            "allocated_gb": round(current_usage, 2),
            "free_gb": round(self.available - current_usage, 2),
            "usage_pct": f"{current_usage / self.available * 100:.1f}%",
            "active_sessions": len(self.allocations),
        }


# Integration into an Agent scheduler
class AgentScheduler:
    def __init__(self, max_concurrent: int = 8):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.memory_manager = GPUMemoryManager(total_vram_gb=80.0)  # one 80 GB GPU
        self.active_agents = {}

    async def run_agent(self, session_id: str, task: str, vram_needed_gb: float = 8.0):
        async with self.semaphore:  # caps the number of concurrently running agents
            async with self.memory_manager.allocate(session_id, vram_needed_gb):
                # Run the Agent task while holding the memory quota
                return await self._execute_task(session_id, task)

    async def _execute_task(self, session_id: str, task: str) -> str:
        # Placeholder for the real task logic
        await asyncio.sleep(0.1)  # simulates inference latency
        return f"[{session_id}] Task completed: {task[:50]}"
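`GPUMemoryManager` rejects a request as soon as the quota is exhausted. If callers should queue instead, an `asyncio.Condition` lets each waiter re-check the free quota whenever another session releases memory. The class below is a hypothetical sketch of that variant. It tracks a single usage number rather than per-session entries. For brevity it also has no timeout; production code would bound the wait, for example with `asyncio.timeout` on Python 3.11+.

```python
import asyncio
from contextlib import asynccontextmanager


class WaitingVRAMQuota:
    """Hypothetical VRAM quota that waits for free capacity instead of raising."""

    def __init__(self, available_gb: float):
        self.available = available_gb
        self.used = 0.0
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def allocate(self, required_gb: float):
        if required_gb > self.available:
            # Could never be satisfied; fail fast instead of waiting forever
            raise ValueError(f"{required_gb} GB exceeds the {self.available} GB quota")
        async with self._cond:
            # Sleeps until some release calls notify_all, then re-checks the predicate
            await self._cond.wait_for(lambda: self.used + required_gb <= self.available)
            self.used += required_gb
        try:
            yield
        finally:
            async with self._cond:
                self.used -= required_gb
                self._cond.notify_all()  # wake waiters so they re-check the free quota
```

For example, if two jobs each need 8 GB of a 10 GB quota, the second blocks until the first releases its share instead of failing.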
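61.5 GPU Memory Fragmentation
How fragmentation arises
Initial state (80 GB):
[████████████████████████████████████████] 80 GB free
After allocating three sessions (20 GB each):
[A (20G)][B (20G)][C (20G)][ free (20G) ]
Session B ends and its memory is released:
[A (20G)][ free (20G) ][C (20G)][ free (20G) ]
Trying to allocate a new 25 GB session → fails!
Total free memory is 40 GB, but the largest contiguous block is only 20 GB. (Within a single vLLM process, PagedAttention stores the KV cache in fixed-size blocks that do not need to be contiguous, which is designed to avoid exactly this kind of fragmentation for the KV cache.)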
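The fragmentation rate used by the allocator below can be computed from a memory map:

fragmentation rate = (total free - largest contiguous free) / total free

The sketch models memory as an ordered list of (owner, size) segments, with `None` marking free space. This representation is an assumption for illustration; real allocators expose their state differently, and the function names are hypothetical.

```python
from typing import List, Optional, Tuple

Segment = Tuple[Optional[str], float]  # (owner, size_gb); owner None = free


def fragmentation_report(segments: List[Segment]) -> dict:
    free_runs: List[float] = []
    run = 0.0
    for owner, size in segments:
        if owner is None:
            run += size              # adjacent free segments merge into one contiguous run
        elif run:
            free_runs.append(run)    # an allocated segment ends the current free run
            run = 0.0
    if run:
        free_runs.append(run)
    total_free = sum(free_runs)
    largest = max(free_runs, default=0.0)
    rate = (total_free - largest) / total_free if total_free else 0.0
    return {"total_free_gb": total_free, "largest_free_gb": largest,
            "fragmentation_rate": rate}


def can_fit(segments: List[Segment], required_gb: float) -> bool:
    # A contiguous request needs a single free run at least as large as itself
    return fragmentation_report(segments)["largest_free_gb"] >= required_gb


after_b_released = [("A", 20), (None, 20), ("C", 20), (None, 20)]
print(fragmentation_report(after_b_released))
print(can_fit(after_b_released, 25))
```

For the diagram's final state, this reports 40 GB free, a largest block of 20 GB and a fragmentation rate of 0.5. It also reports that the 25 GB request does not fit.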
Mitigating fragmentation
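import logging

logger = logging.getLogger("agent")


class FragmentationAwareAllocator:
    """
    Fragmentation-aware allocator:
    reduces fragmentation through the choice of allocation policy.
    """

    # Strategy 1: fixed-size buckets (avoid odd-sized blocks). A freed block can be reused
    # by any later request in the same bucket, at the cost of some waste from rounding up.
    ALLOCATION_BUCKETS = [4, 8, 12, 16, 24, 32, 48, 64]  # GB

    @staticmethod
    def round_up_allocation(required_gb: float) -> float:
        """Round the requested size up to the nearest bucket size."""
        for bucket in FragmentationAwareAllocator.ALLOCATION_BUCKETS:
            if required_gb <= bucket:
                return float(bucket)
        return float(required_gb)  # larger than every bucket: keep the exact size

    # Strategy 2: periodic defragmentation (requires restarting the vLLM process)
    @staticmethod
    async def defragment_if_needed(usage_report: dict, threshold: float = 0.3) -> bool:
        """
        Flag defragmentation when the fragmentation rate exceeds the threshold.
        Fragmentation rate = (total free - largest contiguous free) / total free.
        usage_report must provide "fragmentation_rate"; GPUMemoryManager.get_usage()
        does not include it, so the value has to be measured elsewhere.
        """
        fragmentation_rate = usage_report.get("fragmentation_rate", 0)
        if fragmentation_rate > threshold:
            logger.warning(
                f"GPU memory fragmentation rate {fragmentation_rate:.1%} "
                f"exceeds threshold {threshold:.1%}. Scheduling defragmentation."
            )
            # In practice: drain in-flight requests, then restart the inference process
            return True
        return False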
61.6 Scaling for High Concurrency: From One Machine to Many
Evolution path
Stage 1: single machine, single GPU (≤ 1,000 requests/day)
┌────────────────────────────┐
│ Hermes Agent API │
│ ┌──────────────────────┐ │
│ │ vLLM (1× A100 80G) │ │
│ └──────────────────────┘ │
└────────────────────────────┘
Characteristics: simple and cheap; suited to development and testing
Stage 2: single machine, multiple GPUs (≤ 10,000 requests/day)
┌───────────────────────────────────────────┐
│ Nginx load balancer                       │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ vLLM instance 1  │ │ vLLM instance 2  │ │
│ │ 2× A100          │ │ 2× A100          │ │
│ └──────────────────┘ └──────────────────┘ │
└───────────────────────────────────────────┘
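Characteristics: scales out across GPUs, but is capped by the resources of a single machine
Stage 3: multi-machine distributed deployment (> 10,000 requests/day)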
┌──────────────────────────────────────────────────────┐
│ API Gateway / Rate Limiter │
├──────────────────────────────────────────────────────┤
│ Request Queue (Redis Streams) │
├──────────┬───────────────────────┬────────────────────┤
│ Worker 1│ Worker 2 │ Worker N │
│ vLLM │ vLLM │ vLLM │
│ 4× H100 │ 4× H100 │ 4× H100 │
└──────────┴───────────────────────┴────────────────────┘
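Characteristics: throughput scales roughly linearly with the number of workers; routing usually relies on consistent hashing so that requests from the same session reach the same worker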
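Consistent hashing places each worker at many points on a hash ring. Each session goes to the first worker point at or after the session's own hash, so a session keeps landing on the same worker. Removing a worker only remaps the sessions that were on it. The router below is a minimal sketch under those assumptions. The class name, the worker names and the use of MD5 (here only to spread keys evenly, not for security) are illustrative choices.

```python
import bisect
import hashlib
from typing import Iterable, List, Tuple


class ConsistentHashRouter:
    """Hypothetical session-affinity router: maps session IDs onto a hash ring of workers."""

    def __init__(self, workers: Iterable[str], vnodes: int = 100):
        self.vnodes = vnodes                     # ring points per worker, for smoother balance
        self._ring: List[Tuple[int, str]] = []   # sorted (hash, worker) points
        for w in workers:
            self.add(w)

    @staticmethod
    def _hash(key: str) -> int:
        return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")

    def add(self, worker: str) -> None:
        for i in range(self.vnodes):
            bisect.insort(self._ring, (self._hash(f"{worker}#{i}"), worker))

    def remove(self, worker: str) -> None:
        self._ring = [(h, w) for h, w in self._ring if w != worker]

    def route(self, session_id: str) -> str:
        # First ring point at or after the session's hash, wrapping around at the end
        idx = bisect.bisect(self._ring, (self._hash(session_id), "")) % len(self._ring)
        return self._ring[idx][1]


router = ConsistentHashRouter(["worker-1", "worker-2", "worker-3"])
print(router.route("user-42"))
```

Because each worker owns many small arcs of the ring, sessions spread across workers, which preserves load balancing, while each session still keeps its affinity.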
Ray Serve configuration for multi-machine inference
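from typing import List

from ray import serve
from starlette.requests import Request


@serve.deployment(
    ray_actor_options={
        "num_gpus": 4,  # each replica uses 4 GPUs
        "num_cpus": 16,
    },
    # With autoscaling enabled, the replica count comes from autoscaling_config;
    # Ray Serve does not accept a fixed num_replicas alongside it
    autoscaling_config={
        "min_replicas": 2,
        "max_replicas": 8,
        "target_ongoing_requests": 20,  # called target_num_ongoing_requests_per_replica in older Ray releases
        "upscale_delay_s": 30,
        "downscale_delay_s": 120,
    },
)
class HermesRayServe:
    def __init__(self):
        # Imported inside the replica so the driver process does not need vLLM or a GPU
        from vllm import LLM, SamplingParams
        self.llm = LLM(
            model="NousResearch/Hermes-3-Llama-3.1-70B",
            tensor_parallel_size=4,
            enable_prefix_caching=True,
        )
        self.sampling_params = SamplingParams(
            temperature=0.7,
            max_tokens=2048,
        )

    # serve.batch groups concurrent calls into one list, so the offline LLM engine runs them
    # as one (static) batch instead of one prompt per call. For continuous batching across
    # calls, use vLLM's OpenAI-compatible server instead.
    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.05)
    async def generate_batch(self, prompts: List[str]) -> List[str]:
        outputs = self.llm.generate(prompts, self.sampling_params)
        return [o.outputs[0].text for o in outputs]

    async def __call__(self, request: Request) -> dict:
        data = await request.json()
        text = await self.generate_batch(data["prompt"])
        return {"output": text}


# Deploy
serve.run(HermesRayServe.bind())
print("Hermes Ray Serve is running with autoscaling enabled")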
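Chapter summary
Batching and concurrency optimization are the key to running Hermes Agent in production at scale:
- Payoff of batching: going from batch size 1 to 32 raises throughput about 13× while latency grows only about 2.6×
- Continuous batching: a core optimization in vLLM that raises GPU utilization from 20-40% to 85-95%
- Priority queues: keep high-priority requests (live users) from being blocked behind bulk batch jobs
- Resource contention management: memory quotas + concurrency limits + fragmentation-aware allocation to prevent OOM crashes
- Architecture evolution: one GPU → multiple GPUs → multiple machines, with different technology choices and cost structures at each stage
Discussion questions
- When one request in a batch has to wait on an external system for a tool call, should the other requests keep generating? How should such a "mixed-pace" batch be handled?
- Different users have different SLAs (service-level agreements). How would you design multi-tier QoS that guarantees latency for high-value users?
- With continuous batching, one extremely long request (say, 100K output tokens) occupies a batch slot for a long time. How do you keep it from starving shorter requests?
- In a multi-machine deployment, how do you route a user's successive requests to the same replica (session affinity) without undermining load balancing?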