Kafka如何做到消息的有序性?
作者:程序员马丁
在线博客:https://open8gu.com
大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。
回答话术
1. 基础知识
Kafka 只保证 Topic 的分区内消息有序,不保证整个 Topic 中的消息有序,除非只有一个分区。
2. Kafka 是如何做到分区内消息有序的?
Kafka 的一个 Topic 是由多个 Partition 组成,一个 Topic 下的不同 Partition 包含的消息是不同的,Partition 中的每个消息都会有个特定的偏移量 offset,Kafka 利用 offset 来保证分区内的顺序性。
下图中有三个 Partition ,每个 Partition 的 offset 都是从 0 开始,生产者每次投递消息时,offset 按照顺序追加。
3. Kafka 消费模型
正常情况下,Kafka 一个分区同一个消费组只能有一个消费者。
消费者在消费 Kafka 消息时,需要维护一个当前消费的 offset 值,以及一个已提交的 offset 值。
当前消费的 offset 值表示消费者正在消费的消息的位置,已提交的 offset 值表示消费者已经确认消费过的消息的位置。消费者在消费完一条消息后,需要提交 offset 来更新已提交的 offset 值。
提交 offset 的目的是为了记录消费进度,以便在消费者发生故障或重启时,默认情况下能够从上次消费的位置继续消费,也可选从最开始的位置开始消费。
流程图如下所示:
问题详解
1. 订单逻辑代码示例
在实际业务中,要做到生产者发消息时消息的有序性,生产者发消息的时候应该指定 Key 进行投递到 Partition,相同的 Key 会发送到一个 Partition 中。
利用 Kafka Partition 内是有序的,从而保证消息的顺序性。
例如在订单的场景中,我们需要保证用户每个订单在进行支付/退款等操作是顺序的,就可以根据订单 ID 当作 Key 进行投递,相同的 Key 每次都会到同一个 Partition 中,从而做到同个订单的每个操作都是顺序的。
生产者 Demo 代码(基于 Spring Kafka)如下所示:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
订单下单、取消、付款以及退款等逻辑代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
@Controller
public class OrderController {
private static final String TOPIC_ORDER = "topic_order";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 下单
*/
@GetMapping("/create")
public void create(String orderId) {
//send 方法参数, 参数1: topic 参数2: key 参数3: 内容
kafkaTemplate.send(TOPIC_ORDER, orderId, "创建订单:" + orderId);
}
/**
* 取消
*/
@GetMapping("/cancel")
public void cancel(String orderId) {
kafkaTemplate.send(TOPIC_ORDER, orderId, "取消订单:" + orderId);
}
/**
* 付款
*/
@GetMapping("/pay")
public void pay(String orderId) {
kafkaTemplate.send(TOPIC_ORDER, orderId, "订单:" + orderId + " 付款");
}
/**
* 退款
*/
@GetMapping("/refund")
public void refund(String orderId) {
kafkaTemplate.send(TOPIC_ORDER, orderId, "订单:" + orderId + "退款");
}
}
Kafka 执行逻辑示意图:
因为 Kafka 一个分区同一个消费组只能有一个消费者,所以消费者正常消费即可做到顺序性。
消费者 Demo 代码如下所示:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class OrderConsumer {
/**
* 这边是手动 Ack
*
* @param consumer
* @param acknowledgment
*/
@KafkaListener(topics = "topic_order", groupId = "group1")
public void group1(ConsumerRecord<String, String> consumer, Acknowledgment acknowledgment) {
System.out.println("消费者 topic名称:" + consumer.topic() +
", key:" + consumer.key() +
", 消息内容:" + consumer.value() +
", 分区:" + consumer.partition() +
", 所属分区的偏移量" + consumer.offset());
acknowledgment.acknowledge();
}
}
Kafka offset 提交示意图:
2. 扩展知识
2.1. offset 存在哪里?
offset 的实际存储位置都是在 Kafka 的一个内置 Topic 中:__consumer_offsets。这个 Topic 有 50 个(可配置)分区,每个分区存储一部分消费组的 offset 信息。
Kafka Broker 会根据消费组 ID 和 Topic 名称来计算出一个哈希值,并将其映射到 __consumer_offsets 主题的某个分区上。
备注:0.9.0 版本之前 offset 存在 ZooKeeper 中。
2.2. 为什么单个分区不能并行消费?
正常情况,一个分区只接受同消费组的一个消费者,当消费者数量大于分区数量时,会导致消费者闲置。
如下图所示:
那么为什么会这样呢?
我们举个例子,假设 Consumer0、Consumer1、Consumer2 共同消费 Partition0,当 Consumer0 提交 offset 为 12,Consumer2 提交 offset 为 8,Consumer3 从 Partition0 中获取的 offset 是 8,这就会导致出现重复消费。
如果要解决这个问题,就要引入更复杂的算法去解决这个问题。这还是正常消费的情况,如果有 Consumer 出现消费失败,还会出现更复杂的情况,所以一般不这么用。