💧 Posted on 

Kafka 幂等实现剖析

什么是幂等

幂等 这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。

下面通过几个简单的例子说明一下。

比如在乘法运算中,让数字乘以 1 就是一个幂等操作,因为不管你执行多少次这样的运算,结果都是相同的。再比如,取整函数(floor 和 ceiling)是幂等函数,那么运行 1 次 floor(3.4) 和 100 次 floor(3.4),结果是一样的,都是 3。相反地,让一个数加 1 这个操作就不是幂等的,因为执行一次和执行多次的结果必然不同。

在计算机领域中,幂等性的含义稍微有一些不同:

  • 在命令式编程语言(比如 C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。
  • 在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的 side effect。

幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。如果是非幂等性操作,我们还需要担心某些操作执行多次对状态的影响,但对于幂等性操作而言,我们根本无需担心此事。

Producer 幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,但是 Kafka 所提供的幂等性是有条件的:

  1. kafka 中的幂等性只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  2. kafka 中的幂等性不能跨多个 TopicPartition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

Producer 幂等性使用

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put("enable.idempotence", ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer producer = new KafkaProducer(props);

producer.send(new ProducerRecord(topic, "test");

Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对上层应用做了很好的封装,应用层并不需要去关心具体的实现细节,对用户非常友好。

幂等性要解决的问题

一般来说,消息可靠性交付保障,提供三种级别:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

kafka 默认提供的就是第二种,即至少一次。

在 kafka 中,消息已提交的含义,通常是Broker 成功接收到消息,并且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。

Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。

对于大多数应用而言,数据保证不丢是可以满足其需求的,但是对于一些其他的应用场景(比如支付数据等),它们是要求精确计数的,这时候如果上游数据有重复,下游应用只能在消费数据时进行相应的去重操作,应用在去重时,最常用的手段就是根据唯一 id 键做 check 去重。

在这种场景下,因为上游生产导致的数据重复问题,会导致所有有精确计数需求的下游应用都需要做这种复杂的、重复的去重处理。试想一下:如果在发送时,系统就能保证 exactly once,这对下游将是多么大的解脱。这就是幂等性要解决的问题,主要是解决数据重复的问题,正如前面所述,数据重复问题,通用的解决方案就是加唯一 id,然后根据 id 判断数据是否重复,Producer 的幂等性也是这样实现的,这一小节就让我们看下 Kafka 的 Producer 如何保证数据的 exactly once 的。

Producer 幂等性实现原理

正如前面所述,幂等性要解决的问题是:Producer 设置 at least once 时,由于异常触发重试机制导致数据重复,幂等性的目的就是为了解决这个数据重复的问题,简单来说就是:

at least once + 幂等 = exactly once

kafka Producer 在实现时有两个重要机制:

  • PID(Producer ID),用来标识每个 producer client;
  • sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复。

PID

每个 Producer 在初始化时都会被分配一个唯一的 PID,这个 PID 对应用是透明的,完全没有暴露给用户。对于一个给定的 PID,sequence number 将会从0开始自增,每个 Topic-Partition 都会有一个独立的 sequence number。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number,Server 也就是通过这个来验证数据是否重复。

这里的 PID 是全局唯一的,Producer 故障后重新启动后会被分配一个新的 PID,这也是幂等性无法做到跨会话的一个原因。

PID 申请

下面我们看下 ProducerId 是如何获取的。

KafkaProducer 中的 Sender 线程在执行发送逻辑之前,会先判断判断是否需要一个新的 ProducerID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void runOnce() {
// 如果开启了幂等或事务,需要多一些检查
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();

// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}

// 判断是否需要一个新的 ProducerId
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}

long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}

synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
if (!isTransactional()) {
if (epochBumpRequired) {
bumpIdempotentProducerEpoch();
}
// 当前不处于初始化状态,并且没有 ProducerId
if (currentState != State.INITIALIZING && !hasProducerId()) {
transitionTo(State.INITIALIZING);
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(null)
.setTransactionTimeoutMs(Integer.MAX_VALUE);
// 构建初始化ProducerId请求,放入请求队列中
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false);
enqueueRequest(handler);
}
}
}

之后请求会被发送到服务端(Broker), 服务端处理该请求的入口是 KafkaApis 中的 handleInitProducerIdRequest()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def handleInitProducerIdRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
val transactionalId = initProducerIdRequest.data.transactionalId

// 权限校验
if (transactionalId != null) {
if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
} else if (!authHelper.authorize(request.context, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME, true, false)
&& !authHelper.authorizeByResourceType(request.context, AclOperation.WRITE, ResourceType.TOPIC)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}

// 此处省略部分代码
def sendResponseCallback(result: InitProducerIdResult): Unit = {...}

val producerIdAndEpoch = (initProducerIdRequest.data.producerId, initProducerIdRequest.data.producerEpoch) match { // 初始化的是否都是 -1(具体可以看 InitProducerIdRequest 的构造方法),所以进入第一个 case
case (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH) => Right(None)
case (RecordBatch.NO_PRODUCER_ID, _) | (_, RecordBatch.NO_PRODUCER_EPOCH) => Left(Errors.INVALID_REQUEST)
case (_, _) => Right(Some(new ProducerIdAndEpoch(initProducerIdRequest.data.producerId, initProducerIdRequest.data.producerEpoch)))
}

producerIdAndEpoch match {
// 初始化 ProducerId
case Right(producerIdAndEpoch) => txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs,
producerIdAndEpoch, sendResponseCallback, requestLocal)
case Left(error) => requestHelper.sendErrorResponseMaybeThrottle(request, error.exception)
}
}

def handleInitProducerId(transactionalId: String,
transactionTimeoutMs: Int,
expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch],
responseCallback: InitProducerIdCallback,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {

if (transactionalId == null) {
// 最终可以发现,producerId 是由 producerIdManager 来管理的。
val producerId = producerIdManager.generateProducerId()
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
} else if (transactionalId.isEmpty) {
...

看代码可以发现 ProducerIdManager 是一个接口,它有两个实现类

  • ZkProducerIdManager
  • RPCProducerIdManager

ZkProducerIdManager 是通过 zk 来管理 producerId。

PID 端申请是向 ZooKeeper 申请,zk 中有一个 latest_producer_id_block 节点,每个 Broker 向 zk 申请一个 PID 段(默认情况下,每次申请 1000 个 PID)后,都会把自己申请的 PID 段信息写入到这个节点,这样当其他 Broker 再申请 PID 段时,会首先读写这个节点的信息,然后根据 block_end 选择一个 PID 段,最后再把信息写会到 zk 的这个节点,这个节点信息格式如下所示:

1
{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}

ProducerIdManager 申请 PID 段的流程如下:

  1. 先从 zk 的 latest_producer_id_block 节点读取最新已经分配的 PID 段信息;
  2. 如果该节点不存在,直接从 0 开始分配,选择 0~1000 的 PID 段(ProducerIdManager 的 PidBlockSize 默认为 1000,即是每次申请的 PID 段大小);
  3. 如果该节点存在,读取其中数据,根据 block_end 选择 这个 PID 段(如果 PID 段超过 Long 类型的最大值,这里会直接返回一个异常);
  4. 在选择了相应的 PID 段后,将这个 PID 段信息写回到 zk 的这个节点中,如果写入成功,那么 PID 段就证明申请成功,如果写入失败(写入时会判断当前节点的 zkVersion 是否与步骤1获取的 zkVersion 相同,如果相同,那么可以成功写入,否则写入就会失败,证明这个节点被修改过),证明此时可能其他的 Broker 已经更新了这个节点(当前的 PID 段可能已经被其他 Broker 申请),那么从步骤 1 重新开始,直到写入成功。

RPCProducerIdManager 是最新版本新实现的一个功能,新版本的kafka 移除zookeeper之后,producerId 将在控制器上分配。

Sequence Numbers

有了PID之后,在 PID+Partition 级别上再加上 sequence numbers 信息,就可以实现Producer的幂等性了。

ProducerBatch也提供了setProducerState() 方法(具体执行时机是在 RecordAccumulator 中的 drain 方法中),它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断。

发送流程

客户端发送逻辑

当开通幂等功能之后,producer 的发送流程如下:

  1. 客户端通过 KafkaProducer 的 send() 方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的;
  2. Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 的 maybeResolveSequences() 方法判断当前的 PID 是否需要重置,重置的原因是因为:如果有 topic-partition 的 batch 重试多次失败最后因为超时而被移除,这时 sequence number 将无法做到连续,因为 sequence number 有部分已经分配出去,这时系统依赖自身的机制无法继续进行下去(因为幂等性是要保证不丢不重的),相当于程序遇到了一个 fatal 异常,PID 会进行重置,TransactionManager 相关的缓存信息被清空(Producer 不会重启),只是保存状态信息的 TransactionManager 做了 clear+new 操作,遇到这个问题时是无法保证 exactly once 的(有数据已经发送失败了,并且超过了重试次数);
  3. Sender 线程通过 bumpIdempotentEpochAndResetIdIfNeeded() 方法判断是否需要申请 PID,如果需要的话,会想服务端发送 InitProducerIdRequest
  4. Sender 线程通过 sendProducerData() 方法发送数据,整体流程与之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 drain() 方法中,在加了幂等性之后,drain() 方法多了如下几步判断:
    • 常规的判断:判断这个 topic-partition 是否可以继续发送(如果出现前面2中的情况是不允许发送的)、判断 PID 是否有效、如果这个 batch 是重试的 batch,那么需要判断这个 batch 之前是否还有 batch 没有发送完成,如果有,这里会先跳过这个 Topic-Partition 的发送,直到前面的 batch 发送完成,最坏情况下,这个 Topic-Partition 的 in-flight request 将会减少到1(这个涉及也是考虑到 server 端的一个设置,文章下面会详细分析);
    • 如果这个 ProducerBatch 还没有这个相应的 PID 和 sequence number 信息,会在这里进行相应的设置;
  5. 最后 Sender 线程再调用 sendProduceRequests() 方法发送 ProduceRequest 请求,后面的就跟之前正常的流程保持一致了。

服务端处理逻辑

当 Broker 收到 ProduceRequest 请求之后,会通过 KafkaApis.handleProduceRequest() 做相应的处理,其处理流程如下(这里只讲述关于幂等性相关的内容):

  1. 先进行权限校验(这里还不是太理解校验权限的目的)
    • 如果请求是事务请求,检查是否对 TXN.id 有 Write 权限,没有的话返回 TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
    • 如果请求设置了幂等性,检查是否对 ClusterResource 有 IdempotentWrite 权限,没有的话返回 CLUSTER_AUTHORIZATION_FAILED;
    • 验证对 topic 是否有 Write 权限以及 Topic 是否存在,否则返回 TOPIC_AUTHORIZATION_FAILED 或 UNKNOWN_TOPIC_OR_PARTITION 异常;
  2. 检查是否有 PID 信息,没有的话走正常的写入流程;
  3. UnifiedLog 对象会在 analyzeAndValidateProducerState() 方法先根据 batch 的 sequence number 信息检查这个 batch 是否重复(server 端会缓存 PID 对应这个 Topic-Partition 的最近5个 batch 信息),如果有重复,这里当做写入成功返回(不更新 LOG 对象中相应的状态信息,比如这个 replica 的 the end offset 等);
  4. 有了 PID 信息,并且不是重复 batch 时,在更新 producer 信息时,会做以下校验:
    • 检查该 PID 是否已经缓存中存在
    • 如果不存在,那么判断 sequence number 是否 从0 开始,是的话,在缓存中记录 PID 的 meta(PID,epoch, sequence number),并执行写入操作,否则返回 UnknownProducerIdException(PID 在 server 端已经过期或者这个 PID 写的数据都已经过期了,但是 Client 还在接着上次的 sequence number 发送数据);
    • 如果该 PID 存在,先检查 PID epoch 与 server 端记录的是否相同;
    • 如果不同并且 sequence number 不从 0 开始,那么返回 OutOfOrderSequenceException 异常;
    • 如果不同并且 sequence number 从 0 开始,那么正常写入;
    • 如果相同,那么根据缓存中记录的最近一次 sequence number(currentLastSeq)检查是否为连续(会区分为 0、Int.MaxValue 等情况),不连续的情况下返回 OutOfOrderSequenceException 异常。
  5. 下面与正常写入相同。

幂等性时,Broker 在处理 ProduceRequest 请求时,多了一些校验操作,这里重点看一下其中一些重要实现,先看下 analyzeAndValidateProducerState() 方法的实现,如下所示:

analyzeAndValidateProducerState() 到达路径:

  • KafkaApis.handleProduceRequest()
    • ReplicaManager.appendRecords() -> appendToLocalLog() -> appendRecordsToLeader()
      • UnifiedLog.appendAsLeader() -> append() -> analyzeAndValidateProducerState
analyzeAndValidateProducerState()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
origin: AppendOrigin):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
var relativePositionInSegment = appendOffsetMetadata.relativePositionInSegment

records.batches.forEach { batch =>
if (batch.hasProducerId) {
// if this is a client produce request, there will be up to 5 batches which could have been duplicated.
// If we find a duplicate, we return the metadata of the appended batch to the client.
if (origin == AppendOrigin.Client) {
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)

maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
return (updatedProducers, completedTxns.toList, Some(duplicate))
}
}

// We cache offset metadata for the start of each transaction. This allows us to
// compute the last stable offset without relying on additional index lookups.
val firstOffsetMetadata = if (batch.isTransactional)
Some(LogOffsetMetadata(batch.baseOffset, appendOffsetMetadata.segmentBaseOffset, relativePositionInSegment))
else
None

val maybeCompletedTxn = updateProducers(producerStateManager, batch, updatedProducers, firstOffsetMetadata, origin)
maybeCompletedTxn.foreach(completedTxns += _)
}

relativePositionInSegment += batch.sizeInBytes
}
(updatedProducers, completedTxns.toList, None)
}

如果这个 batch 有 PID 信息,会首先检查这个 batch 是否为重复的 batch 数据,其实现如下,batchMetadata 会缓存最新 5个 batch 的数据(如果超过5个,添加时会进行删除,这个也是幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5 的原因,与这个值的设置有关),根据 batchMetadata 缓存的 batch 数据来判断这个 batch 是否为重复的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
if (batch.producerEpoch != producerEpoch)
None
else
batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}

// Return the batch metadata of the cached batch having the exact sequence range, if any.
def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
val duplicate = batchMetadata.filter { metadata =>
firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
}
duplicate.headOption
}

private def addBatchMetadata(batch: BatchMetadata): Unit = {
if (batchMetadata.size == ProducerStateEntry.NumBatchesToRetain)
batchMetadata.dequeue() //note: 只会保留最近 5 个 batch 的记录
batchMetadata.enqueue(batch) //note: 添加到 batchMetadata 中记录,便于后续根据 seq id 判断是否重复
}

如果 batch 不是重复的数据,analyzeAndValidateProducerState() 会通过 updateProducers() 更新 producer 的相应记录,在更新的过程中,会做一步校验,校验方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//note: 检查 seq number
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) { //note: epoch 不同时
if (appendFirstSeq != 0) { //note: 此时要求 seq number 必须从0开始(如果不是的话,pid 可能是新建的或者 PID 在 Server 端已经过期)
//note: pid 已经过期(updatedEntry.producerEpoch 不是-1,证明时原来的 pid 过期了)
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
s"(request epoch), $appendFirstSeq (seq. number)")
} else { //note: pid 已经过期(updatedEntry.producerEpoch 为-1,证明 server 端 meta 新建的,PID 在 server 端已经过期,client 还在接着上次的 seq 发数据)
throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
s"that the last message with t()he producerId=$producerId has been removed due to hitting the retention limit.")
}
}
} else {
val currentLastSeq = if (!updatedEntry.isEmpty)
updatedEntry.lastSeq
else if (producerEpoch == currentEntry.producerEpoch)
currentEntry.lastSeq
else
RecordBatch.NO_SEQUENCE

if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
//note: 此时期望的 seq number 是从 0 开始,因为 currentLastSeq 是 -1,也就意味着这个 pid 还没有写入过数据
// the epoch was bumped by a control record, so we expect the sequence number to be reset
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $appendFirstSeq " +
s"(incoming seq. number), but expected 0")
} else if (!inSequence(currentLastSeq, appendFirstSeq)) {
//note: 判断是否连续
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
s"(incoming seq. number), $currentLastSeq (current end sequence number)")
}
}
}

思考题

  1. Producer 在设置幂等性时,为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5,如果设置大于 5(不考虑 Producer 端参数校验的报错),会带来什么后果?
  2. Producer 在设置幂等性时,如果我们设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 大于 1,那么是否可以保证有序,如果可以,是怎么做到的?

为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5

之所以要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:

Server 端的 ProducerStateManager 实例会缓存每个 PID 在 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档,忘记在哪了),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力

当 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 配置大于1时,是否保证有序

先来分析一下,在什么情况下 Producer 会出现乱序的问题?

没有幂等性时,乱序的问题是在重试时出现的,举个例子:client 依然发送了 6 个请求 1、2、3、4、5、6(它们分别对应了一个 batch),这 6 个请求只有 2-6 成功 ack 了,1 失败了,这时候需要重试,重试时就会把 batch 1 的数据添加到待发送的数据列队中),那么下次再发送时,batch 1 的数据将会被发送,这时候数据就已经出现了乱序,因为 batch 1 的数据已经晚于了 batch 2-6。

当 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 1 时,是可以解决这个问题的,因为同时只允许一个请求正在发送,只有当前的请求发送完成(成功 ack 后),才能继续下一条请求的发送,类似单线程处理这种模式,每次请求发送时都会等待上次的完成,效率非常差,但是可以解决乱序的问题(当然这里有序只是针对单 client 情况,多 client 并发写是无法做到的)。

系统能提供的方案,基本上就是有序性与性能之间二选一,无法做到兼容,实际上系统出现请求重试的几率是很小的(一般都是网络问题触发的),可能连 0.1% 的时间都不到,但是就是为了这 0.1% 时间都不到的情况,应用需要牺牲性能问题来解决,在大数据场景下,我们是希望有更友好的方式来解决这个问题。简单来说,就是当出现重试时,max-in-flight-request 可以动态减少到 1,在正常情况下还是按 5 (5是举例说明)来处理,这有点类似于分布式系统 CAP 理论中关于 P 的考虑,当出现问题时,可以容忍性能变差,但是其他的情况下,我们希望的是能拥有原来的性能,而不是一刀切。令人高兴的,在 Kafka 2.0.0 版本中,如果 Producer 开始了幂等性,Kafka 是可以做到这一点的,如果不开启幂等性,是无法做到的,因为它的实现是依赖了 sequence number。

当请求出现重试时,batch 会重新添加到队列中,这时候是根据 sequence number 添加到队列的合适位置(有些 batch 如果还没有 sequence number,那么就保持其相对位置不变),也就是队列中排在这个 batch 前面的 batch,其 sequence number 都比这个 batch 的 sequence number 小,其实现如下,这个方法保证了在重试时,其 batch 会被放到合适的位置:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Re-enqueue the given record batch in the accumulator to retry
*/
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now); //note: 重试,更新相应的 meta
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
if (transactionManager != null)
insertInSequenceOrder(deque, batch); //note: 将 batch 添加到队列的合适位置(根据 seq num 信息)
else
deque.addFirst(batch);
}
}

另外 Sender 在发送请求时,会首先通过 RecordAccumulator 的 drain() 方法获取其发送的数据,在遍历 Topic-Partition 对应的 queue 中的 batch 时,如果发现 batch 已经有了 sequence number 的话,则证明这个 batch 是重试的 batch,因为没有重试的 batch 其 sequence number 还没有设置,这时候会做一个判断,会等待其 in-flight-requests 中请求发送完成,才允许再次发送这个 Topic-Partition 的数据,其判断实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
if (!transactionManager.isSendToPartitionAllowed(tp))
return true;

producerIdAndEpoch = transactionManager.producerIdAndEpoch();
if (!producerIdAndEpoch.isValid())
// we cannot send the batch until we have refreshed the producer id
return true;

if (!first.hasSequence()) {
if (transactionManager.hasInflightBatches(tp) && transactionManager.hasStaleProducerIdAndEpoch(tp)) {
// Don't drain any new batches while the partition has in-flight batches with a different epoch
// and/or producer ID. Otherwise, a batch with a new epoch and sequence number
// 0 could be written before earlier batches complete, which would cause out of sequence errors
return true;
}

if (transactionManager.hasUnresolvedSequence(first.topicPartition))
// Don't drain any new batches while the state of previous sequence numbers
// is unknown. The previous batches would be unknown if they were aborted
// on the client after being sent to the broker at least once.
return true;
}

// 获取 inFlightBatches 中第一个 batch 的 baseSequence, inFlightBatches 为 null 的话返回 RecordBatch.NO_SEQUENCE
int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
&& first.baseSequence() != firstInFlightSequence)
//重试操作(seq number 不为0),如果这个 batch 的 baseSequence 与 in-flight
//queue 中第一个 request batch 的 baseSequence不同的话(证明它前面还有请求未成功),
//会等待下次循环再判断, 最坏的情况下会导致 in-flight request 为1(只影响这个 partition)
//这种情况下,继续发送这个是没有意义的,因为幂等性时保证顺序的,只有前面的都成功,后面的再发送才有意义
//这里是 break,相当于在这次发送中直接跳过了这个 topic-partition 的发送
// If the queued batch already has an assigned sequence, then it is being retried.
// In this case, we wait until the next immediate batch is ready and drain that.
// We only move on when the next in line batch is complete (either successfully or due to
// a fatal broker error). This effectively reduces our in flight request count to 1.
return true;
}
return false;
}

仅有 client 端这两个机制还不够,Server 端在处理 ProduceRequest 请求时,还会检查 batch 的 sequence number 值,它会要求这个值必须是连续的,如果不连续都会返回异常,Client 会进行相应的重试,举个栗子:假设 Client 发送的请求顺序是 1、2、3、4、5(分别对应了一个 batch),如果中间的请求 2 出现了异常,那么会导致 3、4、5 都返回异常进行重试(因为 sequence number 不连续),也就是说此时 2、3、4、5 都会进行重试操作添加到对应的 queue 中。

Producer 的 TransactionManager 实例的 TopicPartitionEntry.inflightBatchesBySequence 成员变量会维护这个 Topic-Partition 与目前正在发送的 batch 的对应关系(通过 addInFlightBatch() 方法添加 batch 记录),只有这个 batch 成功 ack 后,才会通过 removeInFlightBatch() 方法将这个 batch 从 inflightBatchesBySequence 中移除。

接着前面的例子,此时 inflightBatchesBySequence 中还有 2、3、4、5 这几个 batch(有顺序的,2 在前面),根据前面的 RecordAccumulator 的 drain() 方法可以知道只有这个 Topic-Partition 下次要发送的 batch 是 batch 2(跟 transactionManager 的这个 firstInFlightSequence() 方法获取 inFlightBatches 中第一个 batch 的 baseSequence 来判断) 时,才可以发送,否则会直接 break,跳过这个 Topic-Partition 的数据发送。这里相当于有一个等待,等待 batch 2 重新加入到 queue 中,才可以发送,不能跳过 batch 2,直接重试 batch 3、4、5,这是不允许的。

简单来说,其实现机制概括为:

  1. Server 端验证 batch 的 sequence number 值,不连续时,直接返回异常;
  2. Client 端请求重试时,batch 在 reenqueue 时会根据 sequence number 值放到合适的位置(有序保证之一);
  3. Sender 线程发送时,在遍历 queue 中的 batch 时,会检查这个 batch 是否是重试的 batch,如果是的话,只有这个 batch 是最旧的那个需要重试的 batch,才允许发送,否则本次发送跳过这个 Topic-Partition 数据的发送等待下次发送。