yanliang



问题背景

我们在用kafka的时候,偶尔会遇到这样这样一个问题。

我们写的kafka的客户端程序,在启动的时候,会无缘无故的 卡住(阻塞) 如下图所示:

这时程序会长时间阻塞在这里,无法继续进行后续操作。

问题排查

因为日志没有任何报错信息,但是又可以肯定当前项目并没有完全启动成功。感觉像是程序当中有个地方卡到了。通过 VisualVM 工具dump 线程相关的信息,很快发现了问题所在。原来卡在了consumer初始化的地方。

一下是我这边的处理方式,大家可以参考下。如果有更好的方式欢迎大家相互交流。

以下方法是在初始化Consumer的时候进行处理的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public KafkaConsumerImpl init() {
if (group == null || group.isEmpty()) {
throw new RuntimeException("phoenix.mq.group is empty");
}
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, namesrvAddr);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
// 是否允许自动提交offset,这里设为false,下面手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// ...
consumer = new KafkaConsumer<>(props);

// consumer 订阅的topic及partition
topicPartition = new TopicPartition(this.topic, this.partitionId);
this.partitions = Collections.singletonList(topicPartition);

// 元数据初始化和连接测试,3次失败后抛出异常
Callable<Boolean> call = new Callable<Boolean>() {
boolean res = false;
int tryTimes = 3;

@Override
public Boolean call() throws Exception {
while (tryTimes-- > 0) {
try {
consumer.assign(partitions);
// 默认初始化offset当前最大值
nextBeginOffset = consumer.position(topicPartition);
res = true;
break;
} catch (Exception e) {
if (e instanceof InterruptedException) {
break; // 如果position在阻塞状态时,调用了 task.cancel 会抛出此异常。直接退出即可
}
LOG.error(e.getMessage(), e);
LOG.error(" ==> error when trying to fetch metadata for kafka. topic<{}>, partition<{}>", topic,
partitionId);
}
// sleep
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
return res;
}
};

FutureTask<Boolean> task = new FutureTask<>(call);
new Thread(task).start();

boolean isOk = false;
try {
isOk = task.get(10000, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
LOG.error("Get task result timeout", e);
}

task.cancel(true);

if (isOk) {
LOG.info(" ==> init kafka consumer succeed: servers<{}>, topic<{}>, partition<{}>, nextBeginOffset<{}>",
namesrvAddr, topic, partitionId, nextBeginOffset);
}
else {
throw new RuntimeException(String.format(
" ==> init kafka consumer failed. please check the conf (listeners or advertised.listeners or ...) and try to ping the host name in the conf value"));
}
return this;
}

利用 FutureTask 的特性,定义一个定时任务, 在初始化Consumer的时候,尝试去连接kafka,如果配置的kafka的地址有误,或者配置出错在这里可以通过抛出错误体现出来。

最后通过task.get() 方法返回的结果来判断 Consumer 是否成功初始化。


 评论


博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Material X 作为主题 , 总访问量为 次 , 总字数 18.1k
载入天数...载入时分秒...