Kafka 中采用了多副本的机制,这是大多数分布式系统中惯用的手法,以此来实现水平扩展、提供容灾能力、提升可用性和可靠性等。

我们对此可以引申出一系列的疑问:

  • Kafka 多副本之间如何进行数据同步,尤其是在发生异常时候的处理机制又是什么?
  • 多副本间的数据一致性如何解决,基于的一致性协议又是什么?
  • 如何确保Kafka 的可靠性?
  • Kafka 中的可靠性和可用性之间的关系又如何?

下面从副本的角度切入来看看Kafka如何保障数据一致性、数据可靠性等问题,主要包括副本剖析、日志同步机制和可靠性分析等内容。

副本剖析

副本(Replica)是分布式系统中常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。在常见的分布式系统中,为了对外提供可用的服务,我们往往会对数据和服务进行副本处理。数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这是解决分布式系统数据丢失问题最有效的手段。另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。

Kafka从0.8版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka通过多副本机制实现故障自动转移,在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。

一下内容会涉及到AR、ISR、HW等基础概念,下面我们先简要回顾一下,详情请参考 <Kafka中的基本概念>

  • 副本是相对于分区而言的,即副本是特定分区的副本。
  • 一个分区中包含一个或多个副本,其中一个为leader副本,其余为follower副本,各个副本位于不同的broker节点中。只有leader副本对外提供服务,follower副本只负责数据同步。
  • 分区中的所有副本统称为 AR,而ISR 是指与leader 副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的一员。
  • LEO标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的LEO,ISR中最小的LEO即为HW,俗称高水位,消费者只能拉取到HW之前的消息。从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息。
  • 从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息。

失效副本

正常情况下,分区的所有副本都处于ISR集合中,但是难免会有异常情况发生,从而某些副本被剥离出ISR集合中。在ISR集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区(under-replicated分区)。

可以通过 kafka-topic.sh 脚本的 under-replicated-partitions 参数来显示主题中包含失效副本的分区。

bin/kafka-topic.sh --zookeeper localhost:2181 --describe --topic topic-partitions -under-replicatied-partitions

失效副本不仅是指处于功能失效状态的副本,处于同步失效状态的副本也可以看作失效副本。怎么判定一个分区是否有副本处于同步失效的状态呢?Kafka从0.9.x版本开始就通过唯一的broker端参数 replica.lag.time.max.ms 来抉择,当ISR集合中的一个follower副本滞后leader副本的时间超过此参数指定的值时则判定为同步失败,需要将此follower副本剔除出ISR集合,replica.lag.time.max.ms 参数的默认值为10000。

具体的实现原理也很容易理解,当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该 follower 副本已经追赶上leader 副本,此时更新该副本的lastCaughtUpTimeMs 标识。Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数replica.lag.time.max.ms 指定的值。

什么情况会导致副本失效?

  • follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。
  • follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。
  • 当通过脚本工具增加了副本因子,新增加的副本因子在赶上leader之前都处于失效状态

ISR的伸缩

Kafka 在启动的时候会开启两个与 ISR 相关的定时任务,名称分别为“isr-expiration”和“isr-change-propagation”。

isr-expiration任务会周期性地检测每个分区是否需要缩减其ISR集合。这个周期和 replica.lag.time.max.ms 参数有关,大小是这个参数值的一半,默认值为5000ms。当检测到ISR集合中有失效副本时,就会收缩ISR集合。如果某个分区的ISR集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/<topic>/partition/<parititon>/state 节点中。节点中的数据示例如下:

{"controller epoch": 26, "leader": 0, "version": 1, "leader epoch": 2, "isr": [0, 1]}
  • controller_epoch 表示当前Kafka控制器的epoch
  • leader表示当前分区的leader副本所在的broker的id编号
  • version表示版本号(当前版本固定为1)
  • leader_epoch表示当前分区的leader纪元
  • isr表示变更后的ISR列表。

当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation任务会周期性(固定值为 2500ms)地检查isrChangeSet,如果发现isrChangeSet中有ISR集合的变更记录,那么它会在ZooKeeper的/isr_change_notification路径下创建一个以 isr_change_开头的持久顺序节点(比如/isr_change_notification/isr_change_0000000000),并将isrChangeSet中的信息保存到这个节点中。

Kafka控制器为/isr_change_notification添加了一个Watcher,当这个节点中有子节点发生变化时会触发Watcher的动作,以此通知控制器更新相关元数据信息并向它管理的broker节点发送更新元数据的请求,最后删除/isr_change_notification路径下已经处理过的节点。频繁地触发Watcher会影响Kafka控制器、ZooKeeper甚至其他broker节点的性能。为了避免这种情况,Kafka添加了限定条件,当检测到分区的ISR集合发生变化时,还需要检查以下两个条件:

  1. 上一次ISR集合发生变化距离现在已经超过5s。
  2. 上一次写入ZooKeeper的时间距离现在已经超过60s。

满足以上两个条件之一才可以将ISR集合的变化写入目标节点。

有缩减对应就会有扩充,那么Kafka又是何时扩充ISR的呢?

随着follower副本不断与leader副本进行消息同步,follower副本的LEO也会逐渐后移,并最终追赶上leader副本,此时该follower副本就有资格进入ISR集合。追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的HW,注意这里并不是和leader副本的LEO相比。ISR扩充之后同样会更新ZooKeeper中的/brokers/topics/<topic>/partition/<parititon>/state节点和isrChangeSet,之后的步骤就和ISR收缩时的相同。

当ISR集合发生增减时,或者ISR集合中任一副本的LEO发生变化时,都可能会影响整个分区的HW。

例如如下,leader副本的LEO为9,follower1副本的LEO为7,而follower2副本的LEO为6,如果判定这3个副本都处于ISR集合中,那么这个分区的HW为6;如果follower3已经被判定为失效副本被剥离出ISR集合,那么此时分区的HW为leader副本和follower1副本中LEO的最小值,即为7。

LEO 和 HW

这两个概念可以参考:

对于副本而言,还有两个概念:本地副本(Local Replica)和远程副本(RemoteReplica),本地副本是指对应的Log分配在当前的broker节点上,远程副本是指对应的Log分配在其他的broker节点上。在Kafka中,同一个分区的信息会存在多个broker节点上,并被其上的副本管理器所管理,这样在逻辑层面每个broker节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

整个消息追加的过程可以概括如下:

  1. 生产者客户端发送消息至leader副本(副本1)中。
  2. 消息被追加到leader副本的本地日志,并且会更新日志的偏移量。
  3. follower副本(副本2和副本3)向leader副本请求同步数据。
  4. leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本的信息。
  5. leader副本所在的服务器将拉取结果返回给follower副本。
  6. follower副本收到leader副本返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

了解了这些内容后,我们再来分析在这个过程中各个副本LEO和HW的变化情况。下面的示例,生产者一直在往leader副本中写入消息。某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0。之后follower副本向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是FetchRequest请求中的fetch_offset。leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,如图8-5所示,这个HW信息对应的是FetchResponse中的high_watermark。

此时两个follower副本各自拉取到了消息,并更新各自的LEO为3和4。与此同时,follower副本还会更新自己的HW,更新HW的算法是比较当前LEO和leader副本中传送过来的HW的值,取较小值作为自己的HW值。当前两个follower副本的HW都等于0(min(0,0)=0)。

接下来follower副本再次请求拉取leader副本中的消息。

此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本。注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。

两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3(min(LEO,3)=3)。

在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。leader 副本收到 follower副本的FetchRequest请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。

Kafka 的根目录下有 cleaner-offset-checkpoint、log-start-offset-checkpoint、recovery-point-offset-checkpoint和replication-offset-checkpoint四个检查点文件

recovery-point-offset-checkpoint 和replication-offset-checkpoint 这两个文件分别对应了 LEO和 HW。Kafka 中会有一个定时任务负责将所有分区的 LEO 刷写到恢复点文件 recovery-point-offset-checkpoint 中,定时周期由 broker 端参数 log.flush.offset.checkpoint.interval.ms来配置,默认值为60000。还有一个定时任务负责将所有分区的HW刷写到复制点文件replication-offset-checkpoint中,定时周期由broker端参数replica.high.watermark.checkpoint.interval.ms来配置,默认值为5000。

log-start-offset-checkpoint文件对应logStartOffset(注意不能缩写为LSO,因为在Kafka中LSO是LastStableOffset的缩写),在FetchRequest和FetchResponse中也有它的身影,它用来标识日志的起始偏移量。各个副本在变动 LEO 和 HW 的过程中,logStartOffset 也有可能随之而动。Kafka 也有一个定时任务来负责将所有分区的 logStartOffset书写到起始点文件log-start-offset-checkpoint中,定时周期由broker端参数log.flush.start.offset.checkpoint.interval.ms来配置,默认值为60000。

Leader Epoch

日志同步机制

在分布式系统中,日志同步机制既要保证数据的一致性,也要保证数据的顺序性。虽然有许多方式可以实现这些功能,但最简单高效的方式还是从集群中选出一个leader来负责处理数据写入的顺序性。只要leader还处于存活状态,那么follower只需按照leader中的写入顺序来进行同步即可。

通常情况下,只要leader不宕机我们就不需要关心follower的同步问题。不过当leader宕机时,我们就要从follower中选举出一个新的leader。follower的同步状态可能落后leader很多,甚至还可能处于宕机状态,所以必须确保选择具有最新日志消息的follower作为新的leader。日志同步机制的一个基本原则就是:如果告知客户端已经成功提交了某条消息,那么即使 leader宕机,也要保证新选举出来的leader中能够包含这条消息。这里就有一个需要权衡(tradeoff)的地方,如果leader在消息被提交前需要等待更多的follower确认,那么在它宕机之后就可以有更多的follower替代它,不过这也会造成性能的下降。

对于这种tradeoff,一种常见的做法是“少数服从多数”,“少数服从多数”的方式有一个很大的优势,系统的延迟取决于最快的几个节点,比如副本数为3,那么延迟就取决于最快的那个follower而不是最慢的那个(除了leader,只需要另一个follower确认即可)。不过它也有一些劣势,为了保证leader选举的正常进行,它所能容忍的失败follower数比较少,如果要容忍1个follower失败,那么至少要有3个副本,如果要容忍2个follower失败,必须要有5个副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这也就是“少数服从多数”的这种Quorum模型常被用作共享集群配置(比如ZooKeeper),而很少用于主流的数据存储中的原因。

与“少数服从多数”相关的一致性协议有很多,比如Zab、Raft和ViewstampedReplication等。而Kafka使用的更像是微软的PacificA算法。在Kafka中动态维护着一个ISR集合,处于ISR集合内的节点保持与leader相同的高水位(HW),只有位列其中的副本(unclean.leader.election.enable配置为false)才有资格被选为新的 leader。写入消息时只有等到所有 ISR 集合中的副本都确认收到之后才能被认为已经提交。位于 ISR 中的任何副本节点都有资格成为leader,选举过程简单、开销低,这也是Kafka选用此模型的重要因素。Kafka中包含大量的分区,leader副本的均衡保障了整体负载的均衡,所以这一因素也极大地影响Kafka的性能指标。

在采用ISR模型和(f+1)个副本数的配置下,一个Kafka分区能够容忍最大f个节点失败,相比于“少数服从多数”的方式所需的节点数大幅减少。实际上,为了能够容忍f个节点失败,“少数服从多数”的方式和ISR的方式都需要相同数量副本的确认信息才能提交消息。比如,为了容忍1个节点失败,“少数服从多数”需要3个副本和1个follower的确认信息,采用ISR的方式需要2个副本和1个follower的确认信息。在需要相同确认信息数的情况下,采用ISR的方式所需要的副本总数变少,复制带来的集群开销也就更低,“少数服从多数”的优势在于它可以绕开最慢副本的确认信息,降低提交的延迟,而对Kafka而言,这种能力可以交由客户端自己去选择。

总结

关于:怎样可以确保Kafka 完全可靠?如果这样做就可以确保消息不丢失了吗? 这类问题,就可靠性本身而言,它并不是一个可以用简单的“是”或“否”来衡量的一个指标,而一般是采用几个9来衡量的。任何东西不可能做到完全的可靠,即使能应付单机故障,也难以应付集群、数据中心等集体故障,即使躲得过天灾也未必躲得过人祸。就可靠性而言,我们可以基于一定的假设前提来做分析。本问要讲述的是:在只考虑Kafka本身使用方式的前提下如何最大程度地提高可靠性。

kafka对可靠性的保障体现在多个方面,消息发送阶段、消息存储阶段以及消费消息阶段均有涉及。

消息发送阶段

消息发送的3种模式,即发后即忘、同步和异步。对于发后即忘的模式,不管消息有没有被成功写入,生产者都不会收到通知,那么即使消息写入失败也无从得知,因此发后即忘的模式不适合高可靠性要求的场景。如果要提升可靠性,那么生产者可以采用同步或异步的模式,在出现异常情况时可以及时获得通知,以便可以做相应的补救措施,比如选择重试发送(可能会引起消息重复)。

有些发送异常属于可重试异常,比如 NetworkException,这个可能是由瞬时的网络故障而导致的,一般通过重试就可以解决。对于这类异常,如果直接抛给客户端的使用方也未免过于兴师动众,客户端内部本身提供了重试机制来应对这种类型的异常,通过 retries 参数即可配置。默认情况下,retries参数设置为0,即不进行重试,对于高可靠性要求的场景,需要将这个值设置为大于 0 的值,与 retries 参数相关的还有一个retry.backoff.ms参数,它用来设定两次重试之间的时间间隔,以此避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。如果不知道 retries 参数应该配置为多少,则可以参考 KafkaAdminClient,在 KafkaAdminClient 中retries参数的默认值为5。

如果配置的retries参数值大于0,则可能引起一些负面的影响。由于默认的max.in.flight.requests.per.connection参数值为5,这样可能会影响消息的顺序性,对此要么放弃客户端内部的重试功能,要么将max.in.flight.requests.per.connection参数设置为1,这样也就放弃了吞吐。其次,有些应用对于时延的要求很高,很多时候都是需要快速失败的,设置retries>0会增加客户端对于异常的反馈时延,如此可能会对应用造成不良的影响。

生产者客户端参数 acks 也是用来支撑可靠性的。该参数有三个可选项:0、1、-1(客户端还可以配置为all,它的含义与-1一样,以下只以-1来进行陈述)

  • 对于acks=1的配置,生产者将消息发送到leader副本,leader副本在成功写入本地日志之后会告知生产者已经成功提交。(如果此时ISR集合的follower副本还没来得及拉取到leader中新写入的消息,leader就宕机了,那么此次发送的消息就会丢失。)
  • 对于ack=-1的配置,生产者将消息发送到leader副本,leader副本在成功写入本地日志之后还要等待 ISR 中的 follower 副本全部同步完成才能够告知生产者已经成功提交,即使此时leader副本宕机,消息也不会丢失

在acks=-1的情形中,它要求ISR中所有的副本都收到相关的消息之后才能够告知生产者已经成功提交。试想一下这样的情形,leader 副本的消息流入速度很快,而follower副本的同步速度很慢,在某个临界点时所有的follower副本都被剔除出了ISR集合,那么ISR中只有一个leader副本,最终acks=-1演变为acks=1的情形,如此也就加大了消息丢失的风险。

Kafka也考虑到了这种情况,并为此提供了min.insync.replicas参数(默认值为1)来作为辅助(配合acks=-1来使用),这个参数指定了ISR集合中最小的副本数,如果不满足条件就会抛出NotEnoughReplicasException或NotEnoughReplicasAfterAppendException。

在正常的配置下,需要满足副本数 > min.insync.replicas参数的值。一个典型的配置方案为:副本数配置为 3,min.insync.replicas 参数值配置为 2。注意min.insync.replicas参数在提升可靠性的时候会从侧面影响可用性。(试想如果ISR中只有一个leader副本,那么最起码还可以使用,而此时如果配置min.insync.replicas>1,则会使消息无法写入。)

与可靠性和ISR集合有关的还有一个参数—unclean.leader.election.enable。这个参数的默认值为false,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的 leader,这样有可能造成数据的丢失。如果这个参数设置为false,那么也会影响可用性,非ISR集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。从0.11.0.0 版本开始,unclean.leader.election.enable 的默认值由原来的 true 改为了false,可以看出Kafka的设计者愈发地偏向于可靠性的提升。

消息存储阶段

存储消息阶段需要在消息刷盘之后再给生产者响应,假设消息写入缓存中就返回响应,那么机器突然断电这消息就没了,而生产者以为已经发送成功了。

如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。这样基本上就能保证存储的可靠了。一台挂了还有一台还在呢。

那假如来个地震机房机子都挂了呢?emmmmmm…大公司基本上都有异地多活。

就Kafka而言,越多的副本数越能够保证数据的可靠性,副本数可以在创建主题时配置,也可以在后期修改,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。一般而言,设置副本数为3即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka 时就会设置副本数为 5。

在broker端还有两个参数log.flush.interval.messages 和 log.flush.interval.ms,用来调整同步刷盘的策略,默认是不做控制而交由操作系统本身来进行处理。同步刷盘是增强一个组件可靠性的有效方式,不过这种方式极其损耗性能,最好还是采用多副本的机制来保障。

消息消费阶段

消费者需要真正执行完业务逻辑之后,再发送给Broker消费成功,这才是真正的消费了。

所以只要我们在消息业务逻辑处理完成之后再给Broker响应,那么消费阶段消息就不会丢失。

在kafka中,消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset(enable.auto.commit 参数的默认值为 true)。虽然这种方式非常简便,但它会带来重复消费和消息丢失的问题,对于高可靠性要求的应用来说显然不可取,所以需要将 enable.auto.commit 参数设置为 false 来执行手动位移提交。在执行手动位移提交的时候也要遵循一个原则:如果消息没有被成功消费,那么就不能提交所对应的消费位移。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。

对于消费端,Kafka 还提供了一个可以兜底的功能,即回溯消费,通过这个功能可以让我们能够有机会对漏掉的消息相应地进行回补,进而可以进一步提高可靠性。

评论