Kafka发送一条消息的流程?
作者:程序员马丁
在线博客:https://open8gu.com
大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。
回答话术
Kafka 消息发送涉及两个关键流程:第一个是消息在生产者客户端的发送流程,第二个则是消息到达 Broker 后,Broker 将对消息的处理流程。
1. Kafka Producer 发送流程
当我们使用 Kafka 客户端发送消息时,表面上我们只是调用了一个方法,但是背后是一条复杂的消息推送流程。这个流程由包括生产者 KafkaProducer、生产者拦截器 ProducerInterceptor、序列化 Serializer、分区器 Partitioner、消息累加器 RecordAccumulator 和消息发送线程 Sender 在内的多个核心组件共同完成。
备注:本篇流程以及源码均基于 kafka-client 的 2.3.1 版本源码。
2. Kafka Broker 处理流程
消息到达 Broker 后,其处理流程会根据生产者中 acks
的配置而异。生产者的 acks
配置有三种策略,具体如下:
- acks=0:无需等待生产者发送数据在 Broker 落盘即返回响应,该方案可靠性较差,但效率高。
- acks=1:需要等待 Leader 副本收到数据后返回响应,该方案可靠性一般,效率一般。
- acks=-1 或 all:需要等待 Leader 和 ISR(In-Sync-Replica)集合中的所有节点对齐数据后返回响应,该方案可靠性高,但效率低。
在默认情况下,Kafka 的 acks
参数为 1,也就是默认使用第二种策略,在可靠性和效率方面较为均衡。前两种策略比较好理解,因此暂时略过,我们重点介绍一下第三种策略的实现流程:
- Producer 客户端发送消息至 Leader 副本:生产者将消息发送到指定主题的 Leader 副本;
- 消息被追加到 Leader 副本的本地日志,并更新日志的偏移量 Offset 值:Leader 副本将消息追加到本地日志中,同时更新 Leader 副本的偏移量 Offset;
- Leader 副本应答给生产者:Leader 副本将应答(ACK)发送给生产者,表示消息已成功追加到本地日志;
- Leader 副本等待 ISR 中所有副本的应答:Leader 副本等待 ISR 中的所有副本,包括 Leader 本身,都应答成功;
- Follower 副本同步消息数据:Leader 副本将消息发送给 ISR 中的所有 Follower 副本,并等待它们同步消息数据;
- ISR 中所有副本应答给 Leader:ISR 中的所有副本都成功同步消息数据后,它们向 Leader 副本发送应答;
- Leader 副本应答给生产者:Leader 副本收到 ISR 中所有副本的应答后,再次向生产者发送应答,表示消息在 ISR 中的所有副本都已同步完成;
- 生产者继续发送下一条消息:生产者收到 Leader 副本的最终应答后,可以继续发送下一条消息。
kafka Broker 处理消息流程图如下所示:
问题详解
咱们在这个章节详细讲解下 Kafka Producer 发送流程。
1. KafkaProducer
KafkaProducer 的 send
方法会调用 ProducerInterceptors 的 onSend
方法进行拦截。
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// 拦截器
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
ProducerInterceptors 内部一个 ProducerInterceptor 集合,实际就是对 ProducerInterceptor 的 onSend
方法进行拦截。
doSend
方法是 Kafka 生产者用于处理发送消息到主题的过程的核心方法。以下是方法执行的主要步骤:
- 检查生产者是否关闭:验证生产者是否已关闭,否则抛出异常;
- 等待元数据:确保指定主题的元数据可用。如果不可用,它会等待元数据被刷新;
- 序列化键和值:使用配置的键和值序列化程序对生产者记录的键和值进行序列化;
- 识别分区:根据序列化的键和值以及可用的集群信息计算应将消息发送到的分区;
- 设置只读标头:确保生产者记录的标头已设置为只读;
- 计算序列化大小:估算消息的序列化大小;
- 确保有效的记录大小:检查消息的序列化大小是否超过配置的最大大小;
- 设置时间戳:设置记录的时间戳。如果未提供,则使用当前时间;
- 记录和创建回调:记录记录的详细信息并为处理发送操作的结果创建回调。此回调包括拦截器处理;
- 事务处理:如果生产者是事务性的,则可能将分区添加到当前事务;
- 追加到记录累加器:使用记录累加器将记录追加到结果中,并获取一个包含用于跟踪发送操作完成的未来;
- 如果需要唤醒发送方:如果批次已满或创建了新批次,则唤醒发送方线程;
- 处理异常:处理在过程中可能发生的各种异常,如 ApiException、InterruptedException、BufferExhaustedException 和通用的 Exception。处理包括通知拦截器有关错误的信息和记录错误。
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
// Key 序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// Value 序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 确定消息发往哪个分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 针对消息累加器进行累加
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}