Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix null KafkaTopicConsumerManager not removed (#495)
Browse files Browse the repository at this point in the history
Fixes #493

This bug was introduced by #473. In `MessageFetchContext#handleFetch`, when the `KafkaTopicConsumerManager`'s future is completed with null, we should remove the future from `KafkaTopicManager#consumerTopicManagers`.

In addition, this PR adds some refactors for `consumerTopicManagers`:
1. Don't use getter to expose this field, use methods to operate it instead.
2. Check null for completed future before close `KafkaTopicConsumerManager`.
  • Loading branch information
BewareMyPower authored May 14, 2021
1 parent a50af46 commit 0e3ef0d
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ protected void writeAndFlushResponseToClient(Channel channel) {
// case 4: responseFuture is completed normally
responseFuture.thenApply(response -> {
if (log.isDebugEnabled()) {
log.debug("Write kafka cmd responseAndRequest back to client. \n"
+ "\trequest content: {} \n"
+ "\tresponseAndRequest content: {}",
log.debug("Write kafka cmd to client."
+ " request content: {}"
+ " responseAndRequest content: {}",
request, response.toString(request.getRequest().version()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void close() {
}
KafkaTopicManager.LOOKUP_CACHE.clear();
KopBrokerLookupManager.clear();
KafkaTopicManager.getConsumerTopicManagers().clear();
KafkaTopicManager.closeKafkaTopicConsumerManagers();
KafkaTopicManager.getReferences().clear();
KafkaTopicManager.getTopics().clear();
OffsetAcker.CONSUMERS.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.net.InetSocketAddress;
Expand All @@ -23,8 +24,10 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,7 +57,6 @@ public class KafkaTopicManager {
private final BrokerService brokerService;

// consumerTopicManagers for consumers cache.
@Getter
private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
consumerTopicManagers = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -309,10 +311,7 @@ public void close() {
try {
this.cursorExpireTask.cancel(true);

for (CompletableFuture<KafkaTopicConsumerManager> manager : consumerTopicManagers.values()) {
manager.get().close();
}
consumerTopicManagers.clear();
closeKafkaTopicConsumerManagers();

for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : topics.entrySet()) {
String topicName = entry.getKey();
Expand Down Expand Up @@ -345,9 +344,14 @@ public static void deReference(String topicName) {
try {
removeTopicManagerCache(topicName);

if (consumerTopicManagers.containsKey(topicName)) {
consumerTopicManagers.remove(topicName).get().close();
}
Optional.ofNullable(consumerTopicManagers.remove(topicName)).ifPresent(
// Use thenAccept to avoid blocking
tcmFuture -> tcmFuture.thenAccept(tcm -> {
if (tcm != null) {
tcm.close();
}
})
);

if (!topics.containsKey(topicName)) {
return;
Expand All @@ -365,6 +369,27 @@ public static void deReference(String topicName) {
}
}

public static void removeKafkaTopicConsumerManager(String topicName) {
consumerTopicManagers.remove(topicName);
}

public static void closeKafkaTopicConsumerManagers() {
consumerTopicManagers.forEach((topic, tcmFuture) -> {
try {
Optional.ofNullable(tcmFuture.get(300, TimeUnit.SECONDS))
.ifPresent(KafkaTopicConsumerManager::close);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Failed to get TCM future of {} when trying to close it", topic);
}
});
consumerTopicManagers.clear();
}

@VisibleForTesting
public static int getNumberOfKafkaTopicConsumerManagers() {
return consumerTopicManagers.size();
}

public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
if (closed.get()) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public CompletableFuture<AbstractResponse> handleFetch(
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY));
// remove null future cache from consumerTopicManagers
KafkaTopicManager.removeKafkaTopicConsumerManager(KopTopic.toString(topicPartition));
// result got. this will be filtered in following filter method.
return null;
}
Expand Down Expand Up @@ -242,8 +244,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
"cursor.readEntry fail. deleteCursor");
} else {
// remove null future cache from consumerTopicManagers
KafkaTopicManager.getConsumerTopicManagers()
.remove(KopTopic.toString(kafkaTopic));
KafkaTopicManager.removeKafkaTopicConsumerManager(
KopTopic.toString(kafkaTopic));
log.warn("Cursor deleted while TCM close.");
}
});
Expand Down Expand Up @@ -335,8 +337,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
cm.add(pair.getRight(), pair);
} else {
// remove null future cache from consumerTopicManagers
KafkaTopicManager.getConsumerTopicManagers()
.remove(KopTopic.toString(kafkaPartition));
KafkaTopicManager.removeKafkaTopicConsumerManager(
KopTopic.toString(kafkaPartition));
log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM.");
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ public void testGetTopicConsumerManager() throws Exception {
KafkaTopicConsumerManager topicConsumerManager2 = tcm.get();

assertTrue(topicConsumerManager == topicConsumerManager2);
assertEquals(kafkaTopicManager.getConsumerTopicManagers().size(), 1);
assertEquals(KafkaTopicManager.getNumberOfKafkaTopicConsumerManagers(), 1);

// 2. verify another get with different topic will return different tcm
String topicName2 = "persistent://public/default/testGetTopicConsumerManager2";
registerPartitionedTopic(topicName2);
tcm = kafkaTopicManager.getTopicConsumerManager(topicName2);
topicConsumerManager2 = tcm.get();
assertTrue(topicConsumerManager != topicConsumerManager2);
assertEquals(kafkaTopicManager.getConsumerTopicManagers().size(), 2);
assertEquals(KafkaTopicManager.getNumberOfKafkaTopicConsumerManagers(), 2);
}


Expand Down

0 comments on commit 0e3ef0d

Please sign in to comment.