Kafka的大致框架是什么样

  • Kafka分为三层:生产者(Producer)、中转者(Server)、消费者(Consumer)
  • 生产者负责发送消息,服务端负责存储消息,消费者负责拉取消息
  • 其中服务端是由多个Broker节点组成,我们常说的主题Topic在Broker节点上。当然,Topic是一个逻辑概念,实际物理存储是主题分片,也就是Partition

如何获取topic主题的列表

  • Kafka提供了获取主题列表的接口,可以通过kafka-topic.sh这个工具获取,如果要在业务中获取,主流语言比如Java、Golang都支持,都可以直接调用KafkaAdminClient这个接口来获取
  • kafka-topics.sh 脚本主要负责 topic 相关的操作。它的具体实现是通过 kafka-run-class 来调用 TopicCommand 类,并根据参数执行指定的功能

通过kafka-topics.sh获取:

1
./kafka-topics.sh --list --bootstrap-server localhost:9092

通过KafkaAdminClient获取,Java和Golang都是通过AdminClient来获取的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static final String NEW_TOPIC = "topic-test2";
private static final String brokerUrl = "localhost:9092";

private static AdminClient adminClient;

@BeforeClass
public static void beforeClass(){
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
adminClient = AdminClient.create(properties);
}

@AfterClass
public static void afterClass(){
adminClient.close();
}

@Test
public void createTopics() {
NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);
Collection<NewTopic> newTopicList = new ArrayList<>();
newTopicList.add(newTopic);
adminClient.createTopics(newTopicList);
}

有了topic,为什么还需要partition/为什么消息队列还需要分区

  • 如果不进行分区,我们消息写数据只能到一个节点上,这样的话可能会导致单点服务器负载过高,通过分区可以把数据均匀地分布在各个节点上。因此分区带来了负载均衡和横向扩展的能力
  • 发送消息的时候可以根据分区分配的原则落在不同的Kafka服务器上,提高了并发写消息的能力,消费消息的时候又和消费者绑定了关系,可以从不同节点的不同分区消费消息,提高了读消息的能力
  • 分区引入了副本,保证了kafka的高可用和高持久性

partition是逻辑概念还是物理概念

partition是物理概念,数据是写入partition文件中

介绍一下分区的分配策略

消费端partition.assignment.strategy这个配置:

  • Range Assignor:基于范围的分配策略,将分区按照范围分配给消费者
  • RoundRobin Assignor:基于轮询的分配策略,分区均匀地分配给消费者
  • Sticky Assignor:优先保持当前的分配状态,并尽量减少在再平衡过程中的分区移动
  • CooperativeStickyAssignor:和Sticky Assignor策略基本是一样的,区别在于该协议将原来的一次大的全部分区重平衡,改成多次小规模分区重平衡。简单理解就是渐进式重平衡

创建topic时如何将分区(Partition)放置到不同的Broker中

  • partition是存放在Broker节点上,如果单个Broker肯定是放在一个Broker上
  • 如果是多个Broker,则partition是放在不同的Broker上。具体地,一个partition只对应一个Broker,一个Broker可以拥有多个partition

img

规则:

  1. 先随机选取一个Broker,比如Broker11
  2. 将主题对应的第一个分片,放入Broker11中,即TopicB的0号分片放入到Broker11中
  3. Broker依次往后放TopicB的后续分片,比如TopicB的1号分片放入到Broker12中,Topic在我们的例子中只有2个,如果有三个下一个则放入到Broker10中

消息存入Partition的规则/Kafka分区分配的规则

  • 如果指定了partition则发送到特定的partition,但是一般情况下,业务不需要感知partition
  • 如果没有指定partition,但是指定了一个key,那么根据key的hash值对partition数目取模来决定是哪个partition
  • 如果没有指定partition,也没有指定key,那么就采取轮询调度算法,也就是把每一次来自用户的请求轮流分配给partition

Kafka服务端可以接收的消息最大默认是多少字节,如何修改

Kafka可以接收的最大消息默认是1MB,如果想调整它的大小,可以在Broker修改配置参数:message.max.bytes的值

客户端如何连接集群

每个Broker都有其他Broker的信息,这是一个大前提

  1. 访问任意一台Broker,申请获得路由信息
  2. 得到所有的Broker的信息列表,返回路由信息
  3. 根据连接规则连接到具体的Broker,即算出来具体发送到哪一个partition,然后访问到对应的Broker

img

kafka的topic中的partition数据是如何存储到磁盘中的

kafka多层划分

  1. 首先是topic划分,不同的topic可以看成不同的小数组,这些小数组可以分别存放在不同的Broker上
  2. 其次,每个topic还做了切分,分成了多个partition,也就是一个主题可以分成多个主题分片
  3. 最后,每个partition还做了进一步拆分,也就是一个partition对应了多个不同的文件,这些文件是分离的,是:
    • .log文件,消息本身,记录了数据
    • .timeindex文件,时间索引,可以通过时间对.log文件做索引查询
    • .index文件,偏移索引,可以通过位置对.log文章做索引查询
    • 查找的时候先去.index或者.timeindex文件中查找,再去.log文件中查找。因此这两个文件需要加载进入内核内存,所以不能太多

partition文件细节

img

index 和 log 文件以当前 segment 的第一条消息的 offset 命名

1
2
3
4
5
6
7
8
9
10
11
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex

00000000000000170410.index
00000000000000170410.log
00000000000000170410.timeindex

00000000000000239430.index
00000000000000239430.log
00000000000000239430.timeindex
  • 一个Topic分为多个Partition,一个Partition分为多个Segment
  • 每个Segment对应三个文件:偏移量索引文件、时间戳索引文件、消息存储文件
  • Kafka中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项,也就是说Offset是不连续的,是一个区间范围

以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示

img

每个索引项占用 8 个字节,分为两个部分:

  1. relativeOffset: 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset的值
  2. position: 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节
  • 为什么要使用相对偏移量relativeOffset,而不使用绝对偏移量offset呢?

消息的偏移量(offset)占用8个字节,也可以称为绝对偏移量。索引项中没有直接使用绝对偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间

  • 为什么不采用连续的Offset,而要采用区间范围的形式?

为了降低内存空间的占用,连续的Offset会占用更多的内存空间

通过查找理解索引

img

  1. 如果我们要查找偏移量为23的消息,首先通过二分法在偏移量索引文件中找到不大于23的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息

  2. 如果要查找偏移量为 268 的消息,那么应该怎么办呢?

首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息

  1. 那么又是如何查找 baseOffset 为 251 的日志分段的呢?

这里并不是顺序查找,而是用了跳跃表ConcurrentSkipListMap的结构

Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段

在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset的最大索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息

  1. 总结

img

Kafka如何清理数据/Kafka数据堆积过多怎么办

  • 可以用基于时间的保留策略,这种策略允许用户指定消息的保留时间(如7天)。超过时间将自动删除
  • 也可以用基于大小的保留策略,Kafka允许用户指定日志的最大容量,一旦日志的大小超过了配置的值,Kafka将开始删除最早的消息