From 5c070704cce5df1ab371501c8999c7a6b50659b4 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 29 Dec 2020 09:43:20 +0100 Subject: [PATCH] Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029) If a non-persistent topic is unused it is automatically deleted by Pulsar. If you then get the stats on that topic name using the REST API, it causes that topic to re-appear. For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint: ``` ["non-persistent://public/bob/np"] ``` Since this topic is unused, it gets cleaned and no longer is returned by the endpoint: ``` [] ``` However, if you request the stats for that topic using the CLI (which calls the REST API), like this, you actually get a response (not a 404): ``` bin/pulsar-admin topics stats non-persistent://public/bob/np Warning: Nashorn engine is planned to be removed from a future JDK release { "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesInCounter" : 0, "msgInCounter" : 0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "averageMsgSize" : 0.0, "msgChunkPublished" : false, "storageSize" : 0, "backlogSize" : 0, "publishers" : [ ], "subscriptions" : { }, "replication" : { } } ``` And now the topic re-appears on the topic-list endpoint: ``` ["non-persistent://public/bob/np"] ``` When loading a temporary topic with createIfMissing = false do not try to create it, simply return an empty value. Add test case. This change added tests and can be verified as in the bug description. Run: pulsar-admin topics create non-persistent://public/default/tmp wait for the topic to be deleted run pulsar-admin topics stats non-persistent://public/default/tmp --- .../pulsar/broker/service/BrokerService.java | 17 ++- .../service/NonPersistentTopicE2ETest.java | 111 +++++++++++++++++- 2 files changed, 122 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b27dba23607b9..eae03f1775f1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -731,10 +731,19 @@ public CompletableFuture> getTopic(final String topic, boolean c } } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); - return topics.computeIfAbsent(topic, (topicName) -> { - return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing) - : createNonPersistentTopic(topicName); - }); + if (isPersistentTopic) { + return topics.computeIfAbsent(topic, (topicName) -> { + return this.loadOrCreatePersistentTopic(topicName, createIfMissing); + }); + } else { + return topics.computeIfAbsent(topic, (topicName) -> { + if (createIfMissing) { + return createNonPersistentTopic(topicName); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }); + } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topic, e); return failedFuture(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java index 9b429d306cf98..dee0c298ed84e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java @@ -19,17 +19,27 @@ package org.apache.pulsar.broker.service; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import lombok.Data; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -40,6 +50,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase { + private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class); + @BeforeMethod @Override protected void setup() throws Exception { @@ -94,8 +106,8 @@ public void testGCWillDeleteSchema() throws Exception { assertFalse(topic.isPresent()); assertFalse(topicHasSchema(topicName)); - // 2. Topic is not GCed with live connection - topicName = "non-persistent://prop/ns-abc/topic-2"; + // 1a. Topic that add/removes subscription can be GC'd + topicName = "non-persistent://prop/ns-abc/topic-1a"; String subName = "sub1"; Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); topic = getTopic(topicName); @@ -103,6 +115,23 @@ public void testGCWillDeleteSchema() throws Exception { topic.get().addSchema(schemaData).join(); assertTrue(topicHasSchema(topicName)); + admin.topics().deleteSubscription(topicName, subName); + consumer.close(); + + runGC(); + topic = getTopic(topicName); + assertFalse(topic.isPresent()); + assertFalse(topicHasSchema(topicName)); + + // 2. Topic is not GCed with live connection + topicName = "non-persistent://prop/ns-abc/topic-2"; + subName = "sub1"; + consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + topic = getTopic(topicName); + assertTrue(topic.isPresent()); + topic.get().addSchema(schemaData).join(); + assertTrue(topicHasSchema(topicName)); + runGC(); topic = getTopic(topicName); assertTrue(topic.isPresent()); @@ -125,4 +154,82 @@ public void testGCWillDeleteSchema() throws Exception { assertFalse(topicHasSchema(topicName)); } + @Test + public void testPatternTopic() throws PulsarClientException, InterruptedException { + final String topic1 = "non-persistent://prop/ns-abc/testPatternTopic1-" + UUID.randomUUID().toString(); + final String topic2 = "non-persistent://prop/ns-abc/testPatternTopic2-" + UUID.randomUUID().toString(); + Pattern pattern = Pattern.compile("prop/ns-abc/test.*"); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topicsPattern(pattern) + .subscriptionName("my-sub") + .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS) + .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics) + .subscribe(); + + Producer producer1 = pulsarClient.newProducer(Schema.STRING) + .topic(topic1) + .create(); + + Producer producer2 = pulsarClient.newProducer(Schema.STRING) + .topic(topic2) + .create(); + + Thread.sleep(2000); + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer1.send("Message sent by producer-1 -> " + i); + producer2.send("Message sent by producer-2 -> " + i); + } + + for (int i = 0; i < messages * 2; i++) { + Message received = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(received); + } + + consumer.close(); + producer1.close(); + producer2.close(); + } + + @Test + public void testGC() throws Exception { + // 1. Simple successful GC + String topicName = "non-persistent://prop/ns-abc/topic-10"; + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + producer.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + runGC(); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 2. Topic is not GCed with live connection + String subName = "sub1"; + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 3. Topic with subscription is not GCed even with no connections + consumer.close(); + + runGC(); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // 4. Topic can be GCed after unsubscribe + admin.topics().deleteSubscription(topicName, subName); + + runGC(); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + // 5. Get the topic and make sure it doesn't come back + admin.lookups().lookupTopic(topicName); + Optional topic = pulsar.getBrokerService().getTopicIfExists(topicName).join(); + assertFalse(topic.isPresent()); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + + // write again, the topic will be available + Producer producer2 = pulsarClient.newProducer().topic(topicName).create(); + producer2.close(); + + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); + } }