Skip to content

Commit

Permalink
Cancel scheduled tasks when deleting ManagedLedgerImpl (apache#12565)
Browse files Browse the repository at this point in the history
(cherry picked from commit a95a382)
(cherry picked from commit 935f8ba)
  • Loading branch information
Masahiro Sakamoto authored and nicoloboschi committed Dec 10, 2021
1 parent 588bc43 commit 4f4072b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1325,14 +1325,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c

factory.close(this);
STATE_UPDATER.set(this, State.Closed);

if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}
cancelScheduledTasks();

LedgerHandle lh = currentLedger;

Expand Down Expand Up @@ -2621,6 +2614,7 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
STATE_UPDATER.set(this, State.Fenced);
cancelScheduledTasks();

List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
if (cursors.isEmpty()) {
Expand Down Expand Up @@ -3985,4 +3979,14 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void cancelScheduledTasks() {
if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -3073,4 +3074,36 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){
", dataLength=" + dataLength +
'}';
}

@Test
public void testCancellationOfScheduledTasks() throws Exception {
Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask");
timeoutTaskField.setAccessible(true);
Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask");
checkLedgerRollTaskField.setAccessible(true);

ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1");
ledger1.addEntry("dummy-entry-1".getBytes(Encoding));
ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1);
assertNotNull(timeoutTask1);
assertFalse(timeoutTask1.isDone());
ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1);
assertNotNull(checkLedgerRollTask1);
assertFalse(checkLedgerRollTask1.isDone());
ledger1.close();
assertTrue(timeoutTask1.isCancelled());
assertTrue(checkLedgerRollTask1.isCancelled());

ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2");
ledger2.addEntry("dummy-entry-2".getBytes(Encoding));
ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2);
assertNotNull(timeoutTask2);
assertFalse(timeoutTask2.isDone());
ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2);
assertNotNull(checkLedgerRollTask2);
assertFalse(checkLedgerRollTask2.isDone());
ledger2.delete();
assertTrue(timeoutTask2.isCancelled());
assertTrue(checkLedgerRollTask2.isCancelled());
}
}

0 comments on commit 4f4072b

Please sign in to comment.