Kafka消费者
Kafka消费者是拉模式还是推模式/Kafka消息的消费模式
Kafka采用拉模式拉取消息,采用拉模式可以使每个消费者以自身的消费能力去消费。缺点是如果Broker没有提供可供消费的消息,将导致Consumer不断在循环中轮询,直到新消息到达。
为了避免这点,kafka消费者可以在消费数据时传递timeout参数,在这个时间范围内轮询,直到有数据到达或者超时再返回
消费者故障,出现活锁问题如何解决
首先要理解什么是活锁:
消费者持续的维持心跳,但因为异常进行消息处理,或者消息处理时间长卡住了,这种情况下,这个消费者就会一直持有分区,该分区的消息就无法得到处理
要解决这个问题,需要有活锁检测机制
可以使用最大拉取间隔参数来解决,即max.poll.interval.ms,如果消费者轮询间隔大于了这个值,消费者就会离开分区,这样其他消费者就可以接管对应分区
有消费者为什么还需要消费者组
- 对主题分片的分配问题,让每个分片都有消费者处理,不至于所有消费者处理同一个分片
- 面对主题分片的变化,消费者组可以自动调整,也就是再平衡
- 对于业务开发,有了消费者组就可以只关心主题维度,而不用关心分片维度,很大程度降低了理解和应用难度
再平衡机制
再平衡机制简介
消费者组再平衡机制是一个关键机制,用于管理和分配主题分区给消费者组的各个消费者。再平衡机制可以保证数据负载在各个消费者之间均匀分布,并在消费者加入或者离开时自动调整分区的分配。
有一些分区策略:范围分配、轮询分配、粘性分配、合作粘性分配。其中合作粘性分配和粘性分配一样都是尽可能减少变动,不同点是合作粘性分配下,未受变动的消费者可以继续消费主题
什么情况下会再平衡
- 新消费者加入
- 消费者离开
- 主题分区变化
再平衡的过程
- 暂停消费:在再平衡过程中,消费者会暂时对消费的消费,防止在重新分配区间发生数据丢失或者重复
- 触发再平衡:由消费者组协调器(通常是Kafka集群中的一个Broker)触发再平衡
- 分配分区:消费者配合协调器根据当前消费者组的成员重新分配主题的分区
- 消费者完成分工:重新分配完成后,所有消费者会从协调器拿到新的分配情况
- 恢复消费:消费者收到新的分配后,恢复消费,开始处理被分配到的新的分区
这里有一个问题,我们可以看到第一步暂停会导致服务中断和延迟,好在Kafka提高了不止一种策略来解决这个问题
其实就是之前提到的消费者组消费分区策略
消费端partition.assignment.strategy这个配置:
- Range Assignor:基于范围的分配策略,将分区按照范围分配给消费者
- RoundRobin Assignor:基于轮询的分配策略,分区均匀地分配给消费者
- Sticky Assignor:优先保持当前的分配状态,并尽量减少在再平衡过程中的分区移动
- CooperativeStickyAssignor:和Sticky Assignor策略基本是一样的,区别在于该协议将原来的一次大的全部分区重平衡,改成多次小规模分区重平衡。简单理解就是渐进式重平衡
从再平衡的角度,其实可以分为两大块,Eager Rebalance和Incremental Rebalance
Eager Rebalance(急切再平衡)
Range Assignor、RoundRobin Assignor、Sticky Assignor都属于Eager Rebalance。
可以理解为急切的再平衡,因此一旦开启所有消费者都会暂停从Kafka消费并放弃其分区的分配资格
在这个时间段,类似于“stop the world”过程
缺点:
- 消费被迫中断,再平衡期间消费者不工作
- 当再平衡结束后,原先的这些消费者后面会重新加入消费者组获得新的分区分配,但不一定是原有的分区分配,也就是分配格局变了
Incremental Rebalance(增量再平衡)
CooperativeStickyAssignor是2.3版本引入的一个新的优化方案。在此模式下,只有部分分区会从某个消费者移动到另外一个消费者,其他不受再平衡机制影响的Kafka消费者会继续处理数据而不中断
当然,一次执行下来的结果可能是不均匀的,因此需要多次再平衡,直到到达稳定的分配,因此也叫增量再平衡
相比Eager Rebalance,消费者不会全部暂停,但是完成再平衡的时间要长一些
Group Coordinator
Group Coordinator是Kafka负责管理消费者组的一个协调器,协调器运行在Broker服务器上
每个消费者组都有一个Group Coordinator,负责消费者组内的消费者管理和偏移量管理
消费者管理
每一个Broker节点在启动的时候,都会创建和开启Coordinator组件。换句话说,Coordinator是存在于每个Broker上的组件,那这些Group Coordinator是如何分工?
每一个Consumer Group都有一个groupid,根据groupid的Hash来确定哪个Broker作为它的Coordinator
具体机制:
- 当消费者启动时,它会向Kafka集群发送请求寻找对应的Group Coordinator
- 找到后向其发送加入消费者组请求,以加入消费者组
- Group Coordinator收到请求后,会选择一个消费者作为Leader,这个Leader消费者会根据从Group Coordinator拿到的所有消费者信息进行分配(比如偏移量offset,这样分区重分配后,各个消费者读取自己对应分区的offset,在新的分区上继续前任的工作),并向Group Coordinator发送SyncGroup请求,以完成分区分配
- Group Coordinator还负责接受消费者的心跳信息,以检测消费者的存活状态。如果一个消费者在一段时间内没有发送心跳,Coordinator会认为其已经死亡,并触发再平衡。这个时间是可以配置的,建议时间在10s内
偏移量管理
Group Coordinator还负责管理消费者组的偏移量提交到存储,消费者可以定期提交自己消费到的偏移量,Coordinator会将偏移量存储到自己的内部主题中(_consumer_offsets)中
通过这个记录,等消费者重启之后就知道消费到哪里了,从断点开始继续消费,就不会出现重复消费的情况
再平衡的影响
- 重复消费。如果某个消费者离开消费者组的时候,还没有提交offset,当再平衡结束后就要重复消费
- 性能变差。再平衡需要暂停去完成
Kafka消费者提交之后就会清理掉数据吗
典型的钓鱼问题
如果消息被消费者消费并提交了对应偏移,这条消息不会删除,可以通过更改该消费者的偏移再次消费,也可以被其他消费者消费