diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ffeed8bd2ea86..85ee429c026b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1309,6 +1309,7 @@ public void closeComplete(Object ctx) { unregisterTopicPolicyListener(); log.info("[{}] Topic closed", topic); + cancelFencedTopicMonitoringTask(); closeFuture.complete(null); }) .exceptionally(ex -> { @@ -2899,6 +2900,13 @@ public boolean isPersistent() { return true; } + private synchronized void cancelFencedTopicMonitoringTask() { + ScheduledFuture monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask != null && !monitoringTask.isDone()) { + monitoringTask.cancel(false); + } + } + private synchronized void fence() { isFenced = true; ScheduledFuture monitoringTask = this.fencedTopicMonitoringTask; @@ -2913,10 +2921,7 @@ private synchronized void fence() { private synchronized void unfence() { isFenced = false; - ScheduledFuture monitoringTask = this.fencedTopicMonitoringTask; - if (monitoringTask != null && !monitoringTask.isDone()) { - monitoringTask.cancel(false); - } + cancelFencedTopicMonitoringTask(); } private void closeFencedTopicForcefully() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 19ee7f58c4b1e..d1aba97ee9032 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -2162,6 +2162,28 @@ public void testTopicFencingTimeout() throws Exception { assertTrue((boolean) isClosingOrDeletingField.get(topic)); } + @Test + public void testTopicCloseFencingTimeout() throws Exception { + pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10); + Method fence = PersistentTopic.class.getDeclaredMethod("fence"); + fence.setAccessible(true); + Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask"); + fencedTopicMonitoringTaskField.setAccessible(true); + + // create topic + PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); + + // fence topic to init fencedTopicMonitoringTask + fence.invoke(topic); + + // close topic + topic.close().get(); + assertFalse(brokerService.getTopicReference(successTopicName).isPresent()); + ScheduledFuture fencedTopicMonitoringTask = (ScheduledFuture) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertTrue(fencedTopicMonitoringTask.isCancelled()); + } + @Test public void testGetDurableSubscription() throws Exception { ManagedLedger mockLedger = mock(ManagedLedger.class);