Skip to content

Commit

Permalink
Enable blocking operation in batch receive (#12822)
Browse files Browse the repository at this point in the history
* Enable blocking operation in batch receive

* Fix error type

* Remove unused imports
  • Loading branch information
srnagar authored Jul 7, 2020
1 parent 927fa43 commit 5a6bd8c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

Expand All @@ -22,7 +21,6 @@
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -123,12 +121,6 @@ void loadBalance() {
final Mono<List<String>> partitionsMono = eventHubAsyncClient
.getPartitionIds()
.timeout(Duration.ofMinutes(1))
.onErrorResume(TimeoutException.class, error -> {
// In the subsequent step where it tries to balance the load, it'll propagate an error to the user.
// So it is okay to return an empty Flux.
logger.warning("Unable to get partitionIds from eventHubAsyncClient.");
return Flux.empty();
})
.collectList();

Mono.zip(partitionOwnershipMono, partitionsMono)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

/**
* The partition pump manager that keeps track of all the partition pumps started by this {@link EventProcessorClient}.
Expand Down Expand Up @@ -164,37 +165,30 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi

partitionPumps.put(claimedOwnership.getPartitionId(), eventHubConsumer);
//@formatter:off
Flux<Flux<PartitionEvent>> partitionEventFlux;
if (maxWaitTime != null) {
eventHubConsumer
partitionEventFlux = eventHubConsumer
.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions)
.windowTimeout(maxBatchSize, maxWaitTime)
.flatMap(Flux::collectList)
.subscribe(partitionEventBatch -> processEvents(partitionContext, partitionProcessor,
eventHubConsumer, partitionEventBatch),
/* EventHubConsumer receive() returned an error */
ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> {
partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN));
cleanup(claimedOwnership, eventHubConsumer);
});
.windowTimeout(maxBatchSize, maxWaitTime);
} else {
eventHubConsumer
partitionEventFlux = eventHubConsumer
.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions)
.window(maxBatchSize)
.flatMap(Flux::collectList)
.subscribe(partitionEventBatch -> {
processEvents(partitionContext, partitionProcessor,
eventHubConsumer, partitionEventBatch);
},
/* EventHubConsumer receive() returned an error */
ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> {
partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN));
cleanup(claimedOwnership, eventHubConsumer);
});
.window(maxBatchSize);
}
partitionEventFlux
.flatMap(Flux::collectList)
.publishOn(Schedulers.boundedElastic())
.subscribe(partitionEventBatch -> {
processEvents(partitionContext, partitionProcessor,
eventHubConsumer, partitionEventBatch);
},
/* EventHubConsumer receive() returned an error */
ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> {
partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN));
cleanup(claimedOwnership, eventHubConsumer);
});
//@formatter:on
} catch (Exception ex) {
if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
Expand Down

0 comments on commit 5a6bd8c

Please sign in to comment.