diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index b9f35814cc..b71347a1d9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -684,7 +684,8 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch, if (t != null || perTopic == null) { log.error("Failed while get persistentTopic topic: {} ts: {}. ", perTopic == null ? "null" : perTopic.getName(), timestamp, t); - + // remove cache when topic is null + topicManager.removeTopicManagerCache(perTopic.getName()); partitionData.complete(new ListOffsetResponse.PartitionData( Errors.LEADER_NOT_AVAILABLE, ListOffsetResponse.UNKNOWN_TIMESTAMP, @@ -1368,12 +1369,13 @@ private CompletableFuture findBroker(TopicName topic) { topicManager.removeTopicManagerCache(topic.toString()); } - if (!topicManager.topicExists(topic.toString()) - && localListeners.contains(kopBrokerUrl)) { + if (localListeners.contains(kopBrokerUrl)) { topicManager.getTopic(topic.toString()).whenComplete((persistentTopic, exception) -> { if (exception != null || persistentTopic == null) { log.warn("[{}] findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:", ctx.channel(), topic.toString(), kopBrokerUrl, exception); + // remove cache when topic is null + topicManager.removeTopicManagerCache(topic.toString()); returnFuture.complete(null); } else { if (log.isDebugEnabled()) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index fb9b05fb77..511778dea8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -51,7 +51,7 @@ public class KafkaTopicManager { @Getter private final ConcurrentHashMap> consumerTopicManagers; - // cache for topics: + // cache for topics: , for removing producer private final ConcurrentHashMap> topics; // cache for references in PersistentTopic: private final ConcurrentHashMap references; @@ -114,7 +114,6 @@ public CompletableFuture getTopicConsumerManager(Stri topicName, t -> { CompletableFuture topic = getTopic(t); - checkState(topic != null); return topic.thenApply(t2 -> { if (log.isDebugEnabled()) { @@ -123,6 +122,8 @@ public CompletableFuture getTopicConsumerManager(Stri } if (t2 == null) { + // remove cache when topic is null + removeTopicManagerCache(topic.toString()); return null; } // return consumer manager @@ -142,11 +143,6 @@ public static void clearTopicManagerCache() { KOP_ADDRESS_CACHE.clear(); } - // whether topic exists in cache. - public boolean topicExists(String topicName) { - return topics.containsKey(topicName); - } - // exception throw for pulsar.getClient(); private Producer registerInPersistentTopic(PersistentTopic persistentTopic) throws Exception { Producer producer = new InternalProducer(persistentTopic, internalServerCnx, @@ -275,47 +271,28 @@ public CompletableFuture getTopic(String topicName) { rwLock.readLock().unlock(); } - return topics.computeIfAbsent(topicName, t -> { - getTopicBroker(t).whenCompleteAsync((ignore, th) -> { - if (th != null || ignore == null) { - log.warn("[{}] Failed getTopicBroker {}, return null PersistentTopic. throwable: ", - requestHandler.ctx.channel(), t, th); - - // get topic broker returns null. topic should be removed from LookupCache. - if (ignore == null) { - removeTopicManagerCache(topicName); - } - - topicCompletableFuture.complete(null); - return; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] getTopicBroker for {} in KafkaTopicManager. brokerAddress: {}", - requestHandler.ctx.channel(), t, ignore); - } - - brokerService.getTopic(t, brokerService.isAllowAutoTopicCreation(t)).whenComplete((t2, throwable) -> { - if (throwable != null) { - log.error("[{}] Failed to getTopic {}. exception:", - requestHandler.ctx.channel(), t, throwable); - // failed to getTopic from current broker, remove cache, which added in getTopicBroker. - removeTopicManagerCache(t); - topicCompletableFuture.complete(null); - return; - } - if (t2.isPresent()) { - PersistentTopic persistentTopic = (PersistentTopic) t2.get(); - topicCompletableFuture.complete(persistentTopic); - } else { - log.error("[{}]Get empty topic for name {}", - requestHandler.ctx.channel(), t); - topicCompletableFuture.complete(null); - } - }); - }); - return topicCompletableFuture; + brokerService.getTopic(topicName, brokerService.isAllowAutoTopicCreation(topicName)) + .whenComplete((t2, throwable) -> { + if (throwable != null) { + log.error("[{}] Failed to getTopic {}. exception:", + requestHandler.ctx.channel(), topicName, throwable); + // failed to getTopic from current broker, remove cache, which added in getTopicBroker. + removeTopicManagerCache(topicName); + topicCompletableFuture.complete(null); + return; + } + if (t2.isPresent()) { + PersistentTopic persistentTopic = (PersistentTopic) t2.get(); + topicCompletableFuture.complete(persistentTopic); + } else { + log.error("[{}]Get empty topic for name {}", + requestHandler.ctx.channel(), topicName); + topicCompletableFuture.complete(null); + } }); + // cache for removing producer + topics.put(topicName, topicCompletableFuture); + return topicCompletableFuture; } public void registerProducerInPersistentTopic (String topicName, PersistentTopic persistentTopic) { @@ -361,18 +338,20 @@ public synchronized void close() { for (Map.Entry> entry : topics.entrySet()) { String topicName = entry.getKey(); - removeTopicManagerCache(topicName); CompletableFuture topicFuture = entry.getValue(); if (log.isDebugEnabled()) { log.debug("[{}] remove producer {} for topic {} at close()", requestHandler.ctx.channel(), references.get(topicName), topicName); } if (references.get(topicName) != null) { - topicFuture.get().removeProducer(references.get(topicName)); + PersistentTopic persistentTopic = topicFuture.get(); + if (persistentTopic != null) { + persistentTopic.removeProducer(references.get(topicName)); + } references.remove(topicName); } - topics.remove(topicName); } + // clear topics after close topics.clear(); } catch (Exception e) { log.error("[{}] Failed to close KafkaTopicManager. exception:", @@ -397,7 +376,10 @@ public void deReference(String topicName) { if (!topics.containsKey(topicName)) { return; } - topics.get(topicName).get().removeProducer(references.get(topicName)); + PersistentTopic persistentTopic = topics.get(topicName).get(); + if (persistentTopic != null) { + persistentTopic.removeProducer(references.get(topicName)); + } topics.remove(topicName); } catch (Exception e) { log.error("[{}] Failed to close reference for individual topic {}. exception:",