
Kafka Producer: Source Code

Kafka series: list of articles
Kafka version: trunk branch as of 2021-11 (latest release at the time: 3.0.0)

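In Kafka, the clients are implemented in Java and the server in Scala. The client is the part users come into contact with first, so the source code analysis in this Kafka series also starts from the client side. This post covers how the Producer's send model is implemented.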

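If you are not yet familiar with how the producer works, you may want to read the previous post, Kafka Producer: Principles, before this source code analysis.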

Using the Producer

Before analyzing the Producer's send model, let us look at how a user writes data to Kafka with the Producer. Below is about the simplest possible Producer application.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topicName = "test";
        int msgNum = 10; // number of messages to send

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " Kafka Producer Demo.";
            // No key and no partition are given, so the partitioner chooses the partition
            producer.send(new ProducerRecord<>(topicName, msg));
        }
        // close() waits for previously sent records to complete before returning
        producer.close();
    }
}

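As the code above shows, Kafka gives users a very simple API. Using it takes only two steps:

  • Initialize a KafkaProducer instance;
  • Call the send method to send data.

This post revolves around how the Producer implements send internally.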

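The send flow

We now walk through the source of send to analyze, step by step, how the Producer sends data.

send()

Users send data by calling producer.send() directly, so let us first look at how send() is implemented.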

// Asynchronously send a record to a topic; equivalent to send(record, null)
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

// Asynchronously send a record to a topic and invoke the provided callback once the send has been acknowledged.
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Pass the record through the interceptors before sending; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

In the end, sending is implemented by the Producer's doSend() method.
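
As a rough illustration of why send() can call onSend() without a try/catch, here is a minimal sketch of an interceptor chain that swallows failures from individual interceptors. The class InterceptorChainSketch and its String-based records are hypothetical simplifications, not Kafka's own interceptor class; the only behavior taken from the source above is that onSend() does not throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified stand-in for the producer's interceptor chain; not Kafka's actual class.
class InterceptorChainSketch {
    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    void add(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
    }

    // Applies every interceptor in order. A failing interceptor is skipped and the
    // chain continues with the last good record, so this method itself never throws.
    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Assumption: a real client would log a warning here; the sketch prints it.
                System.err.println("interceptor failed, skipping: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch();
        chain.add(s -> s + "-a");
        chain.add(s -> { throw new IllegalStateException("boom"); });
        chain.add(s -> s + "-b");
        System.out.println(chain.onSend("msg"));
    }
}
```

The design choice here is that a misbehaving interceptor should not be able to block a send, which is why the failure is contained inside the loop rather than propagated.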

doSend()

doSend() source
// Asynchronously send a record to a topic
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 1. Check that the producer has not been closed
        throwIfProducerClosed();
        // 2. Make sure metadata for the target topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // 3. Serialize the record's key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // 4. Determine the record's partition (specified explicitly, or computed by the partitioner)
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }

        // 5. Append the record to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        // 6. If the append was aborted because a new batch is needed, notify the partitioner
        //    (it may choose a different partition), recompute the partition and append again,
        //    this time allowing a new ProducerBatch to be created
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // 7. If the batch is full or a new batch was created, wake up the sender thread
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

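In the implementation of doSend(), sending a single record breaks down into the following steps:

  • Make sure metadata for the target topic is available (it is available if the leader of the target partition is known); if there is no metadata for the topic yet, fetch it first;
  • Serialize the record's key and value;
  • Determine the partition the record will be sent to (either specified explicitly or computed by the partitioner);
  • Append the record to the accumulator, where it is buffered first;
  • If, after the append, the corresponding ProducerBatch is full, or a new batch had to be created because the previous one did not have enough room for this record, wake up the sender thread to send the data.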
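
One detail the steps above do not cover is the error contract stated in the comments at the end of doSend(): API-level errors are reported through the callback and the returned Future, while other exceptions are thrown directly to the caller. The sketch below models that split with plain JDK types. ApiLikeException and the Supplier-based append step are hypothetical stand-ins, not Kafka classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Models the "return API errors in the future, throw everything else" contract.
class SendErrorContractSketch {
    // Hypothetical stand-in for an API-level error (for example, a record that is too large).
    static class ApiLikeException extends RuntimeException {
        ApiLikeException(String message) {
            super(message);
        }
    }

    // Runs the append step. API-style errors go to the callback and come back as a
    // failed future; any other runtime exception propagates to the caller unchanged.
    static CompletableFuture<String> doSend(Supplier<String> appendStep,
                                            BiConsumer<String, Exception> callback) {
        try {
            return CompletableFuture.completedFuture(appendStep.get());
        } catch (ApiLikeException e) {
            if (callback != null)
                callback.accept(null, e);
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = doSend(
                () -> { throw new ApiLikeException("record too large"); },
                (metadata, error) -> System.out.println("callback saw: " + error.getMessage()));
        System.out.println("future failed: " + failed.isCompletedExceptionally());
    }
}
```

For callers this means that a try/catch around send() is not enough on its own: some failures only become visible through the callback or when the returned future is inspected.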

The following sections analyze the implementation of each of these parts in detail.

The send flow in detail

Fetching the topic's metadata

Before data is sent, the producer must first confirm that the topic is available. This is implemented in waitOnMetadata() in KafkaProducer.
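
The source of waitOnMetadata() is not shown in this post, so the following is only a sketch of the general idea under stated assumptions: block until the topic is known or a maximum wait time elapses, then report how long the wait took so the caller can deduct it from its remaining budget (as doSend() does when it computes remainingWaitMs). MetadataWaitSketch, topicDiscovered() and MetadataTimeoutException are hypothetical names, not Kafka APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of "wait until topic metadata is available"; not Kafka's implementation.
class MetadataWaitSketch {
    // Hypothetical unchecked exception raised when the wait budget runs out.
    static class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String message) {
            super(message);
        }
    }

    private final Set<String> knownTopics = new HashSet<>();

    // Called when metadata for a topic arrives (in a real client, from a background thread).
    synchronized void topicDiscovered(String topic) {
        knownTopics.add(topic);
        notifyAll();
    }

    // Blocks until the topic is known or maxWaitMs elapses; returns the milliseconds spent waiting.
    synchronized long awaitTopic(String topic, long maxWaitMs) {
        long startNs = System.nanoTime();
        long elapsedMs = 0;
        while (!knownTopics.contains(topic)) {
            long remainingMs = maxWaitMs - elapsedMs;
            if (remainingMs <= 0)
                throw new MetadataTimeoutException(
                        "Topic " + topic + " not present in metadata after " + maxWaitMs + " ms");
            try {
                wait(remainingMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for metadata", e);
            }
            elapsedMs = (System.nanoTime() - startNs) / 1_000_000;
        }
        return elapsedMs;
    }

    // Time left for the rest of the send once the metadata wait is deducted; never negative.
    static long remainingBudget(long maxBlockMs, long waitedMs) {
        return Math.max(0, maxBlockMs - waitedMs);
    }

    public static void main(String[] args) {
        MetadataWaitSketch metadata = new MetadataWaitSketch();
        metadata.topicDiscovered("test");
        long waited = metadata.awaitTopic("test", 1000);
        System.out.println("remaining budget: " + remainingBudget(1000, waited) + " ms");
    }
}
```

Sharing one time budget between the metadata wait and the later buffer allocation is what keeps the total blocking time of a send bounded by a single limit.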

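Serializing the key and value

The Producer serializes the record's key and value, and the Consumer performs the corresponding deserialization.

Kafka ships a set of general-purpose serializers (all built-in implementations live in org.apache.kafka.common.serialization), and you can also implement your own.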
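
To make the idea of a custom serializer concrete, here is a small sketch that turns a value object into bytes and back. To stay runnable without the Kafka client on the classpath it declares its own SimpleSerializer interface, which is a hypothetical stand-in shaped like a serialize(topic, data) method; the User type and the byte layout are also assumptions chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializerSketch {
    // Hypothetical local interface; a real implementation would implement Kafka's serializer interface.
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    static final class User {
        final int id;
        final String name;

        User(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Byte layout (an assumption of this sketch): 4-byte id, 4-byte name length, UTF-8 name bytes.
    static final class UserSerializer implements SimpleSerializer<User> {
        @Override
        public byte[] serialize(String topic, User user) {
            if (user == null)
                return null; // a null value stays null rather than becoming an empty array
            byte[] name = user.name.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(8 + name.length)
                    .putInt(user.id)
                    .putInt(name.length)
                    .put(name)
                    .array();
        }
    }

    // The consumer-side counterpart: reads the fields back in the same order.
    static User deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int id = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new User(id, new String(name, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = new UserSerializer().serialize("test", new User(7, "kafka"));
        User back = deserialize(bytes);
        System.out.println(back.id + " " + back.name + " (" + bytes.length + " bytes)");
    }
}
```

Whatever format is chosen, the producer-side serializer and the consumer-side deserializer have to agree on it, which is why the two methods mirror each other field by field.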

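Determining the partition

The partition is determined in one of three ways:

  • If a partition is specified on the record, that value is used directly;
  • If no partition is specified but a key is present, the hash of the serialized key modulo the topic's partition count gives the partition;
  • If neither a partition nor a key is present, the default partitioner uses sticky partitioning: it picks one partition and keeps using it until a new batch has to be created, then switches to another one. (Before sticky partitioning was introduced in Kafka 2.4, this case used what is commonly called round-robin: a random integer was generated on the first call and incremented on every later call, and that value modulo the number of available partitions gave the partition.)

The implementation is as follows: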

// Compute the partition for the given record. If the record specifies a partition, return it;
// otherwise call the configured partitioner to compute one.
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The Producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner, which uses sticky partitioning for records without a key.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

For the details of how sticky partitioning is implemented, see the dedicated article.

Users can also plug in a custom partitioning strategy.
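
The three cases can be modeled in a few lines. This is a sketch under stated assumptions, not the DefaultPartitioner itself: PartitionChoiceSketch is a hypothetical class, it handles a single topic, it uses Arrays.hashCode as a stand-in for the murmur2 hash, and its onNewBatch() simply picks a different random partition.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Single-topic model of partition selection; not Kafka's DefaultPartitioner.
class PartitionChoiceSketch {
    private final int numPartitions;
    private int stickyPartition;

    PartitionChoiceSketch(int numPartitions) {
        this.numPartitions = numPartitions;
        this.stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    int partition(Integer explicitPartition, byte[] keyBytes) {
        if (explicitPartition != null)
            return explicitPartition;                 // case 1: partition given on the record
        if (keyBytes != null)                         // case 2: hash of the key modulo partition count
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions; // stand-in hash, not murmur2
        return stickyPartition;                       // case 3: stay on the current sticky partition
    }

    // Called when a new batch has to be started: move the sticky choice to another partition.
    void onNewBatch() {
        if (numPartitions > 1) {
            int next;
            do {
                next = ThreadLocalRandom.current().nextInt(numPartitions);
            } while (next == stickyPartition);
            stickyPartition = next;
        }
    }

    // Clears the sign bit so the modulo result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionChoiceSketch chooser = new PartitionChoiceSketch(6);
        System.out.println("explicit: " + chooser.partition(3, null));
        System.out.println("keyed:    " + chooser.partition(null, new byte[] {1, 2, 3}));
        System.out.println("sticky:   " + chooser.partition(null, null));
    }
}
```

Sticking to one partition until a batch is complete tends to produce fuller batches than spreading keyless records across all partitions one by one, at the cost of a less even distribution over short time windows.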

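Appending data to the accumulator

The Producer first writes records into a buffer; once a batch reaches batch.size, the sender thread is woken up to send the ProducerBatch. Let us first look in detail at how the Producer writes data into the buffer.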
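
For orientation, the buffering described here is governed by a handful of producer settings. The fragment below only collects them into a Properties object; the values are illustrative rather than recommendations, and BatchingConfigSketch is a hypothetical helper name.

```java
import java.util.Properties;

class BatchingConfigSketch {
    // Settings that shape batching and buffering; the values are examples only.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");       // target size of one batch, in bytes
        props.setProperty("linger.ms", "5");            // how long a non-full batch may wait for more records
        props.setProperty("buffer.memory", "33554432"); // total memory available for buffering records
        props.setProperty("max.block.ms", "60000");     // upper bound on how long send() may block
        return props;
    }

    public static void main(String[] args) {
        batchingProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be merged into the same Properties object that carries bootstrap.servers and the serializer classes when the KafkaProducer is constructed.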

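The Producer appends data through a RecordAccumulator instance. One of its most important fields is ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches (ProducerBatch was named RecordBatch in older versions): every TopicPartition has its own Deque<ProducerBatch>. When a record is added, it goes into the last ProducerBatch at the tail of the queue for its TopicPartition; when data is sent, batches are taken from the head of the queue first.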

// org.apache.kafka.clients.producer.internals.RecordAccumulator
// Add a record to the accumulator and return the append result
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // Each TopicPartition has its own queue
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // Operations on a queue are synchronized on it for thread safety
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // Try to append to the last batch in the queue
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null)
                return appendResult;
        }

        // No in-progress batch could take the record, so a new batch is needed
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        // Size the buffer for the new ProducerBatch: max(batch.size, estimated size of this record including headers)
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
        // Allocate the buffer for the new ProducerBatch (may block up to maxTimeToBlock)
        buffer = free.allocate(size, maxTimeToBlock);

        // Update the current time in case the buffer allocation blocked above.
        nowMs = time.milliseconds();
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // Create a ProducerBatch for the topic-partition
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
            // Append the record to the new ProducerBatch
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, nowMs));

            // Add the ProducerBatch to the tail of its queue
            dq.addLast(batch);
            // Track the batch in the set of incomplete (not yet acknowledged) batches
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            // dq.size() > 1 means the queue holds at least one batch that is ready to send
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

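The append logic breaks down into the following steps:

  • Get the queue for the TopicPartition, creating an empty one if it does not exist;
  • Try to append to the queue: take the most recently added ProducerBatch and, if it exists and has enough room for the record, write the record into it and return the result;
  • Otherwise (no batch exists, or the last one lacks room) create a new ProducerBatch, whose buffer size is Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)), so that a single oversized record still fits;
  • Write the record into the new ProducerBatch, add the batch to the queue, and return the result.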
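
The steps above can be condensed into a small model. MiniAccumulatorSketch is a hypothetical, heavily simplified class: it keys queues by a plain String, counts bytes instead of using a buffer pool, and does everything under one lock, whereas the real append() allocates the buffer outside the lock and then re-checks the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of per-partition batch queues; not Kafka's RecordAccumulator.
class MiniAccumulatorSketch {
    static final class Batch {
        final int capacity;
        final List<byte[]> records = new ArrayList<>();
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // Returns false when the record does not fit into the remaining space.
        boolean tryAppend(byte[] record) {
            if (used + record.length > capacity)
                return false;
            records.add(record);
            used += record.length;
            return true;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Appends to the tail batch, creating a new one if needed.
    // Returns true when the queue now holds something worth waking the sender for.
    boolean append(String topicPartition, byte[] record) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last == null || !last.tryAppend(record)) {
                // New batch sized so that even an oversized single record fits.
                last = new Batch(Math.max(batchSize, record.length));
                last.tryAppend(record);
                dq.addLast(last);
            }
            return dq.size() > 1 || last.used >= last.capacity;
        }
    }

    // The sending side takes batches from the head of the queue.
    Batch drainOldest(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null)
            return null;
        synchronized (dq) {
            return dq.pollFirst();
        }
    }

    int queuedBatches(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) {
        MiniAccumulatorSketch acc = new MiniAccumulatorSketch(10);
        for (int i = 0; i < 3; i++)
            System.out.println("ready after append " + i + ": " + acc.append("test-0", new byte[4]));
    }
}
```

Appending at the tail and draining from the head keeps the batches of one partition in the order in which they were created.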

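Sending the ProducerBatch

Once a record has been written successfully, if a ProducerBatch is found to meet the send condition (typically because the queue holds more than one batch, in which case the batches added earliest are certainly ready to send), the sender thread is woken up to send it.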
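
The send condition can be sketched as a small predicate. The "full" part mirrors the dq.size() > 1 || batch.isFull() expression in append(); the linger part reflects my understanding of how linger.ms works and is an assumption here, and the sketch deliberately leaves out other triggers such as retries backing off, memory exhaustion, flushing, or closing. BatchReadySketch is a hypothetical name.

```java
// Simplified readiness check for the head batch of one partition queue.
class BatchReadySketch {
    // A batch is sendable when it is full (or followed by another batch),
    // or when it has waited at least lingerMs.
    static boolean sendable(int queuedBatches, boolean lastBatchFull, long waitedMs, long lingerMs) {
        boolean full = queuedBatches > 1 || lastBatchFull;
        boolean lingerExpired = waitedMs >= lingerMs;
        return full || lingerExpired;
    }

    // How long the sender could sleep before this batch needs another look.
    static long msUntilSendable(int queuedBatches, boolean lastBatchFull, long waitedMs, long lingerMs) {
        if (sendable(queuedBatches, lastBatchFull, waitedMs, lingerMs))
            return 0;
        return lingerMs - waitedMs;
    }

    public static void main(String[] args) {
        System.out.println(sendable(1, false, 50, 100));        // neither full nor lingered long enough
        System.out.println(msUntilSendable(1, false, 50, 100)); // remaining linger time
    }
}
```

With a linger time of zero, every non-empty batch is immediately sendable under this model, which is why batching then depends mostly on how fast records arrive relative to how fast the sender drains them.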

The sender thread processes batches in the sendProducerData() method, which is implemented as follows:

sendProducerData() source
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // Get the nodes that have batches ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // If the leader of any topic-partition is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // If the connection to a node is not ready (client.ready() also initiates the connection when possible),
    // the node cannot accept data for now, so remove it from the ready set
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // Drain the sendable batches for every ready node into a map keyed by node id,
    // removing them from their queues
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Collect the batches that have expired, both in flight and still in the accumulator
    accumulator.resetNextBatchExpiryTime();
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // Fail every expired batch with a TimeoutException; this part is not essential for understanding the send flow
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);
    return pollTimeout;
}

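Much of this code deals with other concerns, such as dropping nodes that are temporarily unavailable and handling expired batches. The method that actually sends the batches is sendProduceRequests(batches, now). Its logic essentially puts all the ProducerBatch instances in batches whose leader is the same node into a single request and sends it.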
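
The "one request per leader node" idea can be sketched as a simple grouping step. In the code above the grouping is already done by accumulator.drain(), which returns a map keyed by node id; the sketch below only shows what such a grouping looks like. RequestGroupingSketch and its Batch type are hypothetical, and the leader ids are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Groups batches by the node that leads their partition; not Kafka's Sender.
class RequestGroupingSketch {
    static final class Batch {
        final String topicPartition;
        final int leaderNodeId;

        Batch(String topicPartition, int leaderNodeId) {
            this.topicPartition = topicPartition;
            this.leaderNodeId = leaderNodeId;
        }
    }

    // Each map entry corresponds to one request: all batches that go to the same node.
    static Map<Integer, List<Batch>> groupByLeader(List<Batch> drained) {
        Map<Integer, List<Batch>> byNode = new LinkedHashMap<>();
        for (Batch batch : drained)
            byNode.computeIfAbsent(batch.leaderNodeId, id -> new ArrayList<>()).add(batch);
        return byNode;
    }

    public static void main(String[] args) {
        List<Batch> drained = Arrays.asList(
                new Batch("test-0", 1), new Batch("test-1", 2), new Batch("test-2", 1));
        groupByLeader(drained).forEach((nodeId, group) ->
                System.out.println("node " + nodeId + " gets " + group.size() + " batch(es)"));
    }
}
```

Bundling by node rather than by partition means the number of requests grows with the number of brokers involved, not with the number of partitions being written to.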

That is the entire send flow of the Kafka producer client. If you have questions, feel free to discuss them in the comments.