Skip to content

Commit

Permalink
review: shutdown and resume on error
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Sep 17, 2024
1 parent 1256109 commit 916097d
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -318,19 +319,17 @@ public synchronized void stop(Duration timeout) {
}
runner.get().cancel(true);

scheduler.get().shutdown();

Mono<Boolean> awaitScheduler = Mono.fromCallable(() -> scheduler.get().awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS));
Mono<Boolean> awaitScheduler = Mono.fromRunnable(() -> shutdownWithAwait(scheduler.get(), timeout.toMillis()));
Flux<PartitionOwnership> clearOwnership =
checkpointStore.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup)
.filter(ownership -> identifier.equals(ownership.getOwnerId()))
.map(ownership -> ownership.setOwnerId(""))
.collect(Collectors.toList())
.flatMapMany(checkpointStore::claimOwnership);
.flatMapMany(p -> checkpointStore.claimOwnership(p).onErrorResume(ex -> Mono.empty()));

Mono.when(awaitScheduler,
partitionPumpManager.stopAllPartitionPumps(),
clearOwnership)
partitionPumpManager.stopAllPartitionPumps().onErrorResume(ex -> Mono.empty()),
clearOwnership.onErrorResume(ex -> Mono.empty()))
.block(timeout);
}

Expand All @@ -343,4 +342,16 @@ public synchronized void stop(Duration timeout) {
public synchronized boolean isRunning() {
return isRunning.get();
}

private void shutdownWithAwait(ExecutorService service, long timeoutMillis) {
service.shutdown();
try {
if (!service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
service.shutdownNow();
}
} catch (InterruptedException ex) {
service.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

0 comments on commit 916097d

Please sign in to comment.