Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
remove unnecessary topics cache for Topic Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang committed Dec 16, 2020
1 parent c37d706 commit 0830650
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,8 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
if (t != null || perTopic == null) {
log.error("Failed while get persistentTopic topic: {} ts: {}. ",
perTopic == null ? "null" : perTopic.getName(), timestamp, t);

// remove cache when topic is null
topicManager.removeTopicManagerCache(perTopic.getName());
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.LEADER_NOT_AVAILABLE,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
Expand Down Expand Up @@ -1366,12 +1367,13 @@ private CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
topicManager.removeTopicManagerCache(topic.toString());
}

if (!topicManager.topicExists(topic.toString())
&& localListeners.contains(kopBrokerUrl)) {
if (localListeners.contains(kopBrokerUrl)) {
topicManager.getTopic(topic.toString()).whenComplete((persistentTopic, exception) -> {
if (exception != null || persistentTopic == null) {
log.warn("[{}] findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:",
ctx.channel(), topic.toString(), kopBrokerUrl, exception);
// remove cache when topic is null
topicManager.removeTopicManagerCache(topic.toString());
returnFuture.complete(null);
} else {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static com.google.common.base.Preconditions.checkState;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -51,8 +50,6 @@ public class KafkaTopicManager {
@Getter
private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;

// cache for topics: <topicName, persistentTopic>
private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
// cache for references in PersistentTopic: <topicName, producer>
private final ConcurrentHashMap<String, Producer> references;

Expand Down Expand Up @@ -81,7 +78,6 @@ public class KafkaTopicManager {
this.internalServerCnx = new InternalServerCnx(requestHandler);

consumerTopicManagers = new ConcurrentHashMap<>();
topics = new ConcurrentHashMap<>();
references = new ConcurrentHashMap<>();

this.rwLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -114,7 +110,6 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
topicName,
t -> {
CompletableFuture<PersistentTopic> topic = getTopic(t);
checkState(topic != null);

return topic.thenApply(t2 -> {
if (log.isDebugEnabled()) {
Expand All @@ -123,6 +118,8 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
}

if (t2 == null) {
// remove cache when topic is null
removeTopicManagerCache(topic.toString());
return null;
}
// return consumer manager
Expand All @@ -137,11 +134,6 @@ public static void removeTopicManagerCache(String topicName) {
KOP_ADDRESS_CACHE.remove(topicName);
}

// whether topic exists in cache.
public boolean topicExists(String topicName) {
return topics.containsKey(topicName);
}

// exception throw for pulsar.getClient();
private Producer registerInPersistentTopic(PersistentTopic persistentTopic) throws Exception {
Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
Expand Down Expand Up @@ -270,47 +262,27 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
rwLock.readLock().unlock();
}

return topics.computeIfAbsent(topicName, t -> {
getTopicBroker(t).whenCompleteAsync((ignore, th) -> {
if (th != null || ignore == null) {
log.warn("[{}] Failed getTopicBroker {}, return null PersistentTopic. throwable: ",
requestHandler.ctx.channel(), t, th);

// get topic broker returns null. topic should be removed from LookupCache.
if (ignore == null) {
removeTopicManagerCache(topicName);
}

topicCompletableFuture.complete(null);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] getTopicBroker for {} in KafkaTopicManager. brokerAddress: {}",
requestHandler.ctx.channel(), t, ignore);
}

brokerService.getTopic(t, brokerService.isAllowAutoTopicCreation(t)).whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), t, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(t);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), t);
topicCompletableFuture.complete(null);
}
});
});
return topicCompletableFuture;
brokerService.getTopic(topicName, brokerService.isAllowAutoTopicCreation(topicName))
.whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), topicName);
topicCompletableFuture.complete(null);
}
});

return topicCompletableFuture;
}

public void registerProducerInPersistentTopic (String topicName, PersistentTopic persistentTopic) {
Expand Down Expand Up @@ -353,22 +325,6 @@ public synchronized void close() {
manager.get().close();
}
consumerTopicManagers.clear();

for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : topics.entrySet()) {
String topicName = entry.getKey();
removeTopicManagerCache(topicName);
CompletableFuture<PersistentTopic> topicFuture = entry.getValue();
if (log.isDebugEnabled()) {
log.debug("[{}] remove producer {} for topic {} at close()",
requestHandler.ctx.channel(), references.get(topicName), topicName);
}
if (references.get(topicName) != null) {
topicFuture.get().removeProducer(references.get(topicName));
references.remove(topicName);
}
topics.remove(topicName);
}
topics.clear();
} catch (Exception e) {
log.error("[{}] Failed to close KafkaTopicManager. exception:",
requestHandler.ctx.channel(), e);
Expand All @@ -389,11 +345,6 @@ public void deReference(String topicName) {
consumerTopicManagers.remove(topicName);
}

if (!topics.containsKey(topicName)) {
return;
}
topics.get(topicName).get().removeProducer(references.get(topicName));
topics.remove(topicName);
} catch (Exception e) {
log.error("[{}] Failed to close reference for individual topic {}. exception:",
requestHandler.ctx.channel(), topicName, e);
Expand Down

0 comments on commit 0830650

Please sign in to comment.