From 9e123db01fcac5c9b3f4b79d0fc9c9c858614d52 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Fri, 5 Feb 2021 09:59:14 +0800 Subject: [PATCH] fix for UT failed --- .../streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 4 ++-- .../streamnative/pulsar/handlers/kop/KafkaTopicManager.java | 2 +- .../pulsar/handlers/kop/utils/ZooKeeperUtils.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index dc44be7421..079d060e33 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -179,8 +179,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final int defaultNumPartitions; public final int maxReadEntriesNum; // store the group name for current connected client. - public final ConcurrentHashMap currentConnectedGroup; - public final String groupIdStoredPath; + private final ConcurrentHashMap currentConnectedGroup; + private final String groupIdStoredPath; @Getter private final EntryFormatter entryFormatter; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index fc5b6accf5..25aa6c6a65 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -381,7 +381,7 @@ public void deReference(String topicName) { public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { // make sure internal consumer existed CompletableFuture consumerFuture = new CompletableFuture<>(); - if (!requestHandler.getGroupCoordinator() + if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator() .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { log.warn("not get consumer for group {} this time", groupId); consumerFuture.complete(null); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java index c0104dda36..c0a95561a6 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java @@ -47,7 +47,7 @@ public static void createPath(ZooKeeper zooKeeper, String zkPath, String subPath } public static String getData(ZooKeeper zooKeeper, String zkPath, String subPath) { - String data = ""; + String data = null; try { String addSubPath = zkPath + subPath; Stat zkStat = zooKeeper.exists(addSubPath, true);