Skip to main content

Kafka为什么这么快?

作者:程序员马丁

在线博客:https://open8gu.com

note

大话面试,技术同学面试必备的八股文小册,以精彩回答应对深度问题,助力你在面试中拿个offer。

回答话术

Kafka 之所以受到众多人的青睐,其中一个重要原因无疑是 Kafka 非常“快”。一般来说,决定一个消息队列性能“快”还是“慢”最关键的两点在于磁盘 IO 和网络传输,那么 Kafka 是怎么解决这两个问题的呢?

1. 磁盘

众所周知,磁盘在计算机领域的操作性能是很慢的。因为正常的机械硬盘写入是需要通过寻址、旋转和数据传输三个步骤,固态硬盘需要寻址和数据传输两个步骤。

为此,Kafka 利用顺序写和零拷贝两大技术提升读写速度。

1.1. 顺序写

在 Java 中,我们经常说顺序写比随机写快,是因为顺序写可以减少寻址和旋转,基本上只剩下数据传输,所以顺序写可以大幅度提升写入的性能。

那么在 Kafka 中,什么场景下使用顺序写呢?

在 Kafka 中,我们知道单个 Partition 是有序的、不可变的序列,新的消息会不断追加到 Partition 的末尾。但实际上 Partition 是一个逻辑概念,新的消息是写入到 Segment 中,一个 Partition 拥有多个 Segment,每个 Segment 由一个存储消息的消息日志 Log 文件和两个索引文件组成,Kafka 对 Segment 相关文件的写入,这就是顺序写文件。

当满足日志文件或索引文件超过一定大小,或当前时间-文件创建时间大于规定的时间间隔时(这些条件都是参数设置的),就会切分日志文件和索引文件,产生一个新的 Segment,新的 Segment 用当前最新的 Offset 作为名称。第一个 Segment 存储的第一条消息的起始序号为 0,因此文件名为 20 位长度的 0 来命名。

下图中,Partition0 的 Segment0 存储的第一条消息的 Offset 为 0,最后一条消息的 Offset 为 1234567890,第二个 Segment 的初始消息的 Offset 为 1234567890。

image.png

1.2. 零拷贝(Zero-Copy)

零拷贝技术其实是一个比较通用的技术,作用是减少数据在内存之间的拷贝次数,提高数据传输效率。它被很多框架使用,例如 Rocket MQ、Netty、Spark、Flink 等,Kafka 也不例外。我们可以对比一下,在消费者消费消息重场景中,使用零拷贝和使用传统方式有何区别。

下图左边是传统方式的一个消费流程,下图右边是零拷贝的一个消费方式。我们可以发现在零拷贝技术的情况下,正常的一次消费,可以比传统方式减少两次 CPU 拷贝,在高并发的情况下,就可以减少大量的 CPU 拷贝,进而降低 CPU 使用率提高性能。

image.png

1.3. 页缓存(Page Cache)

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。说白了就是,把磁盘中的数据缓存在内存中,把磁盘的访问变成对内存的访问

在 Kafka 中,当 Producer 发送消息到 Broker 时,Broker 会按 Offset 写入数据,此时数据都会先写入 Page Cache。Consumer 消费消息时,Broker 使用零拷贝将数据从 Page Cache 传输到 Broker 的 Socket Buffer,再通过网络传输。

Leader 与 Follower 之间的同步,和 Consumer 消费数据的过程是同理的。

Page Cache 中的数据会随着操作系统的刷盘任务写回到磁盘。

Kafka 支持同步刷盘和间断性强制刷盘,同步刷盘可提提高消息的可靠性。

另外,如果 Consumer 要消费的消息不在 Page Cache 里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入 Page cache,以方便下一次读取。

如果 Producer 的生产速率与 Consumer 的消费速率相差不大,那么就能几乎只靠对 Broker 的 Page Cache 的读写完成整个生产和消费过程,磁盘访问非常少。

image.png

2. 网络

Kafka 在网络方面也做了很多优化,例如:支持 Reactor 模式的网络模型、客户端支持批量处理和压缩处理等

2.1. 批量处理

在 Kafka 的 Producer 向 Broker 发送消息时,并不是一条消息一条消息的发送,而是批量发送

Producer 会将消息放到 RecordAccumulator(消息累加器)中, RecordAccumulator 中针对每个分区都有对应的一个双端队列,队列中放的是 ProducerBatch ,每当有消息来的时候,追加到队列尾部, Sender 线程从队列头部读取消息,并发送到对应的 Broker 中

image.png

2.2. 压缩处理

Kafka 在很多地方都可以利用压缩算法对数据进行压缩,进而减少磁盘和网络消耗。

例如在 Producer 向 Broker 写入数据时,Consumer 向 Broker 读取数据时。在 Producer、Broker、Consumer 使用同一种压缩算法的场景下,Kafka 甚至可以不用解压缩,最终在 Consumer 拉取到消息时才解压,这样节省了大量的 CPU、网络和磁盘开销。

Kafka 支持多种压缩算法:gzipsnappylz4ZStandard

3. 分区并发

在 Kafka 中,一个 Topic 可以分成多个 Partition(分区),每个 Paritition 类似于一个队列,保证数据有序。在同一个消费者分组下的不同 Consumer 并发消费 Paritition,一个 Paritition 最多只能有一个消费者(不然没法保证单 Paritition 有序),所以 Paritition 是调优 Kafka 消费性能的最小单元,每增加一个 Paritition 就增加了一个消费并发。

由于不同消费者组中的消费者可以并发消费 Paritition** 中的消息,因此我们可通过提高并发度来提高消费效率**

那是不是 Paritition 越多越好?并不是,Paritition 越多需要的资源越多,当 Broker 出现问题的时候,恢复的时间越久。

例如,假设某个 Partition 的最大传输速度为 1,且整个 Kafka 集群共有 3 个 Broker,每个 Broker 最多支持 3 个 Partition 的最大传输速度,因此整个集群的最大传输速度为 3 * 3 = 9。如果在不增加资源的情况下,将 Partition 的数量增加到 18 个,那么每个 Broker 将负责 6 个 Partition,每个 Partition 无法达到最大传输速度,只能以 1/2 的传输速度进行传输,因此无法再提升整个集群的传输速度。因此,在增加 Partition 数量时,需要考虑到每个 Broker 的资源上限,以避免达不到最大传输速度

随着在每个 Broker 上使用的 Partition 数量增加,需要打开的文件句柄数也会增加(受操作系统限制),同时磁盘的读取速度存在上限,甚至可能因 Partition 增多而导致原本的顺序读写转变为随机读写。

问题详解

1. 顺序写 Demo

这里给一个简单的例子,下面这段程序将会读取 file.text 文件的内容,并且顺序写入到 file_mapping.text 文件里。

package com.big.demo.kafka;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.LinkedHashMap;
import java.util.Map;

public class SeqWriterDemo {
public static void main(String[] args) throws Exception {
// 文件名
final String fileName = "file.txt";
// 文件大小
final long fileSize = 1024 * 1024 * 500;
// 文件内容索引
final String fileMapping = "file_mapping.txt";
writeFileOfSequence(fileName, fileSize, fileMapping);
readFileOfSequence(fileName, fileSize, fileMapping);
}

/**
* 顺序写文件
*
* @param fileName
* @param fileSize
* @param fileMapping
*/
public static void writeFileOfSequence(String fileName, long fileSize, String fileMapping) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile(fileName, "rw");
FileChannel fileChannel = randomAccessFile.getChannel();

// 开启一片内存映射,映射大小为文件大小
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

// 当前写入的文件位置
int currentPosition = 0;
// 写入后,下一个文件位置
int nextPosition;
final String content = "[abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ]";
Map<Integer, Integer> positionMap = new LinkedHashMap<>();

long start = System.currentTimeMillis();
do {
// 按位置写入内容
mappedByteBuffer.position(currentPosition);
mappedByteBuffer.put(content.getBytes());
nextPosition = mappedByteBuffer.position();
// 记录位置映射(记录当前位置和下一个位置,nextPosition - currentPosition即为内容长度)
positionMap.put(currentPosition, nextPosition);
currentPosition = nextPosition;
} while (currentPosition + content.getBytes().length <= fileSize);

mappedByteBuffer.force();
fileChannel.close();
randomAccessFile.close();
long end = System.currentTimeMillis();
System.out.printf("顺序写耗时: %s%n", (end - start));

// 把映射信息记录到一个文件(kafka也是这样的)
StringBuilder mappingInfo = new StringBuilder();
for (Map.Entry<Integer, Integer> item : positionMap.entrySet()) {
String line = String.format("%d,%d\n", item.getKey(), item.getValue());
mappingInfo.append(line);
}
//这边不是顺序写了 和,正常写 映射信息 ,所以这边会卡一下
writeText(fileMapping, mappingInfo.toString(), false);
System.out.printf("正常写 映射耗时: %s%n", (System.currentTimeMillis() - end));
}

/**
* 使用FileWriter写文本文件
*
* @param fileName 文件名
* @param content 内容
* @param append 是否追加形式写文件
*/
public static void writeText(String fileName, String content, boolean append) throws Exception {
FileWriter writer = new FileWriter(fileName, append);
writer.write(content);
writer.close();
}

/**
* 顺序读文件
*
* @param fileName
* @param fileSize
* @param fileMapping
*/
public static void readFileOfSequence(String fileName, long fileSize, String fileMapping) throws Exception {
Map<Integer, Integer> positionMap = new LinkedHashMap<>();
BufferedReader reader = new BufferedReader(new FileReader(fileMapping));
String line;
while ((line = reader.readLine()) != null) {
String[] lineSplit = line.split(",");
if (lineSplit.length == 2) {
positionMap.put(Integer.parseInt(lineSplit[0]), Integer.parseInt(lineSplit[1]));
}
}

long start = System.currentTimeMillis();
try (RandomAccessFile randomAccessFile = new RandomAccessFile(fileName, "rw")) {
FileChannel fileChannel = randomAccessFile.getChannel();

// 开启一片内存映射,映射大小为文件大小
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

// 按文件内容索引读取内容
for (Map.Entry<Integer, Integer> item : positionMap.entrySet()) {
int length = item.getValue() - item.getKey();
byte[] buff = new byte[length];

mappedByteBuffer.get(buff, 0, length);
}
}
System.out.printf("顺序读耗时: %s%n", (System.currentTimeMillis() - start));
}
}

2. 零拷贝 Demo

大部分读者对于零拷贝可能只有一个概念,实际上,虽然零拷贝确实是操作系统层面的功能,但是 JDK 已经在上层进行了封装,我们直接使用对应的 API 就可以做到这样的效果了。

比如,在下面这个例子中,我们用零拷贝把 from.txt 里的数据拷贝到 to.txt

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class ZeroCopyDemo {
public static void main(String[] args) throws IOException {
RandomAccessFile fromFile = new RandomAccessFile("from.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("to.txt", "rw");
FileChannel toChannel = toFile.getChannel();
fromChannel.transferTo(0, fromChannel.size(), toChannel);

fromFile.close();
toFile.close();
}
}

3. 生产者批量发送的一些参数

  • batch.size:用于指定 ProducerBatch 的大小,默认:16KB。
  • linger.ms:每次发送的时间间隔,默认:0。生产者会在 ProducerBatch 被填满或等待时间超过这个值的时候发送出去。增大这个参数的值,会增加消息的延迟,但是可以增加一定的吞吐量。