Skip to content

Commit

Permalink
Event processor - API review (#4582)
Browse files Browse the repository at this point in the history
* Support context for ConfigurationAsyncClient APIs

* Initial commit to event processor

* add comments

* Update checkpoint

* Update javadocs

* address cr comments

* address cr comments

* Bug fixes

* Add more sample scenarios and cleanup dead code

* Fix checkstyle

* Move InMemoryPartitionManager to main package

* Add javadocs

* Update instanceId to ownerId

* Fix checkstyle

* More javadocs
  • Loading branch information
srnagar authored Aug 2, 2019
1 parent 510feda commit 614ae78
Show file tree
Hide file tree
Showing 13 changed files with 1,160 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;

import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

/**
* The checkpoint manager that clients should use to update checkpoints to track progress of events processed. Each
* instance of a {@link PartitionProcessor} will be provided with it's own instance of a CheckpointManager.
*/
public class CheckpointManager {

private final PartitionContext partitionContext;
private final PartitionManager partitionManager;
private final AtomicReference<String> eTag;
private final String ownerId;

/**
* Creates a new checkpoint manager which {@link PartitionProcessor} can use to update checkpoints.
*
* @param ownerId The event processor identifier that is responsible for updating checkpoints.
* @param partitionContext The partition context providing necessary partition and event hub information for updating
* checkpoints.
* @param partitionManager The {@link PartitionManager} implementation that will be store the checkpoint information.
* @param eTag The last known ETag stored in {@link PartitionManager} for this partition. When the update checkpoint
* is called from this CheckpointManager, this ETag will be used to provide <a href="https://en.wikipedia.org/wiki/Optimistic_concurrency_control">optimistic
* concurrency</a>.
*/
CheckpointManager(String ownerId, PartitionContext partitionContext, PartitionManager partitionManager,
String eTag) {
this.ownerId = ownerId;
this.partitionContext = partitionContext;
this.partitionManager = partitionManager;
this.eTag = new AtomicReference<>(eTag);
}

/**
* Updates the checkpoint for this partition using the event data. This will serve as the last known successfully
* processed event in this partition if the update is successful.
*
* @param eventData The event data to use for updating the checkpoint.
* @return a representation of deferred execution of this call.
*/
public Mono<Void> updateCheckpoint(EventData eventData) {
String previousETag = this.eTag.get();
Checkpoint checkpoint = new Checkpoint()
.consumerGroupName(partitionContext.consumerGroupName())
.eventHubName(partitionContext.eventHubName())
.ownerId(ownerId)
.partitionId(partitionContext.partitionId())
.sequenceNumber(eventData.sequenceNumber())
.offset(eventData.offset())
.eTag(previousETag);
return this.partitionManager.updateCheckpoint(checkpoint)
.map(eTag -> this.eTag.compareAndSet(previousETag, eTag))
.then();
}

/**
* Updates a checkpoint using the given offset and sequence number. This will serve as the last known successfully
* processed event in this partition if the update is successful.
*
* @param sequenceNumber The sequence number to update the checkpoint.
* @param offset The offset to update the checkpoint.
* @return a representation of deferred execution of this call.
*/
public Mono<Void> updateCheckpoint(long sequenceNumber, String offset) {
String previousETag = this.eTag.get();
Checkpoint checkpoint = new Checkpoint()
.consumerGroupName(partitionContext.consumerGroupName())
.eventHubName(partitionContext.eventHubName())
.ownerId(ownerId)
.partitionId(partitionContext.partitionId())
.sequenceNumber(sequenceNumber)
.offset(offset)
.eTag(previousETag);

return this.partitionManager.updateCheckpoint(checkpoint)
.map(eTag -> this.eTag.compareAndSet(previousETag, eTag))
.then();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

/**
* Enumeration of all possible reasons a {@link PartitionProcessor} may be closed.
*/
public enum CloseReason {
/**
* If another event processor instance stole the ownership of a partition, this reason will be provided to {@link
* PartitionProcessor#close(CloseReason)}.
*/
LOST_PARTITION_OWNERSHIP,

/**
* If the event processor is shutting down by calling {@link EventProcessorAsyncClient#stop()}, the {@link
* PartitionProcessor#close(CloseReason)} will be called with this reason.
*/
EVENT_PROCESSOR_SHUTDOWN,

/**
* If a non-retryable exception occured when receiving events from Event Hub, this reason will be provided when {@link
* PartitionProcessor#close(CloseReason)} is called.
*/
EVENT_HUB_EXCEPTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.azure.messaging.eventhubs.implementation.ConnectionStringProperties;
import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.implementation.ReactorProvider;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ProxyAuthenticationType;
import com.azure.messaging.eventhubs.models.ProxyConfiguration;
import reactor.core.scheduler.Scheduler;
Expand Down Expand Up @@ -70,6 +71,10 @@ public class EventHubClientBuilder {
private TransportType transport;
private String host;
private String eventHubPath;
private EventPosition initialEventPosition;
private PartitionProcessorFactory partitionProcessorFactory;
private String consumerGroupName;
private PartitionManager partitionManager;

/**
* Creates a new instance with the default transport {@link TransportType#AMQP}.
Expand Down Expand Up @@ -336,4 +341,87 @@ private ProxyConfiguration getDefaultProxyConfiguration(Configuration configurat

return new ProxyConfiguration(authentication, proxy, username, password);
}

/**
* This property must be set for building an {@link EventProcessorAsyncClient}.
*
* Sets the consumer group name from which the {@link EventProcessorAsyncClient} should consume events from.
*
* @param consumerGroupName The consumer group name this {@link EventProcessorAsyncClient} should consume events
* from.
* @return The updated {@link EventHubClientBuilder} object.
*/
public EventHubClientBuilder consumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
return this;
}

/**
* This property can be optionally set when building an {@link EventProcessorAsyncClient}.
*
* Sets the initial event position. If this property is not set and if checkpoint for a partition doesn't exist,
* {@link EventPosition#earliest()} will be used as the initial event position to start consuming events.
*
* @param initialEventPosition The initial event position.
* @return The updated {@link EventHubClientBuilder} object.
*/
public EventHubClientBuilder initialEventPosition(EventPosition initialEventPosition) {
this.initialEventPosition = initialEventPosition;
return this;
}

/**
* This property must be set when building an {@link EventProcessorAsyncClient}.
*
* Sets the {@link PartitionManager} the {@link EventProcessorAsyncClient} will use for storing partition
* ownership and checkpoint information.
*
* @param partitionManager Implementation of {@link PartitionManager}.
* @return The updated {@link EventHubClientBuilder} object.
*/
public EventHubClientBuilder partitionManager(PartitionManager partitionManager) {
// If this is not set, look for classes implementing PartitionManager interface
// in the classpath and use it automatically. (To be implemented)
this.partitionManager = partitionManager;
return this;
}

/**
* This property must be set when building an {@link EventProcessorAsyncClient}.
*
* Sets the partition processor factory for creating new instance(s) of {@link PartitionProcessor}.
*
* @param partitionProcessorFactory The factory that creates new processor for each partition.
* @return The updated {@link EventHubClientBuilder} object.
*/
public EventHubClientBuilder partitionProcessorFactory(PartitionProcessorFactory partitionProcessorFactory) {
this.partitionProcessorFactory = partitionProcessorFactory;
return this;
}

/**
* This will create a new {@link EventProcessorAsyncClient} configured with the options set in this builder. Each call
* to this method will return a new instance of {@link EventProcessorAsyncClient}.
*
* <p>
* A new instance of {@link EventHubAsyncClient} will be created with configured options by calling the {@link
* #buildAsyncClient()} that will be used by the {@link EventProcessorAsyncClient}.
* </p>
*
* <p>
* If the {@link #initialEventPosition(EventPosition) initial event position} is not set, all partitions processed by
* this {@link EventProcessorAsyncClient} will start processing from {@link EventPosition#earliest() earliest}
* available event in the respective partitions.
* </p>
*
* @return A new instance of {@link EventProcessorAsyncClient}.
*/
public EventProcessorAsyncClient buildEventProcessorAsyncClient() {
EventPosition initialEventPosition =
this.initialEventPosition == null ? EventPosition.earliest()
: this.initialEventPosition;

return new EventProcessorAsyncClient(buildAsyncClient(), this.consumerGroupName,
this.partitionProcessorFactory, initialEventPosition, partitionManager, eventHubPath);
}
}
Loading

0 comments on commit 614ae78

Please sign in to comment.