问题背景

我们在用kafka的时候,偶尔会遇到这样这样一个问题。

我们写的kafka的客户端程序,在启动的时候,会无缘无故的 卡住(阻塞) 如下图所示:

这时程序会长时间阻塞在这里,无法继续进行后续操作。

问题排查

因为日志没有任何报错信息,但是又可以肯定当前项目并没有完全启动成功。感觉像是程序当中有个地方卡到了。通过 VisualVM 工具dump 线程相关的信息,很快发现了问题所在。原来卡在了consumer初始化的地方。

一下是我这边的处理方式,大家可以参考下。如果有更好的方式欢迎大家相互交流。

以下方法是在初始化Consumer的时候进行处理的:

public KafkaConsumerImpl init() {
    if (group == null || group.isEmpty()) {
        throw new RuntimeException("phoenix.mq.group is empty");
    }
    Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, namesrvAddr);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    // 是否允许自动提交offset,这里设为false,下面手动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // ...
    consumer = new KafkaConsumer<>(props);

    // consumer 订阅的topic及partition
    topicPartition = new TopicPartition(this.topic, this.partitionId);
    this.partitions = Collections.singletonList(topicPartition);

    // 元数据初始化和连接测试,3次失败后抛出异常
    Callable<Boolean> call = new Callable<Boolean>() {
        boolean res = false;
        int tryTimes = 3;

        @Override
        public Boolean call() throws Exception {
            while (tryTimes-- > 0) {
                try {
                    consumer.assign(partitions);
                    // 默认初始化offset当前最大值
                    nextBeginOffset = consumer.position(topicPartition);
                    res = true;
                    break;
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        break; // 如果position在阻塞状态时,调用了 task.cancel 会抛出此异常。直接退出即可
                    }
                    LOG.error(e.getMessage(), e);
                    LOG.error(" ==> error when trying to fetch metadata for kafka. topic<{}>, partition<{}>", topic,
                            partitionId);
                }
                // sleep
                try {
                    Thread.sleep(2000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return res;
        }
    };

    FutureTask<Boolean> task = new FutureTask<>(call);
    new Thread(task).start();

    boolean isOk = false;
    try {
        isOk = task.get(10000, TimeUnit.MILLISECONDS);
    }
    catch (Exception e) {
        LOG.error("Get task result timeout", e);
    }

    task.cancel(true);

    if (isOk) {
        LOG.info(" ==> init kafka consumer succeed: servers<{}>, topic<{}>, partition<{}>, nextBeginOffset<{}>",
                namesrvAddr, topic, partitionId, nextBeginOffset);
    }
    else {
        throw new RuntimeException(String.format(
                " ==> init kafka consumer failed. please check the conf (listeners or advertised.listeners or ...) and try to ping the host name in the conf value"));
    }
    return this;
}

利用 FutureTask 的特性,定义一个定时任务, 在初始化Consumer的时候,尝试去连接kafka,如果配置的kafka的地址有误,或者配置出错在这里可以通过抛出错误体现出来。

最后通过task.get() 方法返回的结果来判断 Consumer 是否成功初始化。

评论