Skip to content

Commit

Permalink
Code snippets and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
srnagar committed Aug 5, 2019
1 parent f060ed4 commit e1207cd
Show file tree
Hide file tree
Showing 6 changed files with 496 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubclientbuilder.retry-timeout-scheduler}
*
* <p><strong>Creating an {@link EventProcessorAsyncClient} instance using Event Hub instance connection
* string.</strong></p>
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorasyncclient.instantiation}
*
* @see EventHubAsyncClient
* @see EventProcessorAsyncClient
*/
@ServiceClientBuilder(serviceClients = EventHubAsyncClient.class)
@ServiceClientBuilder(serviceClients = {EventHubAsyncClient.class, EventProcessorAsyncClient.class})
public class EventHubClientBuilder {

private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
Expand All @@ -28,6 +30,14 @@
* set up to consume events from the same Event Hub + consumer group and to balance the workload across different
* instances and track progress when events are processed.
* </p>
*
* <p><strong>Creating an {@link EventProcessorAsyncClient} instance using Event Hub instance connection
* string.</strong></p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorasyncclient.instantiation}
*
* @see EventHubAsyncClient
* @see EventHubClientBuilder
*/
public class EventProcessorAsyncClient {

Expand All @@ -44,9 +54,9 @@ public class EventProcessorAsyncClient {
private final String identifier;
private final Map<String, EventHubConsumer> partitionConsumers = new ConcurrentHashMap<>();
private final String eventHubName;
private final AtomicBoolean started = new AtomicBoolean(false);
private Disposable runner;
private Scheduler scheduler;
private AtomicBoolean started = new AtomicBoolean(false);

/**
* Package-private constructor. Use {@link EventHubClientBuilder} to create an instance.
Expand Down Expand Up @@ -95,6 +105,9 @@ public String identifier() {
* Subsequent calls to start will be ignored if this event processor is already running. Calling start after {@link
* #stop()} is called will restart this event processor.
* </p>
*
* <p><strong>Starting the processor to consume events from all partitions.</strong></p>
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorasyncclient.startstop}
*/
public synchronized void start() {
if (!started.compareAndSet(false, true)) {
Expand All @@ -112,6 +125,9 @@ public synchronized void start() {
* <p>
* Subsequent calls to stop will be ignored if the event processor is not running.
* </p>
*
* <p><strong>Stopping the processor.</strong></p>
* {@codesnippet com.azure.messaging.eventhubs.eventprocessorasyncclient.startstop}
*/
public synchronized void stop() {
if (!started.compareAndSet(true, false)) {
Expand Down Expand Up @@ -139,16 +155,16 @@ public synchronized void stop() {
* 3. Claims ownership of any partition that doesn't have an owner yet.
* 4. Starts a new PartitionProcessor and receives events from each of the partitions this instance owns
*/
void run() {
private void run() {
/* This will run periodically to get new ownership details and close/open new
consumers when ownership of this instance has changed */
final Flux<PartitionOwnership> ownershipFlux = partitionManager.listOwnership(eventHubName, consumerGroupName)
.cache();
eventHubAsyncClient.getPartitionIds()
.flatMap(id -> getCandidatePartitions(ownershipFlux, id))
.flatMap(this::claimOwnership)
.subscribeOn(Schedulers.newElastic("PartitionPumps"))
.subscribe(this::receiveEvents);
.subscribe(this::receiveEvents, ex -> logger.warning("Failed to receive events {}", ex.getMessage()),
() -> logger.info("Completed starting partition pumps for new partitions owned"));
}

/*
Expand All @@ -157,8 +173,6 @@ void run() {
private Publisher<? extends PartitionOwnership> getCandidatePartitions(Flux<PartitionOwnership> ownershipFlux,
String id) {
return ownershipFlux
.doOnNext(ownership -> logger
.info("Ownership flux: partitionId = {}; EH: partitionId = {}", ownership.partitionId(), id))
// Ownership has never been claimed, so it won't exist in the list, so we provide a default.
.filter(ownership -> id.equals(ownership.partitionId()))
.single(new PartitionOwnership()
Expand All @@ -180,7 +194,7 @@ private Publisher<? extends PartitionOwnership> claimOwnership(PartitionOwnershi
// and previous owner is not this instance
if (ownershipInfo.lastModifiedTime() == null
|| (System.currentTimeMillis() - ownershipInfo.lastModifiedTime() > OWNERSHIP_EXPIRATION_TIME_IN_MILLIS
&& !ownershipInfo.ownerId().equals(this.identifier))) {
&& !ownershipInfo.ownerId().equals(this.identifier))) {
ownershipInfo.ownerId(this.identifier); // update instance id before claiming ownership
return partitionManager.claimOwnership(ownershipInfo).doOnComplete(() -> {
logger.info("Claimed ownership of partition {}", ownershipInfo.partitionId());
Expand All @@ -203,7 +217,8 @@ private void receiveEvents(PartitionOwnership partitionOwnership) {
: EventPosition.fromSequenceNumber(partitionOwnership.sequenceNumber(), false);

EventHubConsumer consumer = this.eventHubAsyncClient
.createConsumer(this.consumerGroupName, partitionOwnership.partitionId(), startFromEventPosition, consumerOptions);
.createConsumer(this.consumerGroupName, partitionOwnership.partitionId(), startFromEventPosition,
consumerOptions);
this.partitionConsumers.put(partitionOwnership.partitionId(), consumer);

PartitionContext partitionContext = new PartitionContext(partitionOwnership.partitionId(), this.eventHubName,
Expand All @@ -213,13 +228,15 @@ private void receiveEvents(PartitionOwnership partitionOwnership) {
logger.info("Subscribing to receive events from partition {}", partitionOwnership.partitionId());
PartitionProcessor partitionProcessor = this.partitionProcessorFactory
.createPartitionProcessor(partitionContext, checkpointManager);
partitionProcessor.initialize();

consumer.receive().subscribe(eventData -> partitionProcessor.processEvent(eventData).subscribe(),
partitionProcessor::processError,
// Currently, there is no way to distinguish if the receiver was closed because
// another receiver with higher/same owner level(epoch) connected or because
// this event processor explicitly called close on this consumer.
() -> partitionProcessor.close(CloseReason.LOST_PARTITION_OWNERSHIP));
partitionProcessor.initialize().subscribe();

consumer.receive().subscribeOn(Schedulers.newElastic("PartitionPump"))
.subscribe(eventData -> partitionProcessor.processEvent(eventData).subscribe(unused -> { // do nothing
}, partitionProcessor::processError),
partitionProcessor::processError,
// Currently, there is no way to distinguish if the receiver was closed because
// another receiver with higher/same owner level(epoch) connected or because
// this event processor explicitly called close on this consumer.
() -> partitionProcessor.close(CloseReason.LOST_PARTITION_OWNERSHIP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,52 @@ public class InMemoryPartitionManager implements PartitionManager {
private final Map<String, PartitionOwnership> partitionOwnershipMap = new ConcurrentHashMap<>();
private final ClientLogger logger = new ClientLogger(InMemoryPartitionManager.class);

/**
* {@inheritDoc}
*
* @param eventHubName The Event Hub name to get ownership information.
* @param consumerGroupName The consumer group name.
* @return A {@link Flux} of partition ownership information.
*/
@Override
public Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName) {
logger.info("Listing partition ownership");
return Flux.fromIterable(partitionOwnershipMap.values());
}

/**
* Returns a {@link Flux} of partition ownership details for successfully claimed partitions. If a partition is
* already claimed by an instance or if the eTag in the request doesn't match the previously stored eTag, then
* ownership claim is denied.
*
* @param requestedPartitionOwnerships Array of partition ownerships this instance is requesting to own.
* @return Successfully claimed partition ownerships.
*/
@Override
public Flux<PartitionOwnership> claimOwnership(PartitionOwnership... requestedPartitionOwnerships) {
return Flux.fromArray(requestedPartitionOwnerships)
.filter(partitionOwnership -> {
return !partitionOwnershipMap.containsKey(partitionOwnership.partitionId())
|| partitionOwnershipMap.get(partitionOwnership.partitionId()).eTag().equals(partitionOwnership.eTag());
|| partitionOwnershipMap.get(partitionOwnership.partitionId()).eTag()
.equals(partitionOwnership.eTag());
})
.doOnNext(partitionOwnership -> logger
.info("Ownership of partition {} claimed by {}", partitionOwnership.partitionId(),
partitionOwnership.ownerId()))
.map(partitionOwnership -> {
partitionOwnership.eTag(UUID.randomUUID().toString()); // set new etag
partitionOwnership.lastModifiedTime(System.currentTimeMillis());
partitionOwnership.eTag(UUID.randomUUID().toString())
.lastModifiedTime(System.currentTimeMillis());
partitionOwnershipMap.put(partitionOwnership.partitionId(), partitionOwnership);
return partitionOwnership;
});
}

/**
* Updates the checkpoint with new sequence number and offset. A new eTag is generated when the update is successful.
*
* @param checkpoint Checkpoint information containing sequence number and offset to be stored for this partition.
* @return A new eTag associated with the updated checkpoint.
*/
@Override
public Mono<String> updateCheckpoint(Checkpoint checkpoint) {
String updatedETag = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.messaging.eventhubs.models.PartitionContext;
import reactor.core.publisher.Mono;

/**
* Code snippets for {@link EventProcessorAsyncClient}
*/
public final class EventProcessorJavaDocCodeSamples {

/**
* Code snippet for showing how to create a new instance of {@link EventProcessorAsyncClient}.
*
* @return An instance of {@link EventProcessorAsyncClient}.
*/
public EventProcessorAsyncClient createInstance() {
// BEGIN: com.azure.messaging.eventhubs.eventprocessorasyncclient.instantiation
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};"
+ "SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";
EventProcessorAsyncClient eventProcessorAsyncClient = new EventHubClientBuilder()
.connectionString(connectionString)
.partitionProcessorFactory((PartitionProcessorImpl::new))
.consumerGroupName("consumer-group")
.buildEventProcessorAsyncClient();
// END: com.azure.messaging.eventhubs.eventprocessorasyncclient.instantiation
return eventProcessorAsyncClient;
}

/**
* Code snippet for showing how to start and stop an {@link EventProcessorAsyncClient}.
*/
public void startStopSample() {
EventProcessorAsyncClient eventProcessorAsyncClient = createInstance();
// BEGIN: com.azure.messaging.eventhubs.eventprocessorasyncclient.startstop
eventProcessorAsyncClient.start();
// do other stuff while the event processor is running
eventProcessorAsyncClient.stop();
// END: com.azure.messaging.eventhubs.eventprocessorasyncclient.startstop
}

/**
* No-op partition processor used in code snippet to demo creating an instance of {@link EventProcessorAsyncClient}.
* This class will not be visible in the code snippet.
*/
private static class PartitionProcessorImpl implements PartitionProcessor {

PartitionContext partitionContext;
CheckpointManager checkpointManager;

/**
* Creates new instance.
*
* @param partitionContext The partition context for this partition processor.
* @param checkpointManager The checkpoint manager for this partition processor.
*/
private PartitionProcessorImpl(PartitionContext partitionContext, CheckpointManager checkpointManager) {
this.partitionContext = partitionContext;
this.checkpointManager = checkpointManager;
}

/**
* {@inheritDoc}
*
* @return a representation of deferred initialization.
*/
@Override
public Mono<Void> initialize() {
return Mono.empty();
}

/**
* {@inheritDoc}
*
* @return a representation of deferred initialization.
*/
@Override
public Mono<Void> processEvent(EventData eventData) {
return Mono.empty();
}

/**
* {@inheritDoc}
*
* @param throwable The {@link Throwable} that caused this method to be called.
*/
@Override
public void processError(Throwable throwable) {
System.out.println("Error while processing events");
}

/**
* {@inheritDoc}
*
* @param closeReason {@link CloseReason} for closing this partition processor.
* @return a representation of deferred initialization.
*/
@Override
public Mono<Void> close(CloseReason closeReason) {
return Mono.empty();
}
}

}
Loading

0 comments on commit e1207cd

Please sign in to comment.