Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
topic cache only to remove producer
Browse files Browse the repository at this point in the history
  • Loading branch information
dockerzhang committed Dec 18, 2020
1 parent 2f29ad4 commit 6e1594f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,8 @@ protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
if (t != null || perTopic == null) {
log.error("Failed while get persistentTopic topic: {} ts: {}. ",
perTopic == null ? "null" : perTopic.getName(), timestamp, t);

// remove cache when topic is null
topicManager.removeTopicManagerCache(perTopic.getName());
partitionData.complete(new ListOffsetResponse.PartitionData(
Errors.LEADER_NOT_AVAILABLE,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
Expand Down Expand Up @@ -1368,12 +1369,13 @@ private CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
topicManager.removeTopicManagerCache(topic.toString());
}

if (!topicManager.topicExists(topic.toString())
&& localListeners.contains(kopBrokerUrl)) {
if (localListeners.contains(kopBrokerUrl)) {
topicManager.getTopic(topic.toString()).whenComplete((persistentTopic, exception) -> {
if (exception != null || persistentTopic == null) {
log.warn("[{}] findBroker: Failed to getOrCreateTopic {}. broker:{}, exception:",
ctx.channel(), topic.toString(), kopBrokerUrl, exception);
// remove cache when topic is null
topicManager.removeTopicManagerCache(topic.toString());
returnFuture.complete(null);
} else {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class KafkaTopicManager {
@Getter
private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;

// cache for topics: <topicName, persistentTopic>
// cache for topics: <topicName, persistentTopic>, for removing producer
private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
// cache for references in PersistentTopic: <topicName, producer>
private final ConcurrentHashMap<String, Producer> references;
Expand Down Expand Up @@ -114,7 +114,6 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
topicName,
t -> {
CompletableFuture<PersistentTopic> topic = getTopic(t);
checkState(topic != null);

return topic.thenApply(t2 -> {
if (log.isDebugEnabled()) {
Expand All @@ -123,6 +122,8 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
}

if (t2 == null) {
// remove cache when topic is null
removeTopicManagerCache(topic.toString());
return null;
}
// return consumer manager
Expand All @@ -142,11 +143,6 @@ public static void clearTopicManagerCache() {
KOP_ADDRESS_CACHE.clear();
}

// whether topic exists in cache.
public boolean topicExists(String topicName) {
return topics.containsKey(topicName);
}

// exception throw for pulsar.getClient();
private Producer registerInPersistentTopic(PersistentTopic persistentTopic) throws Exception {
Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
Expand Down Expand Up @@ -275,47 +271,28 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
rwLock.readLock().unlock();
}

return topics.computeIfAbsent(topicName, t -> {
getTopicBroker(t).whenCompleteAsync((ignore, th) -> {
if (th != null || ignore == null) {
log.warn("[{}] Failed getTopicBroker {}, return null PersistentTopic. throwable: ",
requestHandler.ctx.channel(), t, th);

// get topic broker returns null. topic should be removed from LookupCache.
if (ignore == null) {
removeTopicManagerCache(topicName);
}

topicCompletableFuture.complete(null);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] getTopicBroker for {} in KafkaTopicManager. brokerAddress: {}",
requestHandler.ctx.channel(), t, ignore);
}

brokerService.getTopic(t, brokerService.isAllowAutoTopicCreation(t)).whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), t, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(t);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), t);
topicCompletableFuture.complete(null);
}
});
});
return topicCompletableFuture;
brokerService.getTopic(topicName, brokerService.isAllowAutoTopicCreation(topicName))
.whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
return;
}
if (t2.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) t2.get();
topicCompletableFuture.complete(persistentTopic);
} else {
log.error("[{}]Get empty topic for name {}",
requestHandler.ctx.channel(), topicName);
topicCompletableFuture.complete(null);
}
});
// cache for removing producer
topics.put(topicName, topicCompletableFuture);
return topicCompletableFuture;
}

public void registerProducerInPersistentTopic (String topicName, PersistentTopic persistentTopic) {
Expand Down Expand Up @@ -361,7 +338,6 @@ public synchronized void close() {

for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : topics.entrySet()) {
String topicName = entry.getKey();
removeTopicManagerCache(topicName);
CompletableFuture<PersistentTopic> topicFuture = entry.getValue();
if (log.isDebugEnabled()) {
log.debug("[{}] remove producer {} for topic {} at close()",
Expand All @@ -371,8 +347,8 @@ public synchronized void close() {
topicFuture.get().removeProducer(references.get(topicName));
references.remove(topicName);
}
topics.remove(topicName);
}
// clear topics after close
topics.clear();
} catch (Exception e) {
log.error("[{}] Failed to close KafkaTopicManager. exception:",
Expand All @@ -394,11 +370,6 @@ public void deReference(String topicName) {
consumerTopicManagers.remove(topicName);
}

if (!topics.containsKey(topicName)) {
return;
}
topics.get(topicName).get().removeProducer(references.get(topicName));
topics.remove(topicName);
} catch (Exception e) {
log.error("[{}] Failed to close reference for individual topic {}. exception:",
requestHandler.ctx.channel(), topicName, e);
Expand Down

0 comments on commit 6e1594f

Please sign in to comment.