diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml
index ec012ce84e0d2..dbc497d34c773 100644
--- a/eng/spotbugs-aggregate-report/pom.xml
+++ b/eng/spotbugs-aggregate-report/pom.xml
@@ -15,9 +15,10 @@
1.0.0
- 1.2.05.0.1
- 2.0.0
+ 2.3.0
+ 2.5.0
+ 1.2.010.5.0
@@ -67,7 +68,7 @@
com.microsoft.azureazure-eventhubs-eph
- ${azure-eventhubs.version}
+ ${azure-eventhubs-eph.version}com.microsoft.azure
diff --git a/eventhubs/data-plane/ConsumingEvents.md b/eventhubs/data-plane/ConsumingEvents.md
index cff94be57c9eb..450247efcb710 100644
--- a/eventhubs/data-plane/ConsumingEvents.md
+++ b/eventhubs/data-plane/ConsumingEvents.md
@@ -23,10 +23,10 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:
```XML
-
- com.microsoft.azure
- azure-eventhubs
- 2.0.0
+
+ com.microsoft.azure
+ azure-eventhubs
+ 2.3.0
```
diff --git a/eventhubs/data-plane/PublishingEvents.md b/eventhubs/data-plane/PublishingEvents.md
index 5a9eb428e531b..c83007c0174b4 100644
--- a/eventhubs/data-plane/PublishingEvents.md
+++ b/eventhubs/data-plane/PublishingEvents.md
@@ -9,11 +9,11 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:
```XML
-
- com.microsoft.azure
- azure-eventhubs
- 2.0.0
-
+
+ com.microsoft.azure
+ azure-eventhubs
+ 2.3.0
+
```
For different types of build environments, the latest released JAR files can also be [explicitly obtained from the
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml
index 33d31c4e0ace8..649dc2322e693 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml
+++ b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml
@@ -4,22 +4,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ com.microsoft.azure
+ azure-eventhubs-clients
+ 2.3.0
+ ../pom.xml
+
+
4.0.0com.microsoft.azureazure-eventhubs-eph
- 2.2.0
+ 2.5.0Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layerhttps://github.com/Azure/azure-sdk-for-java
-
- com.microsoft.azure
- azure-eventhubs-clients
- 2.0.0
- ../pom.xml
-
-
azure-java-build-docs
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java
index 91aa4a89486a1..e44b6d2ccdf62 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java
@@ -34,6 +34,7 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.regex.Matcher;
@@ -331,10 +332,17 @@ public CompletableFuture> getAllLeases() {
(bp.getLeaseState() == LeaseState.LEASED)));
});
future = CompletableFuture.completedFuture(infos);
- } catch (URISyntaxException | StorageException e) {
+ } catch (URISyntaxException | StorageException | NoSuchElementException e) {
+ Throwable effective = e;
+ if (e instanceof NoSuchElementException) {
+ // If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
+ // Strip the misleading NoSuchElementException to provide a meaningful error for the user.
+ effective = e.getCause();
+ }
+
TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
- future = new CompletableFuture>();
- future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
+ future = new CompletableFuture<>();
+ future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
}
return future;
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java
index 3f52e162b227f..c9eaf61e266dc 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java
@@ -11,6 +11,7 @@
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
+import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -23,7 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
/***
- * The main class of event processor host.
+ * The main class of event processor host.
*/
public final class EventProcessorHost {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class);
@@ -268,15 +269,14 @@ public EventProcessorHost(
if (leaseManager == null) {
throw new IllegalArgumentException("Must provide an object which implements ILeaseManager");
}
- // executorService argument is allowed to be null, that is the indication to use an internal threadpool.
+ // executorService argument is allowed to be null, that is the indication to use an internal threadpool.
// Normally will not be null because we're using the AzureStorage implementation.
// If it is null, we're using user-supplied implementation. Establish generic defaults
// in case the user doesn't provide an options object.
this.partitionManagerOptions = new PartitionManagerOptions();
-
if (executorService != null) {
// User has supplied an ExecutorService, so use that.
this.weOwnExecutor = false;
@@ -560,7 +560,7 @@ public Thread newThread(Runnable r) {
}
private String getNamePrefix() {
- return String.format("[%s|%s|%s]-%s-",
+ return String.format(Locale.US, "[%s|%s|%s]-%s-",
this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement());
}
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java
index 06637660e5573..1b0bd2783dc09 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java
@@ -11,6 +11,9 @@
import java.util.function.Consumer;
import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/***
* Options affecting the behavior of the event processor host instance in general.
*/
@@ -25,6 +28,8 @@ public final class EventProcessorOptions {
return EventPosition.fromStartOfStream();
};
+ private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class);
+
public EventProcessorOptions() {
}
@@ -112,7 +117,7 @@ public int getPrefetchCount() {
/***
* Sets the prefetch count for the underlying event hub client.
*
- * The default is 500. This controls how many events are received in advance.
+ * The default is 300. This controls how many events are received in advance.
*
* @param prefetchCount The new prefetch count.
*/
@@ -210,7 +215,11 @@ void notifyOfException(String hostname, Exception exception, String action, Stri
// Capture handler so it doesn't get set to null between test and use
Consumer handler = this.exceptionNotificationHandler;
if (handler != null) {
- handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
+ try {
+ handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
+ } catch (Exception e) {
+ TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e);
+ }
}
}
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java
index a2238c2bca9a0..9b95d54571294 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java
@@ -14,7 +14,7 @@
import java.util.concurrent.ConcurrentHashMap;
/***
- * An ILeaseManager implementation based on an in-memory store.
+ * An ILeaseManager implementation based on an in-memory store.
*
* THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory
* only and not persisted in any way. In addition, it is only visible within the same process: multiple
@@ -46,11 +46,11 @@ public InMemoryLeaseManager() {
public void initialize(HostContext hostContext) {
this.hostContext = hostContext;
}
-
+
public void setLatency(long milliseconds) {
this.millisecondsLatency = milliseconds;
}
-
+
private void latency(String caller) {
if (this.millisecondsLatency > 0) {
try {
@@ -91,7 +91,7 @@ public CompletableFuture deleteLeaseStore() {
latency("deleteLeaseStore");
return CompletableFuture.completedFuture(null);
}
-
+
@Override
public CompletableFuture getLease(String partitionId) {
TRACE_LOGGER.debug(this.hostContext.withHost("getLease()"));
@@ -110,7 +110,7 @@ public CompletableFuture> getAllLeases() {
latency("getAllLeasesStateInfo");
return CompletableFuture.completedFuture(infos);
}
-
+
@Override
public CompletableFuture createAllLeasesIfNotExists(List partitionIds) {
ArrayList> createFutures = new ArrayList>();
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java
index b6ed86fe462dd..8f419a35e6e28 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java
@@ -283,16 +283,21 @@ private Void scan(boolean isFirst) {
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();
- (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
+ try {
+ (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) -> {
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
+ if ((e != null) && !(e instanceof ClosingException)) {
+ TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
+ }
+
onPartitionCheckCompleteTestHook();
// Schedule the next scan unless we are shutting down.
if (!this.getIsClosingOrClosed()) {
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds()
- : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
+ : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
if (isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
@@ -301,9 +306,19 @@ private Void scan(boolean isFirst) {
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
- TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
+ TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());
+ } catch (Exception e) {
+ TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e);
+ if (!this.getIsClosingOrClosed()) {
+ int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
+ synchronized (this.scanFutureSynchronizer) {
+ this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
+ }
+ TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
+ }
+ }
return null;
}
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java
index ef774f314eaea..d2e16d1e3c097 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java
@@ -135,7 +135,7 @@ private CompletableFuture openClientsRetryWrapper() {
// trace exceptions from the final attempt, or ReceiverDisconnectedException.
return retryResult.handleAsync((r, e) -> {
if (e == null) {
- // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
+ // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
// meaning it is safe to set the handler and start calling IEventProcessor.onEvents.
this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout());
} else {
diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java
index 429798d6520ea..bafa1af7b3815 100644
--- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java
+++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java
@@ -312,7 +312,7 @@ private CompletableFuture stealLeases(List stealThese) {
return allSteals;
}
-
+
private static class AcquisitionHolder {
private CompleteLease acquiredLease;
diff --git a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
index cb42b1e681787..45bbd35f6b582 100644
--- a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
+++ b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
@@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ com.microsoft.azure
+ azure-eventhubs-clients
+ 2.3.0
+ ../pom.xml
+
+
4.0.0com.microsoft.azureazure-eventhubs-extensions
@@ -12,13 +19,6 @@
Extensions built on Microsoft Azure Event Hubshttps://github.com/Azure/azure-sdk-for-java
-
- com.microsoft.azure
- azure-eventhubs-clients
- 2.0.0
- ../pom.xml
-
-
azure-java-build-docs
diff --git a/eventhubs/data-plane/azure-eventhubs/pom.xml b/eventhubs/data-plane/azure-eventhubs/pom.xml
index 93bc7d05635b4..c7a1e80f3f5a4 100644
--- a/eventhubs/data-plane/azure-eventhubs/pom.xml
+++ b/eventhubs/data-plane/azure-eventhubs/pom.xml
@@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ com.microsoft.azure
+ azure-eventhubs-clients
+ 2.3.0
+ ../pom.xml
+
+
4.0.0com.microsoft.azureazure-eventhubs
@@ -12,13 +19,6 @@
Libraries built on Microsoft Azure Event Hubshttps://github.com/Azure/azure-sdk-for-java
-
- com.microsoft.azure
- azure-eventhubs-clients
- 2.0.0
- ../pom.xml
-
-
azure-java-build-docs
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java
index c361076eb9e78..e2091f2977e63 100755
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java
@@ -147,6 +147,7 @@ static EventData create(final ByteBuffer buffer) {
* @see SystemProperties#getEnqueuedTime
*/
SystemProperties getSystemProperties();
+ void setSystemProperties(SystemProperties props);
class SystemProperties extends HashMap {
private static final long serialVersionUID = -2827050124966993723L;
@@ -155,6 +156,13 @@ public SystemProperties(final HashMap map) {
super(Collections.unmodifiableMap(map));
}
+ public SystemProperties(final long sequenceNumber, final Instant enqueuedTimeUtc, final String offset, final String partitionKey) {
+ this.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, sequenceNumber);
+ this.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, new Date(enqueuedTimeUtc.toEpochMilli()));
+ this.put(AmqpConstants.OFFSET_ANNOTATION_NAME, offset);
+ this.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, partitionKey);
+ }
+
public String getOffset() {
return this.getSystemProperty(AmqpConstants.OFFSET_ANNOTATION_NAME);
}
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java
index b48a67f11fc45..4edbcf56f7900 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java
@@ -25,7 +25,7 @@ public interface PartitionReceiver {
int MINIMUM_PREFETCH_COUNT = 1;
int DEFAULT_PREFETCH_COUNT = 500;
- int MAXIMUM_PREFETCH_COUNT = 2000;
+ int MAXIMUM_PREFETCH_COUNT = 8000;
long NULL_EPOCH = 0;
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
index 315927bf8e672..5dae17cc69f4b 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
@@ -79,7 +79,6 @@ public String getIdentifier() {
* EventHubs service will throw {@link QuotaExceededException} and will include this identifier.
* So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}.
*
- *
*
* @param value string to identify {@link PartitionReceiver}
*/
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java
index a81285f71aeb4..63a20902717f9 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java
@@ -40,6 +40,10 @@ final class ActiveClientTokenManager {
}
public void cancel() {
+ if (TRACE_LOGGER.isInfoEnabled()) {
+ TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - canceling ActiveClientLinkManager",
+ clientEntity.getClientId()));
+ }
synchronized (this.timerLock) {
this.timer.cancel(false);
@@ -61,9 +65,8 @@ public void run() {
} else {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(
- String.format(Locale.US,
- "clientEntity[%s] - closing ActiveClientLinkManager", clientEntity.getClientId()));
+ TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - closing ActiveClientLinkManager",
+ clientEntity.getClientId()));
}
}
}
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java
index eac5f39c9caa1..cad341a757210 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java
@@ -12,6 +12,7 @@
public final class AmqpConstants {
public static final String APACHE = "apache.org";
+ public static final String PROTON = "proton";
public static final String VENDOR = "com.microsoft";
public static final String AMQP_ANNOTATION_FORMAT = "amqp.annotation.%s >%s '%s'";
public static final String OFFSET_ANNOTATION_NAME = "x-opt-offset";
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java
index 991c3b1cf9b20..45fe9885baca9 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java
@@ -12,12 +12,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Locale;
+
public class BaseLinkHandler extends BaseHandler {
protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseLinkHandler.class);
+ private final String name;
private final AmqpLink underlyingEntity;
- public BaseLinkHandler(final AmqpLink amqpLink) {
+ public BaseLinkHandler(final AmqpLink amqpLink, final String name) {
+ this.name = name;
this.underlyingEntity = amqpLink;
}
@@ -27,10 +31,8 @@ public void onLinkLocalClose(Event event) {
final ErrorCondition condition = link.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s], errorCondition[%s], errorDescription[%s]",
- link.getName(),
- condition != null ? condition.getCondition() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
+ this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
}
closeSession(link, link.getCondition());
@@ -39,13 +41,11 @@ public void onLinkLocalClose(Event event) {
@Override
public void onLinkRemoteClose(Event event) {
final Link link = event.getLink();
- final ErrorCondition condition = link.getCondition();
+ final ErrorCondition condition = link.getRemoteCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s], errorCondition[%s], errorDescription[%s]",
- link.getName(),
- condition != null ? condition.getCondition() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
+ this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
}
handleRemoteLinkClosed(event);
@@ -57,10 +57,8 @@ public void onLinkRemoteDetach(Event event) {
final ErrorCondition condition = link.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s], errorCondition[%s], errorDescription[%s]",
- link.getName(),
- condition != null ? condition.getCondition() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteDetach clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
+ this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
}
handleRemoteLinkClosed(event);
@@ -68,16 +66,19 @@ public void onLinkRemoteDetach(Event event) {
public void processOnClose(Link link, ErrorCondition condition) {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format("processOnClose linkName[%s], errorCondition[%s], errorDescription[%s]",
- link.getName(),
- condition != null ? condition.getCondition() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
+ this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
}
this.underlyingEntity.onClose(condition);
}
public void processOnClose(Link link, Exception exception) {
+ if (TRACE_LOGGER.isInfoEnabled()) {
+ TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], exception[%s]",
+ this.name, link.getName(), exception != null ? exception.getMessage() : "n/a"));
+ }
+
this.underlyingEntity.onError(exception);
}
@@ -86,10 +87,8 @@ private void closeSession(Link link, ErrorCondition condition) {
if (session != null && session.getLocalState() != EndpointState.CLOSED) {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format("closeSession for linkName[%s], errorCondition[%s], errorDescription[%s]",
- link.getName(),
- condition != null ? condition.getCondition() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "closeSession for clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]",
+ this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a"));
}
session.setCondition(condition);
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java
index 27b870ddf4545..4dcfcd6c5c380 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java
@@ -17,11 +17,12 @@ final class CBSChannel {
CBSChannel(
final SessionProvider sessionProvider,
- final AmqpConnection connection) {
+ final AmqpConnection connection,
+ final String clientId) {
RequestResponseCloser closer = new RequestResponseCloser();
this.innerChannel = new FaultTolerantObject<>(
- new RequestResponseOpener(sessionProvider, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection),
+ new RequestResponseOpener(sessionProvider, clientId, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection),
closer);
closer.setInnerChannel(this.innerChannel);
}
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java
index 9d140815aef10..2fb6bee48ebe0 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java
@@ -19,6 +19,7 @@ public final class ClientConstants {
public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost");
public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked");
public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
+ public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io");
public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id");
public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024;
@@ -26,7 +27,7 @@ public final class ClientConstants {
public static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1);
public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0);
public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30);
- public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins
+ public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(5); // renew every 5 minutes, which expires 20 minutes
public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20);
public static final int DEFAULT_MAX_RETRY_COUNT = 10;
public static final boolean DEFAULT_IS_TRANSIENT = true;
@@ -36,7 +37,7 @@ public final class ClientConstants {
public static final String NO_RETRY = "NoRetry";
public static final String DEFAULT_RETRY = "Default";
public static final String PRODUCT_NAME = "MSJavaClient";
- public static final String CURRENT_JAVACLIENT_VERSION = "2.0.0";
+ public static final String CURRENT_JAVACLIENT_VERSION = "2.3.0";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
@@ -76,6 +77,9 @@ public final class ClientConstants {
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
public static final int MAX_RECEIVER_NAME_LENGTH = 64;
+ public static final String COMMUNICATION_EXCEPTION_GENERIC_MESSAGE = "A communication error has occurred. "
+ + "This may be due to an incorrect host name in your connection string or a problem with your network connection.";
+
/**
* This is a constant defined to represent the start of a partition stream in EventHub.
*/
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java
index 445f0dc66f5e0..1e8bf4c606858 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java
@@ -22,21 +22,21 @@
import java.util.Locale;
import java.util.Map;
-// ServiceBus <-> ProtonReactor interaction handles all
-// amqp_connection/transport related events from reactor
public class ConnectionHandler extends BaseHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
private final AmqpConnection amqpConnection;
+ private final String connectionId;
- protected ConnectionHandler(final AmqpConnection amqpConnection) {
+ protected ConnectionHandler(final AmqpConnection amqpConnection, final String connectionId) {
add(new Handshaker());
this.amqpConnection = amqpConnection;
+ this.connectionId = connectionId;
}
- static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) {
+ static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection, String connectionId) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) {
@@ -46,7 +46,7 @@ static ConnectionHandler create(TransportType transportType, AmqpConnection amqp
}
case AMQP:
default:
- return new ConnectionHandler(amqpConnection);
+ return new ConnectionHandler(amqpConnection, connectionId);
}
}
@@ -67,17 +67,18 @@ protected AmqpConnection getAmqpConnection() {
@Override
public void onConnectionInit(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s]", this.amqpConnection.getHostName()));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s], connectionId[%s]",
+ this.amqpConnection.getHostName(), this.connectionId));
}
final Connection connection = event.getConnection();
final String hostName = new StringBuilder(this.amqpConnection.getHostName())
.append(":")
- .append(String.valueOf(this.getProtocolPort()))
+ .append(this.getProtocolPort())
.toString();
connection.setHostname(hostName);
- connection.setContainer(StringUtil.getRandomString());
+ connection.setContainer(this.connectionId);
final Map connectionProperties = new HashMap<>();
connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME);
@@ -141,7 +142,8 @@ protected int getMaxFrameSize() {
@Override
public void onConnectionBound(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s]", this.amqpConnection.getHostName()));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s], connectionId[%s]",
+ this.amqpConnection.getHostName(), this.connectionId));
}
final Transport transport = event.getTransport();
@@ -154,8 +156,8 @@ public void onConnectionUnbound(Event event) {
final Connection connection = event.getConnection();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound: hostname[%s], state[%s], remoteState[%s]",
- connection.getHostname(), connection.getLocalState(), connection.getRemoteState()));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound hostname[%s], connectionId[%s], state[%s], remoteState[%s]",
+ connection.getHostname(), this.connectionId, connection.getLocalState(), connection.getRemoteState()));
}
// if failure happened while establishing transport - nothing to free up.
@@ -172,9 +174,8 @@ public void onTransportError(Event event) {
final ErrorCondition condition = transport.getCondition();
if (TRACE_LOGGER.isWarnEnabled()) {
- TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError: hostname[%s], error[%s]",
- connection != null ? connection.getHostname() : "n/a",
- condition != null ? condition.getDescription() : "n/a"));
+ TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError hostname[%s], connectionId[%s], error[%s]",
+ connection != null ? connection.getHostname() : "n/a", this.connectionId, condition != null ? condition.getDescription() : "n/a"));
}
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@@ -197,8 +198,8 @@ public void onTransportClosed(Event event) {
final ErrorCondition condition = transport.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed: hostname[%s], error[%s]",
- connection != null ? connection.getHostname() : "n/a", (condition != null ? condition.getDescription() : "n/a")));
+ TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s], connectionId[%s], error[%s]",
+ connection != null ? connection.getHostname() : "n/a", this.connectionId, (condition != null ? condition.getDescription() : "n/a")));
}
if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) {
@@ -214,10 +215,8 @@ public void onConnectionLocalOpen(Event event) {
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen: hostname[%s], errorCondition[%s], errorDescription[%s]",
- connection.getHostname(),
- error != null ? error.getCondition() : "n/a",
- error != null ? error.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
+ connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
}
}
@@ -225,8 +224,8 @@ public void onConnectionLocalOpen(Event event) {
public void onConnectionRemoteOpen(Event event) {
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen: hostname[%s], remoteContainer[%s]",
- event.getConnection().getHostname(), event.getConnection().getRemoteContainer()));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen hostname[%s], connectionId[%s], remoteContainer[%s]",
+ event.getConnection().getHostname(), this.connectionId, event.getConnection().getRemoteContainer()));
}
this.amqpConnection.onOpenComplete(null);
@@ -239,10 +238,8 @@ public void onConnectionLocalClose(Event event) {
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
- connection.getHostname(),
- error != null ? error.getCondition() : "n/a",
- error != null ? error.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
+ connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
}
if (connection.getRemoteState() == EndpointState.CLOSED) {
@@ -261,10 +258,8 @@ public void onConnectionRemoteClose(Event event) {
final ErrorCondition error = connection.getRemoteCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose: hostname[%s], errorCondition[%s], errorDescription[%s]",
- connection.getHostname(),
- error != null ? error.getCondition() : "n/a",
- error != null ? error.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
+ connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
}
this.amqpConnection.onConnectionError(error);
@@ -276,10 +271,8 @@ public void onConnectionFinal(Event event) {
final ErrorCondition error = connection.getCondition();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal: hostname[%s], errorCondition[%s], errorDescription[%s]",
- connection.getHostname(),
- error != null ? error.getCondition() : "n/a",
- error != null ? error.getDescription() : "n/a"));
+ TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]",
+ connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a"));
}
}
}
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java
index 2050bc5873d6c..9169eb10b4787 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java
@@ -16,14 +16,20 @@ public class CustomIOHandler extends IOHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CustomIOHandler.class);
+ private final String name;
+
+ public CustomIOHandler(final String name) {
+ this.name = name;
+ }
+
@Override
public void onTransportClosed(Event event) {
final Transport transport = event.getTransport();
final Connection connection = event.getConnection();
if (TRACE_LOGGER.isInfoEnabled()) {
- TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s]",
- (connection != null ? connection.getHostname() : "n/a")));
+ TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed name[%s], hostname[%s]",
+ this.name, (connection != null ? connection.getHostname() : "n/a")));
}
if (transport != null && connection != null && connection.getTransport() != null) {
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java
index 569792a4f5cd6..54207e7e18a87 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java
@@ -11,6 +11,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
final class EventDataBatchImpl implements EventDataBatch {
@@ -45,7 +46,7 @@ public boolean tryAdd(final EventData eventData) throws PayloadSizeExceededExcep
try {
size = getSize(eventDataImpl, events.isEmpty());
} catch (java.nio.BufferOverflowException exception) {
- throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024));
+ throw new PayloadSizeExceededException(String.format(Locale.US, "Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024));
}
if (this.currentSize + size > this.maxMessageSize) {
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java
index 12b4a98484044..b4294435f7e0f 100755
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java
@@ -165,6 +165,10 @@ public SystemProperties getSystemProperties() {
return this.systemProperties;
}
+ public void setSystemProperties(EventData.SystemProperties props) {
+ this.systemProperties = props;
+ }
+
// This is intended to be used while sending EventData - so EventData.SystemProperties will not be copied over to the AmqpMessage
Message toAmqpMessage() {
final Message amqpMessage = Proton.message();
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java
index dba9d1e999745..d8e1c3588e54e 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java
@@ -14,9 +14,6 @@
import java.util.Set;
import java.util.function.Consumer;
-/*
- * Internal utility class for EventData
- */
final class EventDataUtil {
@SuppressWarnings("serial")
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java
index 5f3d339e35d29..a352172bb1dc4 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java
@@ -50,15 +50,15 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl
private CompletableFuture createSender;
private EventHubClientImpl(final ConnectionStringBuilder connectionString, final ScheduledExecutorService executor) {
- super("EventHubClientImpl".concat(StringUtil.getRandomString()), null, executor);
+ super(StringUtil.getRandomString("EC"), null, executor);
this.eventHubName = connectionString.getEventHubName();
this.senderCreateSync = new Object();
}
public static CompletableFuture create(
- final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor)
- throws EventHubException, IOException {
+ final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor)
+ throws IOException {
final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr, executor);
@@ -232,7 +232,8 @@ private CompletableFuture createInternalSender() {
if (!this.isSenderCreateStarted) {
synchronized (this.senderCreateSync) {
if (!this.isSenderCreateStarted) {
- this.createSender = MessageSender.create(this.underlyingFactory, this.getClientId().concat("-InternalSender"), this.eventHubName)
+ String senderName = StringUtil.getRandomString("EC").concat(StringUtil.SEPARATOR + this.underlyingFactory.getClientId()).concat("-InternalSender");
+ this.createSender = MessageSender.create(this.underlyingFactory, senderName, this.eventHubName)
.thenAcceptAsync(new Consumer() {
public void accept(MessageSender a) {
EventHubClientImpl.this.sender = a;
@@ -314,7 +315,7 @@ public CompletableFuture apply(Map
private CompletableFuture addManagementToken(Map request) {
CompletableFuture retval = null;
try {
- String audience = String.format("amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName);
+ String audience = String.format(Locale.US, "amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName);
String token = this.underlyingFactory.getTokenProvider().getToken(audience, ClientConstants.TOKEN_REFRESH_INTERVAL);
request.put(ClientConstants.MANAGEMENT_SECURITY_TOKEN_KEY, token);
} catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) {
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java
index dbbb4ab5fe645..80f2cec4a44c9 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java
@@ -8,6 +8,7 @@
import org.slf4j.LoggerFactory;
import java.time.Instant;
+import java.util.Locale;
public final class EventPositionImpl implements EventPosition {
@@ -103,7 +104,7 @@ String getExpression() {
@Override
public String toString() {
- return String.format("offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]",
+ return String.format(Locale.US, "offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]",
this.offset, this.sequenceNumber,
(this.dateTime != null) ? this.dateTime.toEpochMilli() : "null",
this.inclusiveFlag);
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java
index da5a59b8e4a15..621c582d2af30 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java
@@ -4,6 +4,7 @@
package com.microsoft.azure.eventhubs.impl;
import com.microsoft.azure.eventhubs.AuthorizationFailedException;
+import com.microsoft.azure.eventhubs.CommunicationException;
import com.microsoft.azure.eventhubs.ErrorContext;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.IllegalEntityException;
@@ -62,6 +63,12 @@ static Exception toException(ErrorCondition errorCondition) {
return new EventHubException(true, new AmqpException(errorCondition));
} else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) {
return new QuotaExceededException(new AmqpException(errorCondition));
+ } else if (errorCondition.getCondition() == ClientConstants.PROTON_IO_ERROR) {
+ String message = ClientConstants.COMMUNICATION_EXCEPTION_GENERIC_MESSAGE;
+ if (errorCondition.getDescription() != null) {
+ message = errorCondition.getDescription();
+ }
+ return new CommunicationException(message, null);
}
return new EventHubException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.getDescription());
@@ -137,7 +144,7 @@ public static String toStackTraceString(final Throwable exception, final String
final Throwable innerException = exception.getCause();
if (innerException != null) {
- builder.append("Cause: " + innerException.getMessage());
+ builder.append("Cause: ").append(innerException.getMessage());
final StackTraceElement[] innerStackTraceElements = innerException.getStackTrace();
for (final StackTraceElement ste : innerStackTraceElements) {
builder.append(System.lineSeparator());
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java
index 4ae1530610034..01e9006950c0c 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java
@@ -47,7 +47,7 @@ public void runOnOpenedObject(
public void onEvent() {
if (!creatingNewInnerObject
&& (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED
- || innerObject.getState() == IOObject.IOObjectState.CLOSING)) {
+ || innerObject.getState() == IOObject.IOObjectState.CLOSING)) {
creatingNewInnerObject = true;
try {
@@ -59,6 +59,7 @@ public void onComplete(T result) {
for (OperationResult callback : openCallbacks) {
callback.onComplete(result);
}
+
openCallbacks.clear();
}
@@ -67,6 +68,7 @@ public void onError(Exception error) {
for (OperationResult callback : openCallbacks) {
callback.onError(error);
}
+
openCallbacks.clear();
}
});
diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java
index 7e74034d319e7..b6f577165d58b 100644
--- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java
+++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java
@@ -3,33 +3,34 @@
package com.microsoft.azure.eventhubs.impl;
+import com.microsoft.azure.eventhubs.OperationCancelledException;
+import com.microsoft.azure.eventhubs.TimeoutException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import java.io.IOException;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import com.microsoft.azure.eventhubs.OperationCancelledException;
-import com.microsoft.azure.eventhubs.TimeoutException;
-
final class ManagementChannel {
final FaultTolerantObject innerChannel;
- ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection) {
+ ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection, final String clientId) {
final RequestResponseCloser closer = new RequestResponseCloser();
this.innerChannel = new FaultTolerantObject<>(
- new RequestResponseOpener(
- sessionProvider,
- "mgmt-session",
- "mgmt",
- ClientConstants.MANAGEMENT_ADDRESS,
- connection),
- closer);
+ new RequestResponseOpener(
+ sessionProvider,
+ clientId,
+ "mgmt-session",
+ "mgmt",
+ ClientConstants.MANAGEMENT_ADDRESS,
+ connection),
+ closer);
closer.setInnerChannel(this.innerChannel);
}
@@ -45,22 +46,22 @@ public CompletableFuture
+
junitjunit
diff --git a/eventhubs/data-plane/readme.md b/eventhubs/data-plane/readme.md
index ea52c6191cd04..38634dbb29796 100644
--- a/eventhubs/data-plane/readme.md
+++ b/eventhubs/data-plane/readme.md
@@ -41,11 +41,11 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK
|azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs)
```XML
-
- com.microsoft.azure
- azure-eventhubs
- 2.0.0
-
+
+ com.microsoft.azure
+ azure-eventhubs
+ 2.3.0
+
```
#### Microsoft Azure EventHubs Java Event Processor Host library
@@ -58,12 +58,12 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries.
|azure-eventhubs-eph|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph)
```XML
-
- com.microsoft.azure
- azure-eventhubs-eph
- 2.2.0
-
-```
+
+ com.microsoft.azure
+ azure-eventhubs-eph
+ 2.5.0
+
+```
## How to provide feedback
diff --git a/pom.client.xml b/pom.client.xml
index 460cb031d8cb9..225f88e6694dc 100644
--- a/pom.client.xml
+++ b/pom.client.xml
@@ -109,7 +109,7 @@
1.103.1.110.31.0
- 1.1.0
+ 1.2.02.11.12.9.3-012.4.16-03
@@ -249,13 +249,13 @@
log4j-api${log4j-api.version}
-
+
com.microsoft.rest.v2client-runtime${client-runtime.version.v2}
-
+
org.slf4jslf4j-api
@@ -362,7 +362,7 @@
${cglib-nodep.version}test
-
+
org.slf4jslf4j-simple