Commit e68c319

Release client 2.0.0 and EPH 2.2.0 (#416)
* Update Apache Proton-J dependency (0.29.0 --> 0.31.0) (#407)

* PartitionReceiver - add a method that provides an EventPosition corresponding to the EventData most recently returned by the receiver (#408)

* Support IsPartitionEmpty property for PartitionRuntimeInformation (#399)

* Move the setPrefetchCount API from PartitionReceiver to the ReceiverOptions class and update the default and maximum prefetch count settings (#410)

This pull request includes two major changes related to the prefetch API.

1) Move the setPrefetchCount API to the ReceiverOptions class so that the prefetch value specified by the user, rather than the default, is used when communicating with the service during link open and receiver initialization. This change also fixes a race condition in the old setPrefetchCount API that could leave the receiver stuck.

2) Change the default value and add an upper bound for the prefetch count. Note that the prefetch count must be greater than or equal to maxEventCount, which is set either a) when calling the receive() API or b) by implementing the getMaxEventCount API of the SessionReceiverHandler interface. See the sketch below.
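
For illustration only, here is a minimal sketch of the new flow (not part of this commit). It assumes the `EventHubClient.create(connectionString, retryPolicy, executor)` overload that appears elsewhere in this diff and a `createReceiver` overload that accepts `ReceiverOptions`; the connection string, partition id, and counts are placeholders.

```Java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventhubs.RetryPolicy;

public class PrefetchSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
        String connectionString = "Endpoint=sb://<namespace>.servicebus.windows.net/;"
                + "SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<event-hub>";

        EventHubClient client = EventHubClient
                .create(connectionString, RetryPolicy.getDefault(), executor)
                .get();

        // Prefetch is now configured on ReceiverOptions before the receive link is opened,
        // so the user-specified value (not the default) reaches the service at link open.
        ReceiverOptions options = new ReceiverOptions();
        options.setPrefetchCount(500);

        PartitionReceiver receiver = client
                .createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0",
                        EventPosition.fromStartOfStream(), options)
                .get();

        // maxEventCount must be less than or equal to the prefetch count configured above.
        Iterable<EventData> batch = receiver.receiveSync(100);
        if (batch != null) {
            for (EventData event : batch) {
                System.out.println(event.getSystemProperties().getOffset());
            }
        }

        receiver.closeSync();
        client.closeSync();
        executor.shutdown();
    }
}
```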

* Fixes several issues in the reactor-related components (#411)

This pull request contains the following changes.

1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get completed.
2) Fix the session open timeout issue, which could result in an NPE in the proton-J engine.
3) Make the session open timeout configurable, using the value of OperationTimeout.
4) Update exception messages to include the entity name.
5) API change - use ScheduledExecutorService (see the sketch after this list).
6) Improve tracing.
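
For illustration only, here is a minimal sketch of change 5) (not part of this commit). It reuses the `EventHubClient.create(connectionString, retryPolicy, executor)` overload that appears elsewhere in this diff; the connection string is a placeholder.

```Java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.RetryPolicy;

public class ScheduledExecutorSketch {
    public static void main(String[] args) throws Exception {
        // The client now schedules its reactor work, so it takes a ScheduledExecutorService
        // instead of a plain Executor such as Executors.newCachedThreadPool().
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);

        String connectionString = "Endpoint=sb://<namespace>.servicebus.windows.net/;"
                + "SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<event-hub>";

        EventHubClient client = EventHubClient
                .create(connectionString, RetryPolicy.getDefault(), executor)
                .get();

        client.closeSync();
        executor.shutdown();
    }
}
```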

* Implement Comparable on EventData (#395)

* Update receive/send link creation logic and improve tracing (#414)

* Prep for releasing client 2.0.0 and EPH 2.2.0 (#415)
sjkwak authored Jan 4, 2019
1 parent f5b779d commit e68c319
Showing 61 changed files with 1,119 additions and 620 deletions.
6 changes: 3 additions & 3 deletions ConsumingEvents.md
@@ -26,7 +26,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.3.0</version>
<version>2.0.0</version>
</dependency>
```

@@ -42,11 +42,11 @@ For a simple event consumer, you'll need to import the *com.microsoft.azure.even
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
```

The receiver code creates an *EventHubClient* from a given connecting string
4 changes: 2 additions & 2 deletions Overview.md
@@ -36,11 +36,11 @@ which is quite simple in a Maven build [as we explain in the guide](PublishingEv
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
```

Using an Event Hub connection string, which holds all required connection information, including an authorization key or token,
4 changes: 2 additions & 2 deletions PublishingEvents.md
@@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.3.0</version>
<version>2.0.0</version>
</dependency>
```

@@ -34,7 +34,7 @@ So, `EventHubClient` requires an instance of `Executor`, where all these tasks a


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
```

Using an Event Hub connection string, which holds all required connection information including an authorization key or token
4 changes: 2 additions & 2 deletions azure-eventhubs-eph/pom.xml
@@ -4,10 +4,10 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>1.3.0</version>
<version>2.0.0</version>
</parent>

<version>2.1.0</version>
<version>2.2.0</version>

<modelVersion>4.0.0</modelVersion>

@@ -6,8 +6,10 @@
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;

import java.time.Duration;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

@@ -112,11 +114,21 @@ public int getPrefetchCount() {
/***
* Sets the prefetch count for the underlying event hub client.
*
* The default is 300. This controls how many events are received in advance.
* The default is 500. This controls how many events are received in advance.
*
* @param prefetchCount The new prefetch count.
*/
public void setPrefetchCount(int prefetchCount) {
if (prefetchCount < PartitionReceiver.MINIMUM_PREFETCH_COUNT) {
throw new IllegalArgumentException(String.format(Locale.US,
"PrefetchCount has to be above %s", PartitionReceiver.MINIMUM_PREFETCH_COUNT));
}

if (prefetchCount > PartitionReceiver.MAXIMUM_PREFETCH_COUNT) {
throw new IllegalArgumentException(String.format(Locale.US,
"PrefetchCount has to be below %s", PartitionReceiver.MAXIMUM_PREFETCH_COUNT));
}

this.prefetchCount = prefetchCount;
}
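
For illustration only, a brief usage sketch for the bounds check above (not part of the diff; it assumes the existing `EventProcessorOptions.getDefaultOptions()` entry point, and the values shown are illustrative):

```Java
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;

public class EphPrefetchSketch {
    public static void main(String[] args) {
        EventProcessorOptions options = EventProcessorOptions.getDefaultOptions();

        // Accepted: within the allowed range; 500 is the new default.
        options.setPrefetchCount(500);

        try {
            options.setPrefetchCount(0); // below PartitionReceiver.MINIMUM_PREFETCH_COUNT
        } catch (IllegalArgumentException expected) {
            System.out.println("Rejected: " + expected.getMessage());
        }
    }
}
```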

@@ -27,7 +27,7 @@ class PartitionManager extends Closable {
private ScheduledFuture<?> scanFuture = null;

PartitionManager(HostContext hostContext) {
super(null);
super(null);
this.hostContext = hostContext;
}

@@ -41,17 +41,17 @@ CompletableFuture<Void> cachePartitionIds() {
// EventHubException or IOException, in addition to whatever failures may occur when the result of
// the CompletableFuture is evaluated.
try {
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();

// Stage 0A: get EventHubClient for the event hub
retval = EventHubClient.create(this.hostContext.getEventHubConnectionString(), this.hostContext.getRetryPolicy(), this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 1: use the client to get runtime info for the event hub
.thenComposeAsync((ehClient) -> ehClient.getRuntimeInformation(), this.hostContext.getExecutor())
// Stage 2: extract the partition ids from the runtime info or throw on null (timeout)
@@ -71,7 +71,7 @@ CompletableFuture<Void> cachePartitionIds() {
// Stage 3: RUN REGARDLESS OF EXCEPTIONS -- if there was an error, wrap it in IllegalEntityException and throw
.handleAsync((empty, e) ->
{
cleanupFuture.complete(null); // trigger client cleanup
cleanupFuture.complete(null); // trigger client cleanup
if (e != null) {
Throwable notifyWith = e;
if (e instanceof CompletionException) {
@@ -104,8 +104,8 @@ void onPartitionCheckCompleteTestHook() {
}

CompletableFuture<Void> stopPartitions() {
setClosing();
setClosing();

// If the lease scanner is between runs, cancel so it doesn't run again.
synchronized (this.scanFutureSynchronizer) {
if (this.scanFuture != null) {
@@ -119,20 +119,20 @@ CompletableFuture<Void> stopPartitions() {
if (this.pumpManager != null) {
TRACE_LOGGER.info(this.hostContext.withHost("Shutting down all pumps"));
stopping = this.pumpManager.removeAllPumps(CloseReason.Shutdown)
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);

}
}
}, this.hostContext.getExecutor());
}
}
}, this.hostContext.getExecutor());
}
// else no pumps to shut down

stopping = stopping.whenCompleteAsync((empty, e) -> {
TRACE_LOGGER.info(this.hostContext.withHost("Partition manager exiting"));
setClosed();
@@ -287,14 +287,14 @@ private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callab
// Return Void so it can be called from a lambda.
// throwOnFailure is true
private Void scan(boolean isFirst) {
TRACE_LOGGER.info(this.hostContext.withHost("Starting lease scan"));
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) ->
{
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
@@ -305,11 +305,11 @@ private Void scan(boolean isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());

@@ -21,10 +21,10 @@
class PartitionPump extends Closable implements PartitionReceiveHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionPump.class);
protected final HostContext hostContext;
protected final CompleteLease lease; // protected for testability
final private CompletableFuture<Void> shutdownTriggerFuture;
final private CompletableFuture<Void> shutdownFinishedFuture;
private final Object processingSynchronizer;
protected final CompleteLease lease; // protected for testability
private final Consumer<String> pumpManagerCallback;
private EventHubClient eventHubClient = null;
private PartitionReceiver partitionReceiver = null;
@@ -35,23 +35,28 @@ class PartitionPump extends Closable implements PartitionReceiveHandler {
private ScheduledFuture<?> leaseRenewerFuture = null;

PartitionPump(HostContext hostContext, CompleteLease lease, Closable parent, Consumer<String> pumpManagerCallback) {
super(parent);
super(parent);

this.hostContext = hostContext;
this.lease = lease;
this.pumpManagerCallback = pumpManagerCallback;
this.processingSynchronizer = new Object();

this.partitionContext = new PartitionContext(this.hostContext, this.lease.getPartitionId());
this.partitionContext.setLease(this.lease);

// Set up the shutdown futures. The shutdown process can be triggered just by completing this.shutdownFuture.
this.shutdownTriggerFuture = new CompletableFuture<Void>();
this.shutdownFinishedFuture = this.shutdownTriggerFuture
.handleAsync((r, e) -> { this.pumpManagerCallback.accept(this.lease.getPartitionId()); return cancelPendingOperations(); }, this.hostContext.getExecutor())
.handleAsync((r, e) -> {
this.pumpManagerCallback.accept(this.lease.getPartitionId());
return cancelPendingOperations();
}, this.hostContext.getExecutor())
.thenComposeAsync((empty) -> cleanUpAll(this.shutdownReason), this.hostContext.getExecutor())
.thenComposeAsync((empty) -> releaseLeaseOnShutdown(), this.hostContext.getExecutor())
.whenCompleteAsync((empty, e) -> { setClosed(); }, this.hostContext.getExecutor());
.whenCompleteAsync((empty, e) -> {
setClosed();
}, this.hostContext.getExecutor());
}

// The CompletableFuture returned by startPump remains uncompleted as long as the pump is running.
@@ -157,11 +162,11 @@ private CompletableFuture<Void> openClientsRetryWrapper() {
}

protected void scheduleLeaseRenewer() {
if (!getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> leaseRenewer(), seconds, TimeUnit.SECONDS);
TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
}
if (!getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> leaseRenewer(), seconds, TimeUnit.SECONDS);
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
}
}

private CompletableFuture<Boolean> openClients() {
@@ -198,8 +203,6 @@ private CompletableFuture<Boolean> openClients() {
// Stage 3: set up other receiver options, create receiver if initial offset is valid
.thenComposeAsync((startAt) ->
{
ReceiverOptions options = new ReceiverOptions();
options.setReceiverRuntimeMetricEnabled(this.hostContext.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
long epoch = this.lease.getEpoch();

TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext,
@@ -208,10 +211,15 @@
CompletableFuture<PartitionReceiver> receiverFuture = null;

try {
ReceiverOptions options = new ReceiverOptions();
options.setReceiverRuntimeMetricEnabled(this.hostContext.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
options.setPrefetchCount(this.hostContext.getEventProcessorOptions().getPrefetchCount());

receiverFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(),
this.partitionContext.getPartitionId(), startAt, epoch, options);
this.internalOperationFuture = receiverFuture;
} catch (EventHubException e) {
TRACE_LOGGER.error(this.hostContext.withHostAndPartition(this.partitionContext, "Opening EH receiver failed with an error "), e);
receiverFuture = new CompletableFuture<PartitionReceiver>();
receiverFuture.completeExceptionally(e);
}
@@ -238,15 +246,6 @@ private CompletableFuture<Boolean> openClients() {
// Stage 5: on success, set up the receiver
.thenApplyAsync((receiver) ->
{
if (this.hostContext.getEventProcessorOptions().getPrefetchCount() > PartitionReceiver.DEFAULT_PREFETCH_COUNT)
{
try {
this.partitionReceiver.setPrefetchCount(this.hostContext.getEventProcessorOptions().getPrefetchCount());
} catch (Exception e1) {
TRACE_LOGGER.error(this.hostContext.withHostAndPartition(this.partitionContext, "PartitionReceiver failed setting prefetch count"), e1);
throw new CompletionException(e1);
}
}
this.partitionReceiver.setReceiveTimeout(this.hostContext.getEventProcessorOptions().getReceiveTimeOut());

TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext,
@@ -386,8 +385,8 @@ private CompletableFuture<Void> releaseLeaseOnShutdown() // swallows all excepti
}

protected void internalShutdown(CloseReason reason, Throwable e) {
setClosing();
setClosing();

this.shutdownReason = reason;
if (e == null) {
this.shutdownTriggerFuture.complete(null);
@@ -412,7 +411,7 @@ private void leaseRenewer() {
return;
}
if (getIsClosingOrClosed()) {
return;
return;
}

// Stage 0: renew the lease
@@ -7,35 +7,35 @@

import org.junit.Assume;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class TestUtilities {
static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);

static void skipIfAppveyor() {
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
}

static String getStorageConnectionString() {
TestUtilities.skipIfAppveyor();
TestUtilities.skipIfAppveyor();

String retval = System.getenv("EPHTESTSTORAGE");

// if EPHTESTSTORAGE is not set - we cannot run integration tests
if (retval == null) {
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
}
Assume.assumeTrue(retval != null);

return ((retval != null) ? retval : "");
}

static Boolean isRunningOnAzure() {
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
}
}