Skip to content

Commit

Permalink
Getting the stats of a non-persistent topic that has been cleaned cau…
Browse files Browse the repository at this point in the history
…ses it to re-appear (apache#9029)

If a non-persistent topic is unused it is automatically deleted by Pulsar. If you then get the stats on that topic name using the REST API, it causes that topic to re-appear.

For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint:

```
["non-persistent://public/bob/np"]
```

Since this topic is unused, it gets cleaned and no longer is returned by the endpoint:

```
[]
```

However, if you request the stats for that topic using the CLI (which calls the REST API), like this, you actually get a response (not a 404):

```
bin/pulsar-admin topics stats non-persistent://public/bob/np
Warning: Nashorn engine is planned to be removed from a future JDK release
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 0,
  "msgOutCounter" : 0,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 0,
  "backlogSize" : 0,
  "publishers" : [ ],
  "subscriptions" : { },
  "replication" : { }
}
```

And now the topic re-appears on the topic-list endpoint:

```
["non-persistent://public/bob/np"]
```

When loading a temporary topic with createIfMissing = false do not try to create it, simply return an empty value.
Add test case.

This change added tests and can be verified as in the bug description.

Run:
pulsar-admin topics create non-persistent://public/default/tmp
wait for the topic to be deleted
run
pulsar-admin topics stats non-persistent://public/default/tmp
  • Loading branch information
eolivelli authored and zzzming committed Dec 30, 2020
1 parent f4fdc0c commit 5c07070
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,19 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
}
}
final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
return topics.computeIfAbsent(topic, (topicName) -> {
return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
: createNonPersistentTopic(topicName);
});
if (isPersistentTopic) {
return topics.computeIfAbsent(topic, (topicName) -> {
return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
});
} else {
return topics.computeIfAbsent(topic, (topicName) -> {
if (createIfMissing) {
return createNonPersistentTopic(topicName);
} else {
return CompletableFuture.completedFuture(Optional.empty());
}
});
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
return failedFuture(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@
package org.apache.pulsar.broker.service;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import lombok.Data;

import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -40,6 +50,8 @@

public class NonPersistentTopicE2ETest extends BrokerTestBase {

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class);

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -94,15 +106,32 @@ public void testGCWillDeleteSchema() throws Exception {
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));

// 2. Topic is not GCed with live connection
topicName = "non-persistent://prop/ns-abc/topic-2";
// 1a. Topic that add/removes subscription can be GC'd
topicName = "non-persistent://prop/ns-abc/topic-1a";
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));

admin.topics().deleteSubscription(topicName, subName);
consumer.close();

runGC();
topic = getTopic(topicName);
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));

// 2. Topic is not GCed with live connection
topicName = "non-persistent://prop/ns-abc/topic-2";
subName = "sub1";
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));

runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
Expand All @@ -125,4 +154,82 @@ public void testGCWillDeleteSchema() throws Exception {
assertFalse(topicHasSchema(topicName));
}

@Test
public void testPatternTopic() throws PulsarClientException, InterruptedException {
final String topic1 = "non-persistent://prop/ns-abc/testPatternTopic1-" + UUID.randomUUID().toString();
final String topic2 = "non-persistent://prop/ns-abc/testPatternTopic2-" + UUID.randomUUID().toString();
Pattern pattern = Pattern.compile("prop/ns-abc/test.*");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.subscriptionName("my-sub")
.patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();

Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic1)
.create();

Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic2)
.create();

Thread.sleep(2000);
final int messages = 10;
for (int i = 0; i < messages; i++) {
producer1.send("Message sent by producer-1 -> " + i);
producer2.send("Message sent by producer-2 -> " + i);
}

for (int i = 0; i < messages * 2; i++) {
Message<String> received = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNotNull(received);
}

consumer.close();
producer1.close();
producer2.close();
}

@Test
public void testGC() throws Exception {
// 1. Simple successful GC
String topicName = "non-persistent://prop/ns-abc/topic-10";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();

assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// 2. Topic is not GCed with live connection
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();

runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// 3. Topic with subscription is not GCed even with no connections
consumer.close();

runGC();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// 4. Topic can be GCed after unsubscribe
admin.topics().deleteSubscription(topicName, subName);

runGC();
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
// 5. Get the topic and make sure it doesn't come back
admin.lookups().lookupTopic(topicName);
Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
assertFalse(topic.isPresent());
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// write again, the topic will be available
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
producer2.close();

assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
}
}

0 comments on commit 5c07070

Please sign in to comment.