Skip to content

Commit

Permalink
Unit tests for user error handler (#7048)
Browse files Browse the repository at this point in the history
* User error tests

* Update unit tes

* Use jdk 8 methods

* Add unit tests
  • Loading branch information
srnagar authored Jan 3, 2020
1 parent 3b14327 commit 918103c
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -40,7 +42,7 @@ public class EventProcessorClient {
private final ClientLogger logger = new ClientLogger(EventProcessorClient.class);

private final String identifier;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final PartitionPumpManager partitionPumpManager;
private final PartitionBasedLoadBalancer partitionBasedLoadBalancer;
private final CheckpointStore checkpointStore;
Expand All @@ -55,13 +57,16 @@ public class EventProcessorClient {
* @param consumerGroup The consumer group name used in this event processor to consumer events.
* @param partitionProcessorFactory The factory to create new partition processor(s).
* @param checkpointStore The store used for reading and updating partition ownership and checkpoints. information.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this
* EventProcessorClient will also include the last enqueued event properties for it's respective partitions.
* @param trackLastEnqueuedEventProperties If set to {@code true}, all events received by this EventProcessorClient
* will also include the last enqueued event properties for it's respective partitions.
* @param tracerProvider The tracer implementation.
* @param processError Error handler for any errors that occur outside the context of a partition.
* @param initialPartitionEventPosition Map of initial event positions for partition ids.
*/
EventProcessorClient(EventHubClientBuilder eventHubClientBuilder, String consumerGroup,
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError) {
Supplier<PartitionProcessor> partitionProcessorFactory, CheckpointStore checkpointStore,
boolean trackLastEnqueuedEventProperties, TracerProvider tracerProvider, Consumer<ErrorContext> processError,
Map<String, EventPosition> initialPartitionEventPosition) {

Objects.requireNonNull(eventHubClientBuilder, "eventHubClientBuilder cannot be null.");
Objects.requireNonNull(consumerGroup, "consumerGroup cannot be null.");
Expand All @@ -71,7 +76,7 @@ public class EventProcessorClient {
this.identifier = UUID.randomUUID().toString();
logger.info("The instance ID for this event processors is {}", this.identifier);
this.partitionPumpManager = new PartitionPumpManager(checkpointStore, partitionProcessorFactory,
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider);
eventHubClientBuilder, trackLastEnqueuedEventProperties, tracerProvider, initialPartitionEventPosition);
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.checkpointStore, eventHubAsyncClient,
Expand Down Expand Up @@ -103,7 +108,7 @@ public String getIdentifier() {
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop}
*/
public synchronized void start() {
if (!started.compareAndSet(false, true)) {
if (!isRunning.compareAndSet(false, true)) {
logger.info("Event processor is already running");
return;
}
Expand All @@ -127,12 +132,22 @@ public synchronized void start() {
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorclient.startstop}
*/
public synchronized void stop() {
if (!started.compareAndSet(true, false)) {
if (!isRunning.compareAndSet(true, false)) {
logger.info("Event processor has already stopped");
return;
}
runner.get().dispose();
scheduler.get().dispose();
this.partitionPumpManager.stopAllPartitionPumps();
}

/**
* Returns {@code true} if the event processor is running. If the event processor is already running, calling
* {@link #start()} has no effect.
*
* @return {@code true} if the event processor is running.
*/
public synchronized boolean isRunning() {
return isRunning.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.InitializationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.function.Consumer;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class EventProcessorClientBuilder {
private Consumer<InitializationContext> processPartitionInitialization;
private Consumer<CloseContext> processPartitionClose;
private boolean trackLastEnqueuedEventProperties;
private Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();

/**
* Creates a new instance of {@link EventProcessorClientBuilder}.
Expand Down Expand Up @@ -289,14 +292,30 @@ public EventProcessorClientBuilder processPartitionClose(Consumer<CloseContext>
*
* @param trackLastEnqueuedEventProperties {@code true} if the resulting events will keep track of the last
* enqueued information for that partition; {@code false} otherwise.
*
* @return The updated {@link EventProcessorClientBuilder} instance.
*/
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties) {
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
return this;
}

/**
* Sets the map containing the event position to use for each partition if a checkpoint for the partition does not
* exist in {@link CheckpointStore}. This map is keyed off of the partition id. If there is no checkpoint in
* {@link CheckpointStore} and there is no entry in this map, the processing of the partition will start from
* {@link EventPosition#latest() latest} position.
*
* @param initialPartitionEventPosition Map of initial event positions for partition ids.
* @return The updated {@link EventProcessorClientBuilder} instance.
*/
public EventProcessorClientBuilder initialPartitionEventPosition(
Map<String, EventPosition> initialPartitionEventPosition) {

this.initialPartitionEventPosition = Objects.requireNonNull(initialPartitionEventPosition,
"'initialPartitionEventPosition' cannot be null.");
return this;
}

/**
* This will create a new {@link EventProcessorClient} configured with the options set in this builder. Each call to
* this method will return a new instance of {@link EventProcessorClient}.
Expand All @@ -322,7 +341,7 @@ public EventProcessorClient buildEventProcessorClient() {
final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
return new EventProcessorClient(eventHubClientBuilder, this.consumerGroup,
getPartitionProcessorSupplier(), checkpointStore, trackLastEnqueuedEventProperties, tracerProvider,
processError);
processError, initialPartitionEventPosition);
}

private Supplier<PartitionProcessor> getPartitionProcessorSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
// If the partitions are evenly distributed among all active event processors, no change required.
logger.info("Load is balanced");
// renew ownership of already owned partitions
checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe();
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe();
return;
}

Expand All @@ -216,7 +220,11 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
logger.info("This event processor owns {} partitions and shouldn't own more",
ownerPartitionMap.get(ownerId).size());
// renew ownership of already owned partitions
checkpointStore.claimOwnership(ownerPartitionMap.get(this.ownerId)).subscribe();
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe();
return;
}

Expand Down Expand Up @@ -345,11 +353,16 @@ private void claimOwnership(final Map<String, PartitionOwnership> partitionOwner
PartitionOwnership ownershipRequest = createPartitionOwnershipRequest(partitionOwnershipMap,
partitionIdToClaim);

List<PartitionOwnership> currentPartitionsOwned = ownerPartitionsMap.get(ownerId);
currentPartitionsOwned.add(ownershipRequest);
List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
partitionsToClaim.add(ownershipRequest);
partitionsToClaim.addAll(partitionPumpManager.getPartitionPumps()
.keySet()
.stream()
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()));

checkpointStore
.claimOwnership(currentPartitionsOwned)
.claimOwnership(partitionsToClaim)
.timeout(Duration.ofMinutes(1)) // TODO: configurable
.doOnNext(partitionOwnership -> logger.info("Successfully claimed ownership of partition {}",
partitionOwnership.getPartitionId()))
Expand Down
Loading

0 comments on commit 918103c

Please sign in to comment.