Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

remove unnecessary topics cache for Topic Manager #267

Merged
merged 2 commits into from
Dec 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
if (t != null || perTopic == null) {
log.error("Failed while get persistentTopic topic: {} ts: {}. ",
perTopic == null ? "null" : perTopic.getName(), timestamp, t);

// remove cache when topic is null
topicManager.removeTopicManagerCache(perTopic.getName());
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.LEADER_NOT_AVAILABLE,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
Expand Down Expand Up @@ -1368,12 +1369,13 @@ private CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
topicManager.removeTopicManagerCache(topic.toString());
}

if (!topicManager.topicExists(topic.toString())
&& localListeners.contains(kopBrokerUrl)) {
if (localListeners.contains(kopBrokerUrl)) {
topicManager.getTopic(topic.toString()).whenComplete((persistentTopic, exception) -> {
if (exception != null || persistentTopic == null) {
log.warn("[{}] findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:",
ctx.channel(), topic.toString(), kopBrokerUrl, exception);
// remove cache when topic is null
topicManager.removeTopicManagerCache(topic.toString());
returnFuture.complete(null);
} else {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class KafkaTopicManager {
@Getter
private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;

// cache for topics: <topicName, persistentTopic>
// cache for topics: <topicName, persistentTopic>, for removing producer
private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
// cache for references in PersistentTopic: <topicName, producer>
private final ConcurrentHashMap<String, Producer> references;
Expand Down Expand Up @@ -114,7 +114,6 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
topicName,
t -> {
CompletableFuture<PersistentTopic> topic = getTopic(t);
checkState(topic != null);

return topic.thenApply(t2 -> {
if (log.isDebugEnabled()) {
Expand All @@ -123,6 +122,8 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
}

if (t2 == null) {
// remove cache when topic is null
removeTopicManagerCache(topic.toString());
return null;
}
// return consumer manager
Expand All @@ -142,11 +143,6 @@ public static void clearTopicManagerCache() {
KOP_ADDRESS_CACHE.clear();
}

// whether topic exists in cache.
public boolean topicExists(String topicName) {
return topics.containsKey(topicName);
}

// exception throw for pulsar.getClient();
private Producer registerInPersistentTopic(PersistentTopic persistentTopic) throws Exception {
Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
Expand Down Expand Up @@ -275,47 +271,28 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
rwLock.readLock().unlock();
}

return topics.computeIfAbsent(topicName, t -> {
getTopicBroker(t).whenCompleteAsync((ignore, th) -> {
if (th != null || ignore == null) {
log.warn("[{}] Failed getTopicBroker {}, return null PersistentTopic. throwable: ",
requestHandler.ctx.channel(), t, th);

// get topic broker returns null. topic should be removed from LookupCache.
if (ignore == null) {
removeTopicManagerCache(topicName);
}

topicCompletableFuture.complete(null);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] getTopicBroker for {} in KafkaTopicManager. brokerAddress: {}",
requestHandler.ctx.channel(), t, ignore);
}

brokerService.getTopic(t, brokerService.isAllowAutoTopicCreation(t)).whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), t, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(t);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), t);
topicCompletableFuture.complete(null);
}
});
});
return topicCompletableFuture;
brokerService.getTopic(topicName, brokerService.isAllowAutoTopicCreation(topicName))
.whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), topicName);
topicCompletableFuture.complete(null);
}
});
// cache for removing producer
topics.put(topicName, topicCompletableFuture);
return topicCompletableFuture;
}

public void registerProducerInPersistentTopic (String topicName, PersistentTopic persistentTopic) {
Expand Down Expand Up @@ -361,18 +338,20 @@ public synchronized void close() {

for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : topics.entrySet()) {
String topicName = entry.getKey();
removeTopicManagerCache(topicName);
dockerzhang marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<PersistentTopic> topicFuture = entry.getValue();
if (log.isDebugEnabled()) {
log.debug("[{}] remove producer {} for topic {} at close()",
requestHandler.ctx.channel(), references.get(topicName), topicName);
}
if (references.get(topicName) != null) {
topicFuture.get().removeProducer(references.get(topicName));
PersistentTopic persistentTopic = topicFuture.get();
if (persistentTopic != null) {
persistentTopic.removeProducer(references.get(topicName));
}
references.remove(topicName);
}
topics.remove(topicName);
}
// clear topics after close
topics.clear();
} catch (Exception e) {
log.error("[{}] Failed to close KafkaTopicManager. exception:",
Expand All @@ -397,7 +376,10 @@ public void deReference(String topicName) {
if (!topics.containsKey(topicName)) {
return;
}
topics.get(topicName).get().removeProducer(references.get(topicName));
PersistentTopic persistentTopic = topics.get(topicName).get();
if (persistentTopic != null) {
persistentTopic.removeProducer(references.get(topicName));
}
topics.remove(topicName);
} catch (Exception e) {
log.error("[{}] Failed to close reference for individual topic {}. exception:",
Expand Down