Kafka顺序写

前面已经提到Kafka写入数据其实是添加到每个Partition的末端,也就是写入对应的磁盘文件,设计简单高效

下面介绍具体的机制

Kafka兼顾了高性能和低复杂度,采用顺序写机制(直接顺序写入磁盘),将写入模式设计成了顺序写,这里要注意一点,写磁盘不一定是直接刷盘的,只是说提交给了操作系统,这里还是有丢失数据的可能性,只是相对于先写Kafka应用程序内存,已经减少了一个可能遗失的环节

顺序写优势:

  1. 高效的磁盘利用:顺序写性能优于随机写
  2. 简单的存储管理:
    • 顺序写入简化了日志段的管理和消息的追加操作
    • 日志文件按顺序组织,便于快速查找和读取消息
  3. 可靠性和一致性:
    • 顺序写入有助于保证消息的可靠性,因为一旦写入日志文件(.log),消息就不会丢失
    • 消费者可以通过偏移量准确地读取数据,确保消息处理的顺序和一致性

为什么顺序写这么快?

  • 因为磁盘寻址是需要转动的,随机写入每一次寻址都要机械活动,很花费时间。而顺序写入只需要一次寻址

Kafka页缓存

Kafka利用操作系统自带的Page cache,来实现一定程序顺序读写内存

  • Page cache可以看作是热点磁盘数据的内存缓存,当数据写入的时候,是先写入Page cache,后续由操作系统刷入磁盘,这样性能就会提高很多
  • 同时,如果查询的时候发现Page cache中有数据,也不会从磁盘中读取,这样读取性能也会提升很多

Page cache数据和磁盘是如何同步的?

  1. 当空间内存不够时,也就是说低于某个阈值,此时将Page cache刷入磁盘并释放Page cache
  2. 当脏页(写入数据后,Page cache会先标记为脏页)在内存驻留超过一定时间
  3. 用户主动调用sync()和fsync()
plaintext
1
2
sync:将所有修改过的缓冲区写入队列,不等待磁盘操作结束
fsync:将所有修改过的缓冲区写入队列,等待磁盘操作结束,安全性高

Kafka零拷贝

一般的数据拷贝:

img

从系统调用和数据拷贝次数来分析:

涉及到2次系统调用,因此有4次用户态和内核态切换
涉及到4次数据拷贝过程

零拷贝:

img

所谓”零”拷贝,就是将数据从磁盘文件复制到网卡的过程中,不需要经过用户态的应用程序

具体流程:

  1. 调用sendfile
  2. 操作系统将数据从磁盘通过DMA加载到内核空间的缓冲区
  3. 操作系统将数据的描述符拷贝到Socket缓冲区。Socket缓冲区仅仅拷贝一个描述符进去,不拷贝数据
  4. 操作系统直接将数据从内核空间的缓冲区通过SG-DMA拷贝到网卡中

以上共涉及1次系统调用,也就是2次用户态和内核态切换。拷贝次数从4次变成了2次

Kafka多层次

kafka多层划分

  1. 首先是topic划分,不同的topic可以看成不同的小数组,这些小数组可以分别存放在不同的Broker上
  2. 其次,每个topic还做了切分,分成了多个topic,也就是一个主题可以分成多个主题分片
  3. 最后,每个partition还做了进一步拆分,也就是一个partition对应了多个不同的文件,这些文件是分离的,是:
    • .log文件,消息本身,记录了数据
    • .timeindex文件,时间索引,可以通过时间对.log文件做索引查询
    • .index文件,偏移索引,可以通过位置对.log文章做索引查询
    • 查找的时候先去.index或者.timeindex文件中查找,再去.log文件中查找。因此这两个文件需要加载进入内核内存,所以不能太多

partition文件细节

img

index 和 log 文件以当前 segment 的第一条消息的 offset 命名

plaintext
1
2
3
4
5
6
7
8
9
10
11
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex

00000000000000170410.index
00000000000000170410.log
00000000000000170410.timeindex

00000000000000239430.index
00000000000000239430.log
00000000000000239430.timeindex
  • 一个Topic分为多个Partition,一个Partition分为多个Segment
  • 每个Segment对应三个文件:偏移量索引文件、时间戳索引文件、消息存储文件
  • Kafka中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项,也就是说Offset是不连续的,是一个区间范围

以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示

img

每个索引项占用 8 个字节,分为两个部分:

  1. relativeOffset: 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset的值
  2. position: 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节
  • 为什么要使用相对偏移量relativeOffset,而不使用绝对偏移量offset呢?

消息的偏移量(offset)占用8个字节,也可以称为绝对偏移量。索引项中没有直接使用绝对偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间

  • 为什么不采用连续的Offset,而要采用区间范围的形式?

为了降低内存空间的占用,连续的Offset会占用更多的内存空间

通过查找理解索引

img

  1. 如果我们要查找偏移量为23的消息,首先通过二分法在偏移量索引文件中找到不大于23的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息

  2. 如果要查找偏移量为 268 的消息,那么应该怎么办呢?

首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息

  1. 那么又是如何查找 baseOffset 为 251 的日志分段的呢?

这里并不是顺序查找,而是用了跳跃表ConcurrentSkipListMap的结构

Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段

在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset的最大索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息

Kafka批量操作

Kafka有两个地方用到了批量操作:

  1. 批量生产:当数据量累计到Batch(默认是16kb),再发送给Broker
  2. 批量消费:本质是一次多拉几条消息,一起消费
  3. Kafka服务端也会将多条消息先写入Page cache中,然后数据再刷盘

Kafka数据压缩

通过压缩数据的方式可以将数据变小,以节约带宽,可以在发送方进行数据压缩,也可以在Broker进行压缩

数据压缩适用于CPU比较富裕,但是带宽相对不足的情况,而消息的传输大多数是符合这个情况