diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java index 7b09c438b488c..58119671bf691 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java @@ -18,6 +18,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -318,19 +319,17 @@ public synchronized void stop(Duration timeout) { } runner.get().cancel(true); - scheduler.get().shutdown(); - - Mono awaitScheduler = Mono.fromCallable(() -> scheduler.get().awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)); + Mono awaitScheduler = Mono.fromRunnable(() -> shutdownWithAwait(scheduler.get(), timeout.toMillis())); Flux clearOwnership = checkpointStore.listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup) .filter(ownership -> identifier.equals(ownership.getOwnerId())) .map(ownership -> ownership.setOwnerId("")) .collect(Collectors.toList()) - .flatMapMany(checkpointStore::claimOwnership); + .flatMapMany(p -> checkpointStore.claimOwnership(p).onErrorResume(ex -> Mono.empty())); Mono.when(awaitScheduler, - partitionPumpManager.stopAllPartitionPumps(), - clearOwnership) + partitionPumpManager.stopAllPartitionPumps().onErrorResume(ex -> Mono.empty()), + clearOwnership.onErrorResume(ex -> Mono.empty())) .block(timeout); } @@ -343,4 +342,16 @@ public synchronized void stop(Duration timeout) { public synchronized boolean isRunning() { return isRunning.get(); } + + private void shutdownWithAwait(ExecutorService service, long timeoutMillis) { + service.shutdown(); + try { + if (!service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) { + service.shutdownNow(); + } + } catch (InterruptedException ex) { + service.shutdownNow(); + Thread.currentThread().interrupt(); + } + } }