diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CheckpointManager.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CheckpointManager.java new file mode 100644 index 0000000000000..3cfbb85f95976 --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CheckpointManager.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionContext; + +import java.util.concurrent.atomic.AtomicReference; +import reactor.core.publisher.Mono; + +/** + * The checkpoint manager that clients should use to update checkpoints to track progress of events processed. Each + * instance of a {@link PartitionProcessor} will be provided with its own instance of a CheckpointManager. + */ +public class CheckpointManager { + + private final PartitionContext partitionContext; + private final PartitionManager partitionManager; + private final AtomicReference eTag; + private final String ownerId; + + /** + * Creates a new checkpoint manager which {@link PartitionProcessor} can use to update checkpoints. + * + * @param ownerId The event processor identifier that is responsible for updating checkpoints. + * @param partitionContext The partition context providing necessary partition and event hub information for updating + * checkpoints. + * @param partitionManager The {@link PartitionManager} implementation that will store the checkpoint information. + * @param eTag The last known ETag stored in {@link PartitionManager} for this partition. When the update checkpoint + * is called from this CheckpointManager, this ETag will be used to provide optimistic + * concurrency. 
+ */ + CheckpointManager(String ownerId, PartitionContext partitionContext, PartitionManager partitionManager, + String eTag) { + this.ownerId = ownerId; + this.partitionContext = partitionContext; + this.partitionManager = partitionManager; + this.eTag = new AtomicReference<>(eTag); + } + + /** + * Updates the checkpoint for this partition using the event data. This will serve as the last known successfully + * processed event in this partition if the update is successful. + * + * @param eventData The event data to use for updating the checkpoint. + * @return a representation of deferred execution of this call. + */ + public Mono updateCheckpoint(EventData eventData) { + String previousETag = this.eTag.get(); + Checkpoint checkpoint = new Checkpoint() + .consumerGroupName(partitionContext.consumerGroupName()) + .eventHubName(partitionContext.eventHubName()) + .ownerId(ownerId) + .partitionId(partitionContext.partitionId()) + .sequenceNumber(eventData.sequenceNumber()) + .offset(eventData.offset()) + .eTag(previousETag); + return this.partitionManager.updateCheckpoint(checkpoint) + .map(eTag -> this.eTag.compareAndSet(previousETag, eTag)) + .then(); + } + + /** + * Updates a checkpoint using the given offset and sequence number. This will serve as the last known successfully + * processed event in this partition if the update is successful. + * + * @param sequenceNumber The sequence number to update the checkpoint. + * @param offset The offset to update the checkpoint. + * @return a representation of deferred execution of this call. 
+ */ + public Mono updateCheckpoint(long sequenceNumber, String offset) { + String previousETag = this.eTag.get(); + Checkpoint checkpoint = new Checkpoint() + .consumerGroupName(partitionContext.consumerGroupName()) + .eventHubName(partitionContext.eventHubName()) + .ownerId(ownerId) + .partitionId(partitionContext.partitionId()) + .sequenceNumber(sequenceNumber) + .offset(offset) + .eTag(previousETag); + + return this.partitionManager.updateCheckpoint(checkpoint) + .map(eTag -> this.eTag.compareAndSet(previousETag, eTag)) + .then(); + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CloseReason.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CloseReason.java new file mode 100644 index 0000000000000..a11cd5ba40739 --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/CloseReason.java @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +/** + * Enumeration of all possible reasons a {@link PartitionProcessor} may be closed. + */ +public enum CloseReason { + /** + * If another event processor instance stole the ownership of a partition, this reason will be provided to {@link + * PartitionProcessor#close(CloseReason)}. + */ + LOST_PARTITION_OWNERSHIP, + + /** + * If the event processor is shutting down by calling {@link EventProcessorAsyncClient#stop()}, the {@link + * PartitionProcessor#close(CloseReason)} will be called with this reason. + */ + EVENT_PROCESSOR_SHUTDOWN, + + /** + * If a non-retryable exception occurred when receiving events from Event Hub, this reason will be provided when {@link + * PartitionProcessor#close(CloseReason)} is called. 
+ */ + EVENT_HUB_EXCEPTION +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 1491a5f9400e4..9a3da92942616 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -18,6 +18,7 @@ import com.azure.messaging.eventhubs.implementation.ConnectionStringProperties; import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider; import com.azure.messaging.eventhubs.implementation.ReactorProvider; +import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.ProxyAuthenticationType; import com.azure.messaging.eventhubs.models.ProxyConfiguration; import reactor.core.scheduler.Scheduler; @@ -70,6 +71,10 @@ public class EventHubClientBuilder { private TransportType transport; private String host; private String eventHubPath; + private EventPosition initialEventPosition; + private PartitionProcessorFactory partitionProcessorFactory; + private String consumerGroupName; + private PartitionManager partitionManager; /** * Creates a new instance with the default transport {@link TransportType#AMQP}. @@ -336,4 +341,87 @@ private ProxyConfiguration getDefaultProxyConfiguration(Configuration configurat return new ProxyConfiguration(authentication, proxy, username, password); } + + /** + * This property must be set for building an {@link EventProcessorAsyncClient}. + * + * Sets the consumer group name from which the {@link EventProcessorAsyncClient} should consume events from. + * + * @param consumerGroupName The consumer group name this {@link EventProcessorAsyncClient} should consume events + * from. + * @return The updated {@link EventHubClientBuilder} object. 
+ */ + public EventHubClientBuilder consumerGroupName(String consumerGroupName) { + this.consumerGroupName = consumerGroupName; + return this; + } + + /** + * This property can be optionally set when building an {@link EventProcessorAsyncClient}. + * + * Sets the initial event position. If this property is not set and if checkpoint for a partition doesn't exist, + * {@link EventPosition#earliest()} will be used as the initial event position to start consuming events. + * + * @param initialEventPosition The initial event position. + * @return The updated {@link EventHubClientBuilder} object. + */ + public EventHubClientBuilder initialEventPosition(EventPosition initialEventPosition) { + this.initialEventPosition = initialEventPosition; + return this; + } + + /** + * This property must be set when building an {@link EventProcessorAsyncClient}. + * + * Sets the {@link PartitionManager} the {@link EventProcessorAsyncClient} will use for storing partition + * ownership and checkpoint information. + * + * @param partitionManager Implementation of {@link PartitionManager}. + * @return The updated {@link EventHubClientBuilder} object. + */ + public EventHubClientBuilder partitionManager(PartitionManager partitionManager) { + // If this is not set, look for classes implementing PartitionManager interface + // in the classpath and use it automatically. (To be implemented) + this.partitionManager = partitionManager; + return this; + } + + /** + * This property must be set when building an {@link EventProcessorAsyncClient}. + * + * Sets the partition processor factory for creating new instance(s) of {@link PartitionProcessor}. + * + * @param partitionProcessorFactory The factory that creates new processor for each partition. + * @return The updated {@link EventHubClientBuilder} object. 
+ */ + public EventHubClientBuilder partitionProcessorFactory(PartitionProcessorFactory partitionProcessorFactory) { + this.partitionProcessorFactory = partitionProcessorFactory; + return this; + } + + /** + * This will create a new {@link EventProcessorAsyncClient} configured with the options set in this builder. Each call + * to this method will return a new instance of {@link EventProcessorAsyncClient}. + * + *

+ * A new instance of {@link EventHubAsyncClient} will be created with configured options by calling the {@link + * #buildAsyncClient()} that will be used by the {@link EventProcessorAsyncClient}. + *

+ * + *

+ * If the {@link #initialEventPosition(EventPosition) initial event position} is not set, all partitions processed by + * this {@link EventProcessorAsyncClient} will start processing from {@link EventPosition#earliest() earliest} + * available event in the respective partitions. + *

+ * + * @return A new instance of {@link EventProcessorAsyncClient}. + */ + public EventProcessorAsyncClient buildEventProcessorAsyncClient() { + EventPosition initialEventPosition = + this.initialEventPosition == null ? EventPosition.earliest() + : this.initialEventPosition; + + return new EventProcessorAsyncClient(buildAsyncClient(), this.consumerGroupName, + this.partitionProcessorFactory, initialEventPosition, partitionManager, eventHubPath); + } } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorAsyncClient.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorAsyncClient.java new file mode 100644 index 0000000000000..292ee3af5806f --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorAsyncClient.java @@ -0,0 +1,225 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import com.azure.messaging.eventhubs.models.EventHubConsumerOptions; +import com.azure.messaging.eventhubs.models.EventPosition; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +/** + * This is the starting point for event processor. + *

+ * Event Processor based application consists of one or more instances of {@link EventProcessorAsyncClient} which are + * set up to consume events from the same Event Hub + consumer group and to balance the workload across different + * instances and track progress when events are processed. + *

+ */ +public class EventProcessorAsyncClient { + + private static final long INTERVAL_IN_SECONDS = 10; // run every 10 seconds + private static final long INITIAL_DELAY = 0; // start immediately + private static final long OWNERSHIP_EXPIRATION_TIME_IN_MILLIS = TimeUnit.SECONDS.toMillis(30); + private final ClientLogger logger = new ClientLogger(EventProcessorAsyncClient.class); + + private final EventHubAsyncClient eventHubAsyncClient; + private final String consumerGroupName; + private final EventPosition initialEventPosition; + private final PartitionProcessorFactory partitionProcessorFactory; + private final PartitionManager partitionManager; + private final String identifier; + private final Map partitionConsumers = new ConcurrentHashMap<>(); + private final String eventHubName; + private Disposable runner; + private Scheduler scheduler; + private AtomicBoolean started = new AtomicBoolean(false); + + /** + * Package-private constructor. Use {@link EventHubClientBuilder} to create an instance. + * + * @param eventHubAsyncClient The {@link EventHubAsyncClient}. + * @param consumerGroupName The consumer group name used in this event processor to consume events. + * @param partitionProcessorFactory The factory to create new partition processor(s). + * @param initialEventPosition Initial event position to start consuming events. + * @param partitionManager The partition manager. + * @param eventHubName The Event Hub name. 
+ */ + EventProcessorAsyncClient(EventHubAsyncClient eventHubAsyncClient, String consumerGroupName, + PartitionProcessorFactory partitionProcessorFactory, EventPosition initialEventPosition, + PartitionManager partitionManager, + String eventHubName) { + this.eventHubAsyncClient = Objects + .requireNonNull(eventHubAsyncClient, "eventHubAsyncClient cannot be null"); + this.consumerGroupName = Objects + .requireNonNull(consumerGroupName, "consumerGroupname cannot be null"); + this.partitionProcessorFactory = Objects + .requireNonNull(partitionProcessorFactory, "partitionProcessorFactory cannot be null"); + this.partitionManager = Objects + .requireNonNull(partitionManager, "partitionManager cannot be null"); + this.initialEventPosition = Objects + .requireNonNull(initialEventPosition, "initialEventPosition cannot be null"); + this.eventHubName = Objects + .requireNonNull(eventHubName, "eventHubName cannot be null"); + this.identifier = UUID.randomUUID().toString(); + logger.info("The instance ID for this event processors is {}", this.identifier); + } + + /** + * The identifier is a unique name given to this event processor instance. + * + * @return Identifier for this event processor. + */ + public String identifier() { + return this.identifier; + } + + /** + * Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a + * dedicated {@link PartitionProcessor} to each partition. If there are other Event Processors active for the same + * consumer group on the Event Hub, responsibility for partitions will be shared between them. + *

+ * Subsequent calls to start will be ignored if this event processor is already running. Calling start after {@link + * #stop()} is called will restart this event processor. + *

+ */ + public synchronized void start() { + if (!started.compareAndSet(false, true)) { + logger.info("Event processor is already running"); + return; + } + logger.info("Starting a new event processor instance with id {}", this.identifier); + scheduler = Schedulers.newElastic("EventProcessorAsyncClient"); + runner = scheduler.schedulePeriodically(this::run, INITIAL_DELAY, INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + } + + /** + * Stops processing events for all partitions owned by this event processor. All {@link PartitionProcessor} will be + * shutdown and any open resources will be closed. + *

+ * Subsequent calls to stop will be ignored if the event processor is not running. + *

+ */ + public synchronized void stop() { + if (!started.compareAndSet(true, false)) { + logger.info("Event processor has already stopped"); + return; + } + this.partitionConsumers.forEach((key, value) -> { + try { + logger.info("Closing event hub consumer for partition {}", key); + value.close(); + logger.info("Closed event hub consumer for partition {}", key); + partitionConsumers.remove(key); + } catch (IOException ex) { + logger.warning("Unable to close event hub consumer for partition {}", key); + } + }); + runner.dispose(); + scheduler.dispose(); + } + + /* + * A simple implementation of an event processor that: + * 1. Fetches all partition ids from Event Hub + * 2. Gets the current ownership information of all the partitions from PartitionManager + * 3. Claims ownership of any partition that doesn't have an owner yet. + * 4. Starts a new PartitionProcessor and receives events from each of the partitions this instance owns + */ + void run() { + /* This will run periodically to get new ownership details and close/open new + consumers when ownership of this instance has changed */ + final Flux ownershipFlux = partitionManager.listOwnership(eventHubName, consumerGroupName) + .cache(); + eventHubAsyncClient.getPartitionIds() + .flatMap(id -> getCandidatePartitions(ownershipFlux, id)) + .flatMap(this::claimOwnership) + .subscribeOn(Schedulers.newElastic("PartitionPumps")) + .subscribe(this::receiveEvents); + } + + /* + * Get the candidate partitions for claiming ownerships + */ + private Publisher getCandidatePartitions(Flux ownershipFlux, + String id) { + return ownershipFlux + .doOnNext(ownership -> logger + .info("Ownership flux: partitionId = {}; EH: partitionId = {}", ownership.partitionId(), id)) + // Ownership has never been claimed, so it won't exist in the list, so we provide a default. 
+ .filter(ownership -> id.equals(ownership.partitionId())) + .single(new PartitionOwnership() + .partitionId(id) + .eventHubName(this.eventHubName) + .ownerId(this.identifier) + .consumerGroupName(this.consumerGroupName) + .ownerLevel(0L)); + } + + + /* + * Claim ownership of the given partition if it's available + */ + private Publisher claimOwnership(PartitionOwnership ownershipInfo) { + // Claim ownership if: + // it's not previously owned by any other instance, + // or if the last modified time is greater than ownership expiration time + // and previous owner is not this instance + if (ownershipInfo.lastModifiedTime() == null + || (System.currentTimeMillis() - ownershipInfo.lastModifiedTime() > OWNERSHIP_EXPIRATION_TIME_IN_MILLIS + && !ownershipInfo.ownerId().equals(this.identifier))) { + ownershipInfo.ownerId(this.identifier); // update instance id before claiming ownership + return partitionManager.claimOwnership(ownershipInfo).doOnComplete(() -> { + logger.info("Claimed ownership of partition {}", ownershipInfo.partitionId()); + }).doOnError(error -> { + logger.error("Unable to claim ownership of partition {}", ownershipInfo.partitionId(), error); + }); + } else { + return Flux.empty(); + } + } + + /* + * Creates a new consumer for given partition and starts receiving events for that partition. + */ + private void receiveEvents(PartitionOwnership partitionOwnership) { + EventHubConsumerOptions consumerOptions = new EventHubConsumerOptions(); + consumerOptions.ownerLevel(0L); + + EventPosition startFromEventPosition = partitionOwnership.sequenceNumber() == null ? 
this.initialEventPosition + : EventPosition.fromSequenceNumber(partitionOwnership.sequenceNumber(), false); + + EventHubConsumer consumer = this.eventHubAsyncClient + .createConsumer(this.consumerGroupName, partitionOwnership.partitionId(), startFromEventPosition, consumerOptions); + this.partitionConsumers.put(partitionOwnership.partitionId(), consumer); + + PartitionContext partitionContext = new PartitionContext(partitionOwnership.partitionId(), this.eventHubName, + this.consumerGroupName); + CheckpointManager checkpointManager = new CheckpointManager(this.identifier, partitionContext, + this.partitionManager, null); + logger.info("Subscribing to receive events from partition {}", partitionOwnership.partitionId()); + PartitionProcessor partitionProcessor = this.partitionProcessorFactory + .createPartitionProcessor(partitionContext, checkpointManager); + partitionProcessor.initialize(); + + consumer.receive().subscribe(eventData -> partitionProcessor.processEvent(eventData).subscribe(), + partitionProcessor::processError, + // Currently, there is no way to distinguish if the receiver was closed because + // another receiver with higher/same owner level(epoch) connected or because + // this event processor explicitly called close on this consumer. + () -> partitionProcessor.close(CloseReason.LOST_PARTITION_OWNERSHIP)); + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/InMemoryPartitionManager.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/InMemoryPartitionManager.java new file mode 100644 index 0000000000000..32f72a3507bdb --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/InMemoryPartitionManager.java @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs; + +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * A simple in-memory implementation of a {@link PartitionManager}. + */ +public class InMemoryPartitionManager implements PartitionManager { + + private final Map partitionOwnershipMap = new ConcurrentHashMap<>(); + private final ClientLogger logger = new ClientLogger(InMemoryPartitionManager.class); + + @Override + public Flux listOwnership(String eventHubName, String consumerGroupName) { + logger.info("Listing partition ownership"); + return Flux.fromIterable(partitionOwnershipMap.values()); + } + + @Override + public Flux claimOwnership(PartitionOwnership... requestedPartitionOwnerships) { + return Flux.fromArray(requestedPartitionOwnerships) + .filter(partitionOwnership -> { + return !partitionOwnershipMap.containsKey(partitionOwnership.partitionId()) + || partitionOwnershipMap.get(partitionOwnership.partitionId()).eTag().equals(partitionOwnership.eTag()); + }) + .doOnNext(partitionOwnership -> logger + .info("Ownership of partition {} claimed by {}", partitionOwnership.partitionId(), + partitionOwnership.ownerId())) + .map(partitionOwnership -> { + partitionOwnership.eTag(UUID.randomUUID().toString()); // set new etag + partitionOwnership.lastModifiedTime(System.currentTimeMillis()); + partitionOwnershipMap.put(partitionOwnership.partitionId(), partitionOwnership); + return partitionOwnership; + }); + } + + @Override + public Mono updateCheckpoint(Checkpoint checkpoint) { + String updatedETag = UUID.randomUUID().toString(); + partitionOwnershipMap.get(checkpoint.partitionId()) + .sequenceNumber(checkpoint.sequenceNumber()) + .offset(checkpoint.offset()) + 
.eTag(updatedETag); + logger.info("Updated checkpoint for partition {} with sequence number {}", checkpoint.partitionId(), + checkpoint.sequenceNumber()); + return Mono.just(updatedETag); + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionManager.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionManager.java new file mode 100644 index 0000000000000..d7fb6945df956 --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionManager.java @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Partition manager stores and retrieves partition ownership information and checkpoint details for each partition. + */ +public interface PartitionManager { + + /** + * Called to get the list of all existing partition ownership from the underlying data store. Could return empty + * results if there is no existing ownership information. + * + * @param eventHubName The Event Hub name to get ownership information. + * @param consumerGroupName The consumer group name. + * @return A flux of partition ownership details of all the partitions that have/had an owner. + */ + Flux listOwnership(String eventHubName, String consumerGroupName); + + /** + * Called to claim ownership of a list of partitions. This will return the list of partitions that were owned + * successfully. + * + * @param requestedPartitionOwnerships Array of partition ownerships this instance is requesting to own. + * @return A flux of partitions this instance successfully claimed ownership. + */ + Flux claimOwnership(PartitionOwnership... 
requestedPartitionOwnerships); + + /** + * Updates the checkpoint in the data store for a partition. + * + * @param checkpoint Checkpoint information containing sequence number and offset to be stored for this + * partition. + * @return The new ETag on successful update. + */ + Mono updateCheckpoint(Checkpoint checkpoint); +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessor.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessor.java new file mode 100644 index 0000000000000..5c9eb1ecc885a --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessor.java @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.PartitionContext; +import reactor.core.publisher.Mono; + +/** + * The interface defining all the operations that must be supported by a single partition processor. + *

+ * An instance of partition processor will process events only from a single partition. New instances of partition + * processors will be created through {@link PartitionProcessorFactory#createPartitionProcessor(PartitionContext, + * CheckpointManager) PartitionProcessorFactory}. + *

+ *

+ * Implementations of this interface also have the responsibility of updating checkpoints when appropriate. + *

+ */ +public interface PartitionProcessor { + + /** + * This method is called when this {@link EventProcessorAsyncClient} takes ownership of a new partition and before any + * events from this partition are received. + * + * @return a representation of the deferred computation of this call. + */ + Mono initialize(); + + /** + * This method is called when a new event is received for this partition. Processing of this event can happen + * asynchronously. + * + *

+ * This is also a good place to update checkpoints as appropriate. + * + * @param eventData {@link EventData} received from this partition. + * @return a representation of the deferred computation of this call. + */ + Mono processEvent(EventData eventData); + + /** + * This method is called when an error occurs while receiving events from Event Hub. An error also marks the end of + * event data stream. + * + * @param throwable The {@link Throwable} that caused this method to be called. + */ + void processError(Throwable throwable); + + /** + * This method is called before the partition processor is closed. A partition processor could be closed for various + * reasons, and implementations of this interface can take appropriate actions to clean up before the + * partition processor is shut down. + * + * @param closeReason The reason for closing this partition processor. + * @return a representation of the deferred computation of this call. + */ + Mono close(CloseReason closeReason); + +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessorFactory.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessorFactory.java new file mode 100644 index 0000000000000..5f55d74e5b4a6 --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionProcessorFactory.java @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.PartitionContext; + +/** + * A functional interface to create new instance(s) of {@link PartitionProcessor} when provided with a {@link + * PartitionContext} and {@link CheckpointManager}. + */ +@FunctionalInterface +public interface PartitionProcessorFactory { + + /** + * Factory method to create a new instance(s) of {@link PartitionProcessor} for a partition. 
+ * + * @param partitionContext The partition context containing partition and Event Hub information. The new instance of + * {@link PartitionProcessor} created by this method will be responsible for processing events only for this + * partition. + * @param checkpointManager The checkpoint manager for updating checkpoints when events are processed by {@link + * PartitionProcessor}. + * @return A new instance of {@link PartitionProcessor} responsible for processing events from a single partition. + */ + PartitionProcessor createPartitionProcessor(PartitionContext partitionContext, + CheckpointManager checkpointManager); +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/Checkpoint.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/Checkpoint.java new file mode 100644 index 0000000000000..02e2b11f99ec0 --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/Checkpoint.java @@ -0,0 +1,164 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.models; + +import com.azure.core.implementation.annotation.Fluent; +import com.azure.messaging.eventhubs.PartitionProcessor; + +/** + * A model class to hold checkpoint data. A checkpoint represents the last successfully processed event by a {@link + * PartitionProcessor} for a particular partition of an Event Hub. + */ +@Fluent +public class Checkpoint { + + private String eventHubName; + private String consumerGroupName; + private String ownerId; + private String partitionId; + private String offset; + private long sequenceNumber; + private String eTag; + + /** + * Gets the Event Hub name associated with this checkpoint. + * + * @return The Event Hub name associated with this checkpoint. 
+ */ + public String eventHubName() { + return eventHubName; + } + + /** + * Sets the Event Hub name associated with this checkpoint. + * + * @param eventHubName The Event Hub name associated with this checkpoint. + * @return The updated {@link Checkpoint} instance. + */ + public Checkpoint eventHubName(String eventHubName) { + this.eventHubName = eventHubName; + return this; + } + + /** + * Gets the consumer group name associated with this checkpoint. + * + * @return The consumer group name associated with this checkpoint. + */ + public String consumerGroupName() { + return consumerGroupName; + } + + /** + * Sets the consumer group name associated with this checkpoint. + * + * @param consumerGroupName The consumer group name associated with this checkpoint. + * @return The updated {@link Checkpoint} instance. + */ + public Checkpoint consumerGroupName(String consumerGroupName) { + this.consumerGroupName = consumerGroupName; + return this; + } + + /** + * Gets the partition id associated with this checkpoint. + * + * @return The partition id associated with this checkpoint. + */ + public String partitionId() { + return partitionId; + } + + /** + * Sets the partition id associated with this checkpoint. + * + * @param partitionId The partition id associated with this checkpoint. + * @return The updated {@link Checkpoint} instance. + */ + public Checkpoint partitionId(String partitionId) { + this.partitionId = partitionId; + return this; + } + + /** + * Gets the unique event processor identifier that created this checkpoint. + * + * @return The unique event processor identifier that created this checkpoint. + */ + public String ownerId() { + return ownerId; + } + + /** + * Sets the unique event processor identifier that created this checkpoint. + * + * @param ownerId The unique event processor identifier that created this checkpoint. + * @return The updated {@link Checkpoint} instance. 
+ */ + public Checkpoint ownerId(String ownerId) { + this.ownerId = ownerId; + return this; + } + + /** + * Gets the offset of the last successfully processed event to store as checkpoint. + * + * @return The offset of the last successfully processed event to store as checkpoint. + */ + public String offset() { + return offset; + } + + /** + * Sets the offset of the last successfully processed event to store as checkpoint. + * + * @param offset The offset of the last successfully processed event to store as checkpoint. + * @return The updated {@link Checkpoint} instance. + */ + public Checkpoint offset(String offset) { + this.offset = offset; + return this; + } + + /** + * Gets the sequence number of the last successfully processed event to store as checkpoint. + * + * @return The sequence number of the last successfully processed event to store as checkpoint. + */ + public Long sequenceNumber() { + return sequenceNumber; + } + + /** + * Sets the sequence number of the last successfully processed event to store as checkpoint. + * + * @param sequenceNumber The sequence number of the last successfully processed event to store as checkpoint. + * @return The updated {@link Checkpoint} instance. + */ + public Checkpoint sequenceNumber(Long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + return this; + } + + /** + * Gets the ETag that will be used to verify before updating the checkpoint. + * + * @return The unique identifier that was generated by last known successful update. + */ + public String eTag() { + return eTag; + } + + /** + * Sets the ETag that will be used by the checkpoint store to verify before updating the checkpoint. If the ETag in + * store does not match with this ETag, the checkpoint update is expected to fail. + * + * @param eTag The unique identifier that was generated by last known successful update. + * @return The updated {@link Checkpoint} instance. 
+ */ + public Checkpoint eTag(String eTag) { + this.eTag = eTag; + return this; + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionContext.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionContext.java new file mode 100644 index 0000000000000..a83937928368d --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionContext.java @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.models; + +import com.azure.core.implementation.annotation.Immutable; +import com.azure.messaging.eventhubs.PartitionProcessor; +import java.util.Objects; + +/** + * A model class to contain partition information that will be provided to each instance of {@link PartitionProcessor}. + */ +@Immutable +public class PartitionContext { + + private String partitionId; + private String eventHubName; + private String consumerGroupName; + + /** + * Creates an immutable instance containing the information for processing a partition. + * + * @param partitionId The partition id. + * @param eventHubName The Event Hub name. + * @param consumerGroupName The consumer group name. + */ + public PartitionContext(String partitionId, String eventHubName, String consumerGroupName) { + this.partitionId = Objects.requireNonNull(partitionId, "partitionId cannot be null"); + this.eventHubName = Objects.requireNonNull(eventHubName, "eventHubName cannot be null"); + this.consumerGroupName = Objects.requireNonNull(consumerGroupName, "consumerGroupName cannot be null"); + } + + /** + * Gets the partition id associated to an instance of {@link PartitionProcessor}. + * + * @return The partition id associated to an instance of {@link PartitionProcessor}. 
+ */ + public String partitionId() { + return partitionId; + } + + /** + * Gets the Event Hub name associated to an instance of {@link PartitionProcessor}. + * + * @return The Event Hub name associated to an instance of {@link PartitionProcessor}. + */ + public String eventHubName() { + return eventHubName; + } + + /** + * Gets the consumer group name associated to an instance of {@link PartitionProcessor}. + * + * @return The consumer group name associated to an instance of {@link PartitionProcessor}. + */ + public String consumerGroupName() { + return consumerGroupName; + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionOwnership.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionOwnership.java new file mode 100644 index 0000000000000..d2f54b75f375a --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionOwnership.java @@ -0,0 +1,217 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.models; + +import com.azure.core.implementation.annotation.Fluent; +import java.util.Objects; + +/** + * A model class to hold partition ownership information. + */ +@Fluent +public class PartitionOwnership { + + private String eventHubName; + private String consumerGroupName; + private String partitionId; + private String ownerId; + private long ownerLevel; + private String offset; + private Long sequenceNumber; + private Long lastModifiedTime; + private String eTag; + + /** + * Gets the Event Hub name associated with this ownership record. + * + * @return The Event Hub name associated with this ownership record. + */ + public String eventHubName() { + return eventHubName; + } + + /** + * Sets the Event Hub name associated with this ownership record. + * + * @param eventHubName The Event Hub name associated with this ownership record. 
+ * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership eventHubName(String eventHubName) { + this.eventHubName = Objects.requireNonNull(eventHubName, "eventHubName cannot be null"); + return this; + } + + /** + * Gets the consumer group name associated with this ownership record. + * + * @return The consumer group name associated with this ownership record. + */ + public String consumerGroupName() { + return consumerGroupName; + } + + /** + * Sets the consumer group name associated with this ownership record. + * + * @param consumerGroupName The consumer group name associated with this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership consumerGroupName(String consumerGroupName) { + this.consumerGroupName = Objects.requireNonNull(consumerGroupName, "consumerGroupName cannot be null"); + return this; + } + + /** + * Gets the partition id associated with this ownership record. + * + * @return The partition id associated with this ownership record. + */ + public String partitionId() { + return partitionId; + } + + /** + * Sets the partition id associated with this ownership record. + * + * @param partitionId The partition id associated with this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership partitionId(String partitionId) { + this.partitionId = Objects.requireNonNull(partitionId, "partitionId cannot be null"); + return this; + } + + /** + * Gets the unique event processor identifier that owns the partition id in this ownership record. + * + * @return The unique event processor identifier as the owner of the partition id in this ownership record. + */ + public String ownerId() { + return ownerId; + } + + /** + * Sets the unique event processor identifier that owns the partition id in this ownership record. 
+ * + * @param ownerId The unique event processor identifier that owns the partition id in this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership ownerId(String ownerId) { + this.ownerId = Objects.requireNonNull(ownerId, "ownerId cannot be null"); + return this; + } + + /** + * Gets the owner level (aka epoch number) for the event processor identified by {@link #ownerId() this instance}. + * + * @return The owner level (aka epoch number) for the event processor identified by {@link #ownerId() this instance}. + */ + public long ownerLevel() { + return ownerLevel; + } + + /** + * Sets the owner level (aka epoch number) for the event processor identified by {@link #ownerId() this instance}. + * + * @param ownerLevel The owner level (aka epoch number) for the event processor identified by {@link #ownerId() this + * instance}. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership ownerLevel(long ownerLevel) { + this.ownerLevel = ownerLevel; + return this; + } + + /** + * Gets the offset that serves as checkpoint for the partition id in this ownership record. + * + * @return The offset that serves as checkpoint for the partition id in this ownership record. + */ + public String offset() { + return offset; + } + + /** + * Sets the offset that serves as checkpoint for the partition id in this ownership record. + * + * @param offset The offset that serves as checkpoint for the partition id in this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership offset(String offset) { + this.offset = offset; + return this; + } + + /** + * Gets the sequence number that serves as checkpoint for the partition id in this ownership record. + * + * @return The sequence number that serves as checkpoint for the partition id in this ownership record. 
+ */ + public Long sequenceNumber() { + return sequenceNumber; + } + + /** + * Sets the sequence number that serves as checkpoint for the partition id in this ownership record. + * + * @param sequenceNumber The sequence number that serves as checkpoint for the partition id in this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership sequenceNumber(Long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + return this; + } + + /** + * Gets the last modified time of this ownership record as epoch millis. + * + * @return The last modified time of this ownership record as epoch millis. + */ + public Long lastModifiedTime() { + return lastModifiedTime; + } + + /** + * Sets the last modified time of this ownership record as epoch millis. + * + * @param lastModifiedTime The last modified time of this ownership record as epoch millis. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership lastModifiedTime(Long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + return this; + } + + /** + * Gets the ETag that was generated by the last known successful update to the partition ownership record. An ETag is + * a unique identifier that is generated when a data record is successfully created/updated. The ETag is used to + * achieve optimistic concurrency in a distributed event processor setup. When multiple instances of event processor + * try to update the same partition ownership record, ETag is used to verify that the last values read by the instance + * requesting the update is still the latest ETag for this record. If the ETag in the store does not match the ETag in + * the update request, then the update is expected to fail as there was an update since the last time an event + * processor read this record. + * + * @return The eTag for this ownership record. 
+ */ + public String eTag() { + return eTag; + } + + /** + * Sets the ETag with the last known successful update to partition ownership record. An ETag is a unique identifier + * that is generated when a data record is successfully created/updated. This ETag is used to achieve optimistic + * concurrency in a distributed event processor setup. When multiple instances of event processor try to update the + * same partition ownership record, ETag is used to verify that the last values read by the instance requesting the + * update is still the latest ETag for this record. If the ETag in the store does not match the ETag in the update + * request, then the update is expected to fail as there was an update since the last time an event processor read + * this record. + * + * @param eTag The eTag for this ownership record. + * @return The updated {@link PartitionOwnership} instance. + */ + public PartitionOwnership eTag(String eTag) { + this.eTag = eTag; + return this; + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorSample.java b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorSample.java new file mode 100644 index 0000000000000..2644a28cf718e --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorSample.java @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import java.util.concurrent.TimeUnit; + +/** + * Sample code to demonstrate how a customer might use {@link EventProcessorAsyncClient}. 
+ */ +public class EventProcessorSample { + private static final String EH_CONNECTION_STRING = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}"; + + public static void main(String[] args) throws Exception { + EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder() + .connectionString(EH_CONNECTION_STRING) + .consumerGroupName(EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME) + .partitionProcessorFactory(LogPartitionProcessor::new) + .partitionManager(new InMemoryPartitionManager()); + + EventProcessorAsyncClient eventProcessorAsyncClient = eventHubClientBuilder.buildEventProcessorAsyncClient(); + System.out.println("Starting event processor"); + eventProcessorAsyncClient.start(); + eventProcessorAsyncClient.start(); // should be a no-op + + // do other stuff + Thread.sleep(TimeUnit.MINUTES.toMillis(1)); + + System.out.println("Stopping event processor"); + eventProcessorAsyncClient.stop(); + + Thread.sleep(TimeUnit.SECONDS.toMillis(40)); + System.out.println("Starting a new instance of event processor"); + eventProcessorAsyncClient = eventHubClientBuilder.buildEventProcessorAsyncClient(); + eventProcessorAsyncClient.start(); + // do other stuff + Thread.sleep(TimeUnit.MINUTES.toMillis(1)); + System.out.println("Stopping event processor"); + eventProcessorAsyncClient.stop(); + System.out.println("Exiting process"); + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/LogPartitionProcessor.java b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/LogPartitionProcessor.java new file mode 100644 index 0000000000000..85158154a174c --- /dev/null +++ b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/LogPartitionProcessor.java @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.models.PartitionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * A sample implementation of {@link PartitionProcessor}. This implementation logs the APIs that were called by {@link + * EventProcessorAsyncClient} while processing a partition. + */ +public class LogPartitionProcessor implements PartitionProcessor { + + private final Logger logger = LoggerFactory.getLogger(LogPartitionProcessor.class); + private final PartitionContext partitionContext; + private final CheckpointManager checkpointManager; + + public LogPartitionProcessor(PartitionContext partitionContext, + CheckpointManager checkpointManager) { + this.partitionContext = partitionContext; + this.checkpointManager = checkpointManager; + logger.info("Creating partition processor: Event Hub name = {}; consumer group name = {}; partition id = {}", + partitionContext.eventHubName(), partitionContext.consumerGroupName(), partitionContext.partitionId()); + } + + @Override + public Mono initialize() { + logger + .info("Initializing partition processor: Event Hub name = {}; consumer group name = {}; partition id = {}", + partitionContext.eventHubName(), partitionContext.consumerGroupName(), partitionContext.partitionId()); + return Mono.empty(); + } + + @Override + public Mono processEvent(EventData eventData) { + logger.info( + "Processing event: Event Hub name = {}; consumer group name = {}; partition id = {}; sequence number = {}", + partitionContext.eventHubName(), partitionContext.consumerGroupName(), partitionContext.partitionId(), + eventData.sequenceNumber()); + return this.checkpointManager.updateCheckpoint(eventData); + } + + @Override + public void processError(Throwable throwable) { + logger + .warn("Processing error: Event Hub name = {}; consumer group name = {}; partition id = {}; throwable = {}", + partitionContext.eventHubName(), 
partitionContext.consumerGroupName(), partitionContext.partitionId(), + throwable.getMessage()); + } + + @Override + public Mono close(CloseReason closeReason) { + logger.info( + "Closing partition processor: Event Hub name = {}; consumer group name = {}; partition id = {}; closeReason = {}", + partitionContext.eventHubName(), partitionContext.consumerGroupName(), partitionContext.partitionId(), + closeReason); + return Mono.empty(); + } +}