Fetch 请求与 Consumer 拉取机制
第19章:Fetch 请求与 Consumer 拉取机制
导读:Fetch 请求在 Broker 端如何处理?
本章核心问题:Fetch 请求在 Broker 端如何处理?
读完本章你将理解:
- FetchRequest 处理路径
- Consumer Fetch 与 Follower Fetch 的复用
- Fetch Session 增量拉取优化
- 零拷贝在 Fetch 路径的应用
Level 1 · 你需要知道的(1-3年经验)
拉模型的哲学
Kafka 选择消费者主动拉取(Pull),而不是 Broker 主动推送(Push),这个决策背后有深刻的工程考量。
推模型的核心矛盾在于:Broker 必须知道每个消费者当前能消费多快。慢消费者会导致 Broker 侧队列积压,快消费者又会因为等待而浪费资源。Kafka 的设计者选择了更简单的答案:让消费者自己决定何时、拉多少。慢消费者自然地落后,而不会对其他消费者或 Broker 造成影响。消费者崩溃重启后,也可以从任意 Offset 继续,这种语义在推模型中极难实现。
理解了这个哲学,接下来看具体的实现。
Broker 侧:KafkaApis.handleFetchRequest()
// KafkaApis.scala(简化)
def handleFetchRequest(request: RequestChannel.Request): Unit = {
val versionId = request.header.apiVersion
val fetchRequest = request.body[FetchRequest]
// ① 区分来源:Consumer(replicaId=-1)还是 Follower 副本(replicaId>=0)
val isFromFollower = fetchRequest.isFromFollower
val isFromConsumer = fetchRequest.isFromConsumer
// ② 授权检查(仅对 Consumer;Follower 间复制无需额外授权)
val authorizedRequestInfo = mutable.Map[TopicIdPartition, FetchRequest.PartitionData]()
fetchRequest.fetchData(topicNames).forEach { (tp, partitionData) =>
if (isFromConsumer && !authorize(request.context, READ, TOPIC, tp.topic))
erroneous(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
else
authorizedRequestInfo(tp) = partitionData
}
// ③ 从 ReplicaManager 读取数据
replicaManager.fetchMessages(
params = new FetchParams(
requestVersion = versionId,
replicaId = fetchRequest.replicaId,
maxWaitMs = fetchRequest.maxWait,
minBytes = fetchRequest.minBytes,
maxBytes = fetchRequest.maxBytes,
isolation = FetchIsolation(fetchRequest),
clientMetadata = clientMetadata
),
fetchInfos = authorizedRequestInfo,
quota = quota,
responseCallback = processResponseCallback // 回调,数据就绪后发送响应
)
}
replicaManager.fetchMessages()
// ReplicaManager.scala(简化)
def fetchMessages(
params: FetchParams,
fetchInfos: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
// ① 从本地 Log 读取数据
val logReadResults = readFromLocalLog(
replicaId = params.replicaId,
fetchOnlyFromLeader = params.fetchOnlyFromLeader,
fetchIsolation = params.isolation,
fetchMaxBytes = params.maxBytes,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfos,
quota = quota
)
// ② 判断是否满足 min.bytes 条件
val bytesReadable = logReadResults.map(_._2.info.records.sizeInBytes).sum
val errorReadingData = logReadResults.exists(_._2.error != Errors.NONE)
if (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData) {
// 数据充足或不需要等待,直接响应
val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> result.toFetchPartitionData(false) }
responseCallback(fetchPartitionData)
} else {
// 数据不足,创建 DelayedFetch,等待更多数据到来
val delayedFetch = new DelayedFetch(
params.maxWaitMs, params, fetchInfos, this, quota, responseCallback
)
val fetchPartitionKeys = fetchInfos.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, fetchPartitionKeys)
}
}
readFromLocalLog() — 实际读取过程
// ReplicaManager.scala(简化)
private def readFromLocalLog(
replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
quota: ReplicaQuota
): Seq[(TopicIdPartition, LogReadResult)] = {
readPartitionInfo.map { case (tp, fetchInfo) =>
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, fetchMaxBytes)
try {
val partition = getPartitionOrException(tp.topicPartition)
// 读取到 FetchDataInfo
val fetchDataInfo = partition.fetchMessages(
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchOnlyFromLeader = fetchOnlyFromLeader,
fetchIsolation = fetchIsolation,
includeAbortedTxns = params.isFromConsumer
)
tp -> LogReadResult(fetchDataInfo, ...)
} catch {
case e: KafkaStorageException => tp -> LogReadResult(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, error = Errors.KAFKA_STORAGE_ERROR, ...)
case e: NotLeaderOrFollowerException => tp -> LogReadResult(..., error = Errors.NOT_LEADER_OR_FOLLOWER, ...)
}
}
}
partition.fetchMessages() 最终调用 UnifiedLog.read():
// UnifiedLog.scala(简化)
def read(
startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean
): FetchDataInfo = {
// ① 找到包含 startOffset 的 LogSegment(通过稀疏索引二分查找)
val segmentEntry = segments.floorEntry(startOffset)
val segment = segmentEntry.getValue
// ② 在 Segment 内部精确定位到 startOffset 的物理位置
val baseOffset = segmentEntry.getKey
val offsetIndex = segment.offsetIndex
val position = offsetIndex.lookup(startOffset) // 返回最近的索引项
// ③ 构造 FetchDataInfo,包含 FileRecords(尚未读出数据到内存!)
// FileRecords 是对 FileChannel 的惰性引用,真正的数据传输在网络发送时发生
val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
FetchDataInfo(LogOffsetMetadata(startOffset), fetchInfo.records)
}
Zero-Copy:最重要的性能优化
Kafka 读取数据的核心优化是 Zero-Copy(零拷贝),通过 FileChannel.transferTo() 实现。
传统读取路径(4 次拷贝)
磁盘 → 内核缓冲区(DMA)→ 用户空间(JVM 堆)→ 内核 Socket 缓冲区 → 网卡(DMA)
第1次拷贝 第2次拷贝 第3次拷贝 第4次拷贝
Zero-Copy 路径(2 次拷贝)
磁盘 → 内核 Page Cache(DMA)→ 网卡(DMA,通过 sendfile 系统调用)
第1次拷贝 第2次拷贝(内核态完成,无用户态参与)
源码中的体现
// FileRecords.scala(简化)
class FileRecords(val file: File, private val channel: FileChannel, ...) extends Records {
/**
* 将文件数据写入 GatheringByteChannel(即网络 SocketChannel)
* 这是 Zero-Copy 的核心调用
*/
def writeTo(destChannel: GatheringByteChannel, offset: Long, length: Int): Long = {
val tl = math.min(length.toLong, size - offset)
// FileChannel.transferTo() 在 Linux 上映射到 sendfile() 系统调用
// 数据从 Page Cache 直接传输到 Socket 缓冲区,不经过 JVM 堆
channel.transferTo(start + offset, tl, destChannel)
}
}
Zero-Copy 生效的前提是:数据已在操作系统 Page Cache 中(无需从磁盘读取)且目标是网络 Socket(非加密连接)。这也是为什么 Kafka 对 SSL/TLS 的处理需要额外的内存拷贝——加密破坏了 Zero-Copy 路径。
Leader vs Follower 读:fetchIsolation 的作用
FetchIsolation 枚举决定了能读到多远的数据:
| 隔离级别 | 能读到的最大 Offset | 适用场景 |
|---|---|---|
LOG_END |
LEO(Log End Offset) | Follower 副本复制 |
HIGH_WATERMARK |
HW(High Watermark) | 普通 Consumer(isolation.level=read_uncommitted) |
TXN_COMMITTED |
Last Stable Offset (LSO) | 事务 Consumer(isolation.level=read_committed) |
Follower 副本在 FetchRequest 中携带自己的 replicaId(Broker ID),Broker 识别后使用 LOG_END 级别,允许 Follower 读取到 Leader 的最新写入(包括未提交到 HW 的数据)。普通 Consumer 的 replicaId=-1,只能读到 HW,确保读到的数据是"已被所有 ISR 确认"的。
完整消费路径总结
KafkaConsumer.poll()
│
├── Fetcher.sendFetches()
│ → 构造 FetchRequest(Full 或 Incremental)
│ → ConsumerNetworkClient.send()(异步 NIO)
│
↓(网络传输)
│
KafkaApis.handleFetchRequest() ← Broker 侧
│ [授权检查、区分 Consumer/Follower]
↓
ReplicaManager.fetchMessages()
│ [readFromLocalLog,检查 minBytes]
↓
UnifiedLog.read()
│ [索引定位 → FileRecords(惰性引用)]
↓
[数据充足] → responseCallback() → 发送 FetchResponse
[数据不足] → DelayedFetch → 等待新数据 or 超时
│
↓(响应返回客户端)
│
Fetcher.handleFetchResponse()
→ 放入 completedFetches 队列
│
KafkaConsumer.pollForFetches()
→ 从 completedFetches 取数据
→ 反序列化 key/value
→ 返回 ConsumerRecords 给用户
Fetch 路径的核心设计哲学与 Produce 路径相同:异步 + 回调 + Purgatory。KafkaRequestHandler 线程不会阻塞等待数据,而是通过 DelayedFetch 机制让出线程资源,在条件满足时再触发响应。
Level 2 · 它是怎么运行的(3-5年经验)
Consumer 侧:Fetcher 线程的工作机制
KafkaConsumer 的线程模型
KafkaConsumer 不是线程安全的,所有方法必须从同一个线程调用。但在内部,它通过 ConsumerNetworkClient(包装了非阻塞 NIO 客户端)来异步发送和接收网络请求。从 Kafka 3.5 开始,KafkaConsumer 的内部结构迁移到了新的 AsyncKafkaConsumer 架构,但核心 Fetch 流程的语义没有变化。
Fetcher 的核心流程
// Fetcher.java(clients/src/main/java/org/apache/kafka/clients/consumer/internals/)
// 简化版,保留核心逻辑
public class Fetcher<K, V> implements Closeable {
// 已完成的 Fetch 结果队列(Processor 线程填充,poll() 线程消费)
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();
/**
* 发送 FetchRequests 到所有分配给本 Consumer 的 Partition Leader
* 由 poll() 方法内部调用
*/
public synchronized int sendFetches() {
// 找出所有"可拉取"的 Partition(已分配、有 Leader、没有 in-flight 请求)
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
// 构造 FetchRequest(包含 FetchSession ID,支持增量 Fetch,见 KIP-227)
FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.removed(data.toForget())
.replaced(data.toReplace());
// 异步发送,注册回调
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
// 解析响应,放入 completedFetches
handleFetchResponse(fetchTarget, data, resp);
}
@Override
public void onFailure(RuntimeException e) {
handleFetchFailure(fetchTarget, data, e);
}
});
}
return fetchRequestMap.size();
}
/**
* poll() 方法调用此方法,从 completedFetches 取出数据并反序列化
*/
public List<ConsumerRecord<K, V>> fetchRecords(TopicPartition partition, int maxRecords) {
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) return Collections.emptyList();
// 从 CompletedFetch 中解析记录(反序列化 key/value)
return completedFetch.fetchRecords(maxRecords);
}
}
poll() 的内部执行顺序
// KafkaConsumer.java(简化)
public ConsumerRecords<K, V> poll(Duration timeout) {
acquireAndEnsureOpen();
try {
// 1. 提交 offset(如果 enable.auto.commit=true 且到了提交时间)
coordinator.poll(timer);
// 2. 如果 completedFetches 为空,发送新的 FetchRequest 并等待响应
if (!fetcher.hasAvailableFetches()) {
client.poll(timer); // 非阻塞轮询网络,最多等待 timeout
}
// 3. 从 completedFetches 排空,反序列化,返回给用户
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
// 4. 如果已有数据,发送预拉取请求(pipeline 效果)
if (!records.isEmpty()) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup(); // 尽快把下一批 FetchRequest 发出去
}
return new ConsumerRecords<>(records);
} finally {
release();
}
}
注意步骤 4 中的预拉取:在把当前批次数据返回给用户代码之前,poll() 已经把下一批 FetchRequest 发出去了。这实现了 Fetch 请求的流水线,用户处理当前批次数据的同时,下一批数据已经在传输中。
FetchSession:增量 Fetch(KIP-227)
问题背景
在 KIP-227 之前,每次 FetchRequest 都需要携带所有分区的完整信息(Offset、Leader Epoch 等),即使这些分区上次 Fetch 之后没有任何变化。对于一个消费者订阅了 1000 个分区的场景,这意味着每秒数百次 FetchRequest 都在传输大量冗余元数据。
FetchSession 的工作原理
Broker 为每个 Consumer 维护一个 FetchSession,记录上次 Fetch 的状态:
// FetchSessionHandler.java(clients 侧,简化)
class FetchSessionHandler {
private int sessionId = INVALID_SESSION_ID; // 初始为 0,表示无 Session
private long sessionEpoch = INITIAL_EPOCH; // Session 版本号
/**
* 构造下一次 FetchRequest:
* - 首次请求:携带所有分区(Full Fetch)
* - 后续请求:只携带发生变化的分区(Incremental Fetch)
*/
public FetchRequestData newBuilder() {
if (sessionId == INVALID_SESSION_ID) {
// Full fetch:携带所有分区
return new FetchRequestData(toSend = all_partitions, sessionId = 0, epoch = INITIAL_EPOCH);
} else {
// Incremental fetch:只携带新增、删除或 offset 变化的分区
Set<TopicPartition> added = computeAdded(); // 新订阅的分区
Set<TopicPartition> removed = computeRemoved(); // 取消订阅的分区
Set<TopicPartition> altered = computeAltered(); // offset 推进的分区
return new FetchRequestData(
toSend = altered ∪ added,
toForget = removed,
sessionId = sessionId,
epoch = nextEpoch()
);
}
}
}
Broker 侧维护 FetchSessionCache:
// FetchSession.scala(core/,简化)
class FetchSession(val id: Int, val privileged: Boolean, ...) {
// 缓存上次 Fetch 请求中所有分区的状态(offset、leaderEpoch 等)
val partitionMap: util.LinkedHashMap[TopicIdPartition, FetchSession.ENTRY_TYPE] = ...
def update(toReplace: util.ArrayList[..], toAdd: util.ArrayList[..], toForget: util.ArrayList[..]): FetchSession = {
// 增量更新分区状态,只处理发生变化的分区
toForget.forEach(partitionMap.remove)
toReplace.forEach { entry => partitionMap.put(entry.topicIdPartition, entry) }
toAdd.forEach { entry => partitionMap.put(entry.topicIdPartition, entry) }
this
}
}
效果:对于稳定的消费者,每次 FetchRequest 中变化的分区数量通常远小于总分区数。KIP-227 在大量分区场景下可以将 Fetch 请求的网络开销减少 90%+。
Level 3 · 规范怎么定义的(资深)
本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。
Level 4 · 边界与陷阱(所有人)
DelayedFetch 的完成条件
DelayedFetch 的 tryComplete() 在两种情况下被触发:
- 定时器到期:
maxWaitMs超时,不管有多少数据,都返回当前读到的内容(可能为空)。 - 新数据写入:每次
UnifiedLog.appendAsLeader()成功后,ReplicaManager会对该 Partition 的所有等待中的DelayedFetch调用checkAndComplete()。
// DelayedFetch.scala(简化)
class DelayedFetch(...) extends DelayedOperation(fetchMetadata.fetchParams.maxWaitMs) {
override def tryComplete(): Boolean = {
var accumulatedSize = 0
fetchMetadata.fetchPartitionStatus.foreach { case (topicIdPartition, fetchStatus) =>
val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
val endOffset = partition.highWatermark // 或 LEO,取决于 isolation
accumulatedSize += math.max(0, endOffset - fetchStatus.startOffsetMetadata.messageOffset).toInt * avgRecordSize
}
// 如果累计可读数据量 >= minBytes,立即完成
if (accumulatedSize >= fetchMetadata.fetchParams.minBytes) forceComplete()
else false
}
}