Skip to content

Commit

Permalink
Close the replicator and replication client when delete cluster. (apa…
Browse files Browse the repository at this point in the history
…che#11342)

Currently, when a cluster has been deleted, the replicator and the replication client will not be closed.
The producer of the replicator will try to reconnect to the deleted cluster continuously.
We should close and clean up the replicator and the replication client for the deleted cluster.

A new test was added for verifying the replicator and the replication client will be closed after the cluster is deleted.

(cherry picked from commit 8a4147e)
  • Loading branch information
codelipenghui authored and lhotari committed Feb 8, 2022
1 parent 33b5f2f commit 60d462a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
Expand Down Expand Up @@ -135,6 +136,8 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -594,6 +597,8 @@ public void start() throws PulsarServerException {
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getZooKeeperOperationTimeoutSeconds());

pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
.name("pulsar-ordered")
Expand Down Expand Up @@ -825,6 +830,14 @@ public Boolean get() {
}
}

private void handleDeleteCluster(Notification notification) {
if (notification.getPath().startsWith(ClusterResources.CLUSTERS_ROOT)
&& notification.getType() == NotificationType.Deleted) {
final String clusterName = notification.getPath().substring(ClusterResources.CLUSTERS_ROOT.length() + 1);
getBrokerService().closeAndRemoveReplicationClient(clusterName);
}
}

public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return MetadataStoreExtended.create(config.getZookeeperServers(),
MetadataStoreConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,32 @@ public void close() throws IOException {
}
}

public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterName) {
List<CompletableFuture<Void>> futures = new ArrayList<>((int) topics.size());
topics.forEach((__, future) -> {
CompletableFuture<Void> f = new CompletableFuture<>();
futures.add(f);
future.whenComplete((ot, ex) -> {
if (ot.isPresent()) {
Replicator r = ot.get().getReplicators().get(clusterName);
if (r != null && r.isConnected()) {
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
return;
}
}
f.complete(null);
});
});

return FutureUtil.waitForAll(futures).thenCompose(__ -> {
PulsarClient client = replicationClients.remove(clusterName);
if (client == null) {
return CompletableFuture.completedFuture(null);
}
return client.closeAsync();
});
}

public CompletableFuture<Void> closeAsync() {
try {
log.info("Shutting down Pulsar Broker service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.util.UUID;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -43,6 +44,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -980,6 +982,55 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
nonPersistentProducer2.close();
}

@Test
public void testRemoveClusterFromNamespace() throws Exception {
final String cluster4 = "r4";
admin1.clusters().createCluster(cluster4, ClusterData.builder()
.serviceUrl(url3.toString())
.serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
.build());

admin1.tenants().createTenant("pulsar1",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", "r3", cluster4)));

admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3", cluster4));

PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster4);
Assert.assertNotNull(repClient1);
Assert.assertFalse(repClient1.isClosed());

PulsarClient client = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();

Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create();

producer.send("Pulsar".getBytes());

producer.close();
client.close();

Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
.get().getReplicators().get(cluster4);

Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));

admin1.clusters().deleteCluster(cluster4);

Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed()));

Awaitility.await().untilAsserted(() -> Assert.assertNull(
pulsar1.getBrokerService().getReplicationClients().get(cluster4)));
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}

0 comments on commit 60d462a

Please sign in to comment.