From a1fa17592ede53237d3032f2d81a80a9d8350c56 Mon Sep 17 00:00:00 2001
From: Srikanta <51379715+srnagar@users.noreply.github.com>
Date: Fri, 13 Nov 2020 11:13:16 -0800
Subject: [PATCH] Preparing Event Hubs 5.4.0-beta.1 release (#17502)
---
eng/jacoco-test-coverage/pom.xml | 2 +-
eng/versioning/version_client.txt | 4 +-
sdk/core/azure-core-amqp-experimental/pom.xml | 2 +-
sdk/core/azure-core-amqp/CHANGELOG.md | 7 +-
sdk/core/azure-core-amqp/pom.xml | 2 +-
.../azure/core/amqp/AmqpMessageConstant.java | 18 +-
.../amqp/exception/AmqpErrorCondition.java | 12 +-
.../amqp/implementation/AmqpSendLink.java | 8 +
.../amqp/implementation/ExceptionUtil.java | 2 +
.../amqp/implementation/ReactorSender.java | 10 +
.../amqp/implementation/ReactorSession.java | 19 +-
.../CHANGELOG.md | 4 +-
.../README.md | 2 +-
.../azure-messaging-eventhubs/CHANGELOG.md | 10 +-
.../azure-messaging-eventhubs/README.md | 2 +-
.../azure-messaging-eventhubs/pom.xml | 7 +-
.../azure/messaging/eventhubs/EventData.java | 178 ++++++-
.../messaging/eventhubs/EventDataBatch.java | 83 +++-
.../eventhubs/EventHubAsyncClient.java | 25 +-
.../eventhubs/EventHubClientBuilder.java | 77 ++-
.../EventHubConsumerAsyncClient.java | 13 +-
.../eventhubs/EventHubMessageSerializer.java | 24 +-
.../EventHubPartitionAsyncConsumer.java | 13 +-
.../EventHubProducerAsyncClient.java | 263 +++++++++--
.../eventhubs/EventHubProducerClient.java | 11 +
.../EventProcessorClientBuilder.java | 13 +
.../PartitionPublishingProperties.java | 141 ++++++
.../implementation/ClientConstants.java | 18 +
.../EventHubAmqpConnection.java | 5 +
.../EventHubReactorAmqpConnection.java | 26 +-
.../EventHubReactorSession.java | 36 +-
.../implementation/EventHubSession.java | 14 +
.../PartitionPublishingState.java | 188 ++++++++
.../PartitionPublishingUtils.java | 32 ++
.../eventhubs/models/PartitionEvent.java | 2 +-
.../src/main/java/module-info.java | 1 +
.../eventhubs/EventDataBatchTest.java | 19 +-
.../EventHubConsumerAsyncClientTest.java | 18 +-
.../eventhubs/EventHubConsumerClientTest.java | 10 +-
.../EventHubPartitionAsyncConsumerTest.java | 94 +++-
...tHubProducerAsyncClientIdempotentTest.java | 438 ++++++++++++++++++
.../EventHubProducerAsyncClientTest.java | 22 +-
.../eventhubs/EventHubProducerClientTest.java | 10 +-
.../PartitionPublishigUtilsTest.java | 43 ++
.../azure-messaging-servicebus/pom.xml | 2 +-
45 files changed, 1781 insertions(+), 149 deletions(-)
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPublishingProperties.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingState.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingUtils.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIdempotentTest.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/PartitionPublishigUtilsTest.java
diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml
index c3fed3aa39e48..de34c0f3e7d79 100644
--- a/eng/jacoco-test-coverage/pom.xml
+++ b/eng/jacoco-test-coverage/pom.xml
@@ -84,7 +84,7 @@
com.azure
azure-core-amqp
- 1.7.0-beta.2
+ 1.7.0-beta.3
com.azure
diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt
index c6e004f772ca8..83acba67563af 100644
--- a/eng/versioning/version_client.txt
+++ b/eng/versioning/version_client.txt
@@ -45,7 +45,7 @@ com.azure:azure-communication-common;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-administration;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-sms;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.10.0;1.11.0-beta.1
-com.azure:azure-core-amqp;1.6.0;1.7.0-beta.2
+com.azure:azure-core-amqp;1.7.0-beta.2;1.7.0-beta.3
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.8;1.0.0-beta.9
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
@@ -178,7 +178,6 @@ com.microsoft:microsoft-opentelemetry-exporter-azuremonitor;1.0.0-beta.1;1.0.0-b
# note: The unreleased dependencies will not be manipulated with the automatic PR creation code.
unreleased_com.azure:azure-core-experimental;1.0.0-beta.9
unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.7
-unreleased_com.azure:azure-messaging-eventhubs;5.3.0
# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
@@ -189,4 +188,3 @@ unreleased_com.azure:azure-messaging-eventhubs;5.3.0
# note: Released beta versions will not be manipulated with the automatic PR creation code.
beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.1
beta_com.azure:azure-storage-common;12.9.0-beta.1
-beta_com.azure:azure-core-amqp;1.7.0-beta.1
diff --git a/sdk/core/azure-core-amqp-experimental/pom.xml b/sdk/core/azure-core-amqp-experimental/pom.xml
index 4ef5ebc7b8cb6..0d2c74a0bf5b7 100644
--- a/sdk/core/azure-core-amqp-experimental/pom.xml
+++ b/sdk/core/azure-core-amqp-experimental/pom.xml
@@ -58,7 +58,7 @@
com.azure
azure-core-amqp
- 1.7.0-beta.2
+ 1.7.0-beta.3
diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md
index db51536214309..cf0cfb197daac 100644
--- a/sdk/core/azure-core-amqp/CHANGELOG.md
+++ b/sdk/core/azure-core-amqp/CHANGELOG.md
@@ -1,5 +1,10 @@
# Release History
-## 1.7.0-beta.2 (Unreleased)
+## 1.7.0-beta.3 (Unreleased)
+
+## 1.7.0-beta.2 (2020-11-10)
+### New Features
+- Optionally enable idempotency of a send link to send AMQP messages with producer group id, producer owner level and
+producer sequence number in the message annotations.
## 1.7.0-beta.1 (2020-11-03)
### Dependency Updates
diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml
index b29fd9c1d1432..2df58ebda0569 100644
--- a/sdk/core/azure-core-amqp/pom.xml
+++ b/sdk/core/azure-core-amqp/pom.xml
@@ -14,7 +14,7 @@
com.azure
azure-core-amqp
- 1.7.0-beta.2
+ 1.7.0-beta.3
jar
Microsoft Azure Java Core AMQP Library
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
index 3cc986e779ef8..e0d7fc1c2deac 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpMessageConstant.java
@@ -94,6 +94,7 @@ public enum AmqpMessageConstant {
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),
+
/**
* The name representing scheduled enqueue time.
*/
@@ -122,7 +123,22 @@ public enum AmqpMessageConstant {
/**
* The identifier for deadletter reason.
*/
- DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");
+ DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason"),
+
+ /**
+ * The published sequence number when a message was sent from an idempotent producer.
+ */
+ PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME("com.microsoft:producer-sequence-number"),
+
+ /**
+ * The published epoch when a message was sent from an idempotent producer.
+ */
+ PRODUCER_EPOCH_ANNOTATION_NAME("com.microsoft:producer-epoch"),
+
+ /**
+ * The published producer id when a message was sent from an idempotent producer.
+ */
+ PRODUCER_ID_ANNOTATION_NAME("com.microsoft:producer-id");
private static final Map RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java
index fdf9de731921a..4470e0f5447fd 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java
@@ -139,7 +139,17 @@ public enum AmqpErrorCondition {
/**
* Error condition when a subscription client tries to create a rule with the name of an already existing rule.
*/
- ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists");
+ ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists"),
+
+ /**
+ * A producer is disconnected because another higher epoc producer connects to the service.
+ */
+ PRODUCER_EPOCH_STOLEN("com.microsoft:producer-epoch-stolen"),
+
+ /**
+ * An idempotent producer is sending an event without a consecutive producer sequence number.
+ */
+ OUT_OF_ORDER_SEQUENCE("com.microsoft:out-of-order-sequence");
private static final Map ERROR_CONSTANT_MAP = new HashMap<>();
private final String errorCondition;
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java
index 5c757249afb8c..9d894ee47077d 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpSendLink.java
@@ -6,12 +6,14 @@
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.engine.Delivery;
import reactor.core.publisher.Mono;
import java.util.List;
+import java.util.Map;
/**
* An AMQP link that sends information to the remote endpoint.
@@ -77,6 +79,12 @@ public interface AmqpSendLink extends AmqpLink {
*/
Mono getLinkSize();
+ /**
+ * Gets the properties of the send link returned from the service.
+ * @return A Mono that completes and returns the properties of the send link.
+ */
+ Mono> getRemoteProperties();
+
/**
* Gets the context for this AMQP send link.
*/
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ExceptionUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ExceptionUtil.java
index da4f2cd989646..e9b3419741a5c 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ExceptionUtil.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ExceptionUtil.java
@@ -70,6 +70,8 @@ public static Exception toException(String errorCondition, String description, A
case ENTITY_ALREADY_EXISTS:
case MESSAGE_NOT_FOUND:
case SESSION_NOT_FOUND:
+ case PRODUCER_EPOCH_STOLEN:
+ case OUT_OF_ORDER_SEQUENCE:
isTransient = false;
break;
case NOT_IMPLEMENTED:
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
index 0d11b882ea4b0..c722bb060ad4d 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java
@@ -41,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
@@ -276,6 +277,15 @@ public Mono getLinkSize() {
}
}
+ @Override
+ public Mono> getRemoteProperties() {
+ return RetryUtil.withRetry(
+ getEndpointStates()
+ .takeUntil(state -> state == AmqpEndpointState.ACTIVE)
+ .then(Mono.fromCallable(sender::getRemoteProperties)),
+ timeout, retry);
+ }
+
@Override
public boolean isDisposed() {
return isDisposed.get();
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
index c05e70c5e5498..864a7048e8eec 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
@@ -413,13 +413,14 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
* @param linkName Name of the receive link.
* @param entityPath Address in the message broker for the link.
* @param linkProperties The properties needed to be set on the link.
+ * @param senderDesiredCapabilities Capabilities that the sender link supports.
* @param timeout Operation timeout when creating the link.
* @param retry Retry policy to apply when link creation times out.
*
* @return A new instance of an {@link AmqpLink} with the correct properties set.
*/
protected Mono createProducer(String linkName, String entityPath, Duration timeout,
- AmqpRetryPolicy retry, Map linkProperties) {
+ AmqpRetryPolicy retry, Map linkProperties, Symbol[] senderDesiredCapabilities) {
if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
@@ -451,8 +452,8 @@ protected Mono createProducer(String linkName, String entityPath, Dura
}
logger.info("Creating a new sender link with linkName {}", linkName);
- return getSubscription(linkName, entityPath, linkProperties, timeout, retry,
- tokenManager);
+ return getSubscription(linkName, entityPath, linkProperties, senderDesiredCapabilities,
+ timeout, retry, tokenManager);
});
sink.success(computed.getLink());
@@ -463,11 +464,17 @@ protected Mono createProducer(String linkName, String entityPath, Dura
}));
}
+ protected Mono createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry,
+ Map linkProperties) {
+
+ return this.createProducer(linkName, entityPath, timeout, retry, linkProperties, null);
+ }
/**
* NOTE: Ensure this is invoked using the reactor dispatcher because proton-j is not thread-safe.
*/
private LinkSubscription getSubscription(String linkName, String entityPath,
- Map linkProperties, Duration timeout, AmqpRetryPolicy retry, TokenManager tokenManager) {
+ Map linkProperties, Symbol[] senderDesiredCapabilities, Duration timeout, AmqpRetryPolicy retry,
+ TokenManager tokenManager) {
final Sender sender = session.sender(linkName);
final Target target = new Target();
@@ -482,6 +489,10 @@ private LinkSubscription getSubscription(String linkName, String e
sender.setProperties(linkProperties);
}
+ if (senderDesiredCapabilities != null && senderDesiredCapabilities.length > 0) {
+ sender.setDesiredCapabilities(senderDesiredCapabilities);
+ }
+
final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(
sessionHandler.getConnectionId(), sessionHandler.getHostname(), linkName, entityPath);
BaseHandler.setHandler(sender, sendLinkHandler);
diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md
index 711f89b03c9da..810a91af84c84 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md
+++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md
@@ -1,6 +1,8 @@
# Release History
-## 1.4.0-beta.1 (Unreleased)
+## 1.4.0-beta.1 (2020-11-12)
+### Dependency Updates
+- Update `azure-messaging-eventhubs` dependency to `5.4.0-beta.1`.
## 1.3.1 (2020-10-30)
### Dependency Updates
diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md
index 4342fa104d377..d2d532a31eb84 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md
+++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md
@@ -27,7 +27,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
com.azure
azure-messaging-eventhubs-checkpointstore-blob
- 1.3.1
+ 1.4.0-beta.1
```
[//]: # ({x-version-update-end})
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
index 34539123bf8b3..c895478cb735f 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
+++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
@@ -1,6 +1,9 @@
# Release History
-## 5.4.0-beta.1 (Unreleased)
+## 5.4.0-beta.1 (2020-11-12)
+### Breaking changes
+- Removed `ObjectBatch` and related `createBatch()` and `send()` operations in favor of
+ supporting `BinaryData` in `EventData`.
## 5.3.1 (2020-10-30)
### Bug fixes
@@ -23,6 +26,11 @@ the partition consumer to rebuild the connection later.
- Update `azure-core-amqp` dependency to `1.6.0`.
- Update `azure-identity` dependency to `1.1.3`.
+## 5.3.0-beta.1 (2020-09-25)
+### New Features
+- A producer client can be configured to be an idempotent producer. When an event with the
+same producer group id and publishing sequence number is sent twice, only one will be accepted to the event hub.
+
## 5.2.0 (2020-09-11)
- Default scheme to 'sb://' if no scheme is set in 'Endpoint'.
- Update dependency version of `azure-core-amp` to `1.5.1`
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/README.md b/sdk/eventhubs/azure-messaging-eventhubs/README.md
index dad1c5b6ed1cb..4bbccca8e7c56 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/README.md
+++ b/sdk/eventhubs/azure-messaging-eventhubs/README.md
@@ -58,7 +58,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
com.azure
azure-messaging-eventhubs
- 5.3.1
+ 5.4.0-beta.1
```
[//]: # ({x-version-update-end})
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
index 40ee6c1a7a5f6..37b6df2c8e322 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
+++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
@@ -39,10 +39,15 @@
azure-core
1.10.0
+
+ com.azure
+ azure-core-experimental
+ 1.0.0-beta.8
+
com.azure
azure-core-amqp
- 1.6.0
+ 1.7.0-beta.2
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java
index bcf8f0d101fbb..4986ec5f3793a 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java
@@ -3,8 +3,15 @@
package com.azure.messaging.eventhubs;
+import com.azure.core.amqp.AmqpMessageConstant;
+import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
+import com.azure.core.util.serializer.TypeReference;
+import reactor.core.publisher.Mono;
+import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
@@ -20,8 +27,11 @@
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
-import static com.azure.core.amqp.AmqpMessageConstant.PUBLISHER_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME;
+import static com.azure.core.amqp.AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME;
+import static com.azure.core.util.FluxUtil.monoError;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
@@ -52,18 +62,23 @@ public class EventData {
*/
static final Set RESERVED_SYSTEM_PROPERTIES;
+ private final ClientLogger logger = new ClientLogger(EventData.class);
private final Map properties;
- private final byte[] body;
+ private final BinaryData body;
private final SystemProperties systemProperties;
+ private ObjectSerializer serializer;
private Context context;
+ private Long publishedGroupId;
+ private Short publishedOwnerLevel;
+ private Integer publishedSequenceNumber;
+
static {
final Set properties = new HashSet<>();
properties.add(OFFSET_ANNOTATION_NAME.getValue());
properties.add(PARTITION_KEY_ANNOTATION_NAME.getValue());
properties.add(SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
properties.add(ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue());
- properties.add(PUBLISHER_ANNOTATION_NAME.getValue());
RESERVED_SYSTEM_PROPERTIES = Collections.unmodifiableSet(properties);
}
@@ -75,10 +90,7 @@ public class EventData {
* @throws NullPointerException if {@code body} is {@code null}.
*/
public EventData(byte[] body) {
- this.body = Objects.requireNonNull(body, "'body' cannot be null.");
- this.context = Context.NONE;
- this.properties = new HashMap<>();
- this.systemProperties = new SystemProperties();
+ this(BinaryData.fromBytes(Objects.requireNonNull(body, "'body' cannot be null.")));
}
/**
@@ -101,6 +113,15 @@ public EventData(String body) {
this(Objects.requireNonNull(body, "'body' cannot be null.").getBytes(UTF_8));
}
+ /**
+ * Creates an event with the provided {@link BinaryData} as payload.
+ *
+ * @param body The {@link BinaryData} payload for this event.
+ */
+ public EventData(BinaryData body) {
+ this(Objects.requireNonNull(body, "'body' cannot be null."), new SystemProperties(), Context.NONE);
+ }
+
/**
* Creates an event with the given {@code body}, system properties and context.
*
@@ -109,11 +130,12 @@ public EventData(String body) {
* @param context A specified key-value pair of type {@link Context}.
* @throws NullPointerException if {@code body}, {@code systemProperties}, or {@code context} is {@code null}.
*/
- EventData(byte[] body, SystemProperties systemProperties, Context context) {
+ EventData(BinaryData body, SystemProperties systemProperties, Context context) {
this.body = Objects.requireNonNull(body, "'body' cannot be null.");
this.context = Objects.requireNonNull(context, "'context' cannot be null.");
this.systemProperties = Objects.requireNonNull(systemProperties, "'systemProperties' cannot be null.");
this.properties = new HashMap<>();
+ this.commitProducerDataFromSysProperties(); // populate producer publishing when receiving an event.
}
/**
@@ -155,7 +177,7 @@ public Map getSystemProperties() {
* @return A byte array representing the data.
*/
public byte[] getBody() {
- return Arrays.copyOf(body, body.length);
+ return body.toBytes();
}
/**
@@ -164,7 +186,16 @@ public byte[] getBody() {
* @return UTF-8 decoded string representation of the event data.
*/
public String getBodyAsString() {
- return new String(body, UTF_8);
+ return new String(body.toBytes(), UTF_8);
+ }
+
+ /**
+ * Returns the {@link BinaryData} payload associated with this event.
+ *
+ * @return the {@link BinaryData} payload associated with this event.
+ */
+ public BinaryData getBodyAsBinaryData() {
+ return body;
}
/**
@@ -213,6 +244,80 @@ public Long getSequenceNumber() {
return systemProperties.getSequenceNumber();
}
+ /**
+ * Gets the producer sequence number that was assigned during publishing, if the event was successfully
+ * published by a sequence-aware producer. If the producer was not configured to apply
+ * sequence numbering or if the event has not yet been successfully published, this member
+ * will be {@code null}.
+ *
+ * The published sequence number is only populated and relevant when certain features
+ * of the producer are enabled. For example, it is used by idempotent publishing.
+ *
+ * @return The publishing sequence number assigned to the event at the time it was successfully published.
+ * {@code null} if the {@link EventData} hasn't been sent or it's sent without idempotent publishing enabled.
+ */
+ public Integer getPublishedSequenceNumber() {
+ return publishedSequenceNumber;
+ }
+
+ /**
+ * Gets the producer group id that was assigned during publishing, if the event was successfully
+ * published by a sequence-aware producer. If the producer was not configured to apply
+ * sequence numbering or if the event has not yet been successfully published, this member
+ * will be {@code null}.
+ *
+ * The producer group id is only populated and relevant when certain features
+ * of the producer are enabled. For example, it is used by idempotent publishing.
+ *
+ * @return The producer group id assigned to the event at the time it was successfully published.
+ * {@code null} if the {@link EventData} hasn't been sent or it's sent without idempotent publishing enabled.
+ */
+ Long getPublishedGroupId() {
+ return publishedGroupId;
+ }
+
+ /**
+ * Gets the producer owner level that was assigned during publishing, if the event was successfully
+ * published by a sequence-aware producer. If the producer was not configured to apply
+ * sequence numbering or if the event has not yet been successfully published, this member
+ * will be {@code null}.
+ *
+ * The producer owner level is only populated and relevant when certain features
+ * of the producer are enabled. For example, it is used by idempotent publishing.
+ *
+ * @return The producer owner level assigned to the event at the time it was successfully published.
+ * {@code null} if the {@link EventData} hasn't been sent or it's sent without idempotent publishing enabled.
+ */
+ Short getPublishedOwnerLevel() {
+ return publishedOwnerLevel;
+ }
+
+ void setProducerGroupIdInSysProperties(Long producerGroupId) {
+ this.getSystemProperties().put(
+ AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue(),
+ producerGroupId
+ );
+ }
+
+ void setProducerOwnerLevelInSysProperties(Short producerOwnerLevel) {
+ this.getSystemProperties().put(
+ AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue(),
+ producerOwnerLevel
+ );
+ }
+
+ void setPublishedSequenceNumberInSysProperties(Integer publishedSequenceNumber) {
+ this.getSystemProperties().put(
+ AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(),
+ publishedSequenceNumber);
+ }
+
+ void commitProducerDataFromSysProperties() {
+ this.publishedGroupId = this.systemProperties.getPublishedGroupId();
+ this.publishedOwnerLevel = this.systemProperties.getPublishedOwnerLevel();
+ this.publishedSequenceNumber = this.systemProperties.getPublishedSequenceNumber();
+ }
+
/**
* {@inheritDoc}
*/
@@ -226,7 +331,7 @@ public boolean equals(Object o) {
}
EventData eventData = (EventData) o;
- return Arrays.equals(body, eventData.body);
+ return Arrays.equals(body.toBytes(), eventData.body.toBytes());
}
/**
@@ -234,7 +339,7 @@ public boolean equals(Object o) {
*/
@Override
public int hashCode() {
- return Arrays.hashCode(body);
+ return Arrays.hashCode(body.toBytes());
}
/**
@@ -262,6 +367,43 @@ public EventData addContext(String key, Object value) {
return this;
}
+
+ /**
+ * Deserializes event payload into an object of type {@code T}.
+ *
+ * @param objectType Class object of type T.
+ * @param object type for deserialization.
+ * @return deserialized object as type T.
+ */
+ public T getDeserializedObject(Class objectType) {
+ return getDeserializedObjectAsync(objectType).block();
+ }
+
+ /**
+ * Deserializes event payload into object.
+ *
+ * @param objectType Class object of type T
+ * @param object type for deserialization
+ * @return deserialized object as type T
+ */
+ public Mono getDeserializedObjectAsync(Class objectType) {
+ if (this.serializer == null) {
+ return monoError(logger,
+ new NullPointerException("No serializer set for deserializing EventData payload."));
+ }
+ if (objectType == null) {
+ return monoError(logger, new IllegalArgumentException("objectType cannot be null."));
+ }
+
+ return serializer.deserializeAsync(new ByteArrayInputStream(getBody()),
+ TypeReference.createInstance(objectType));
+ }
+
+ EventData setSerializer(ObjectSerializer serializer) {
+ this.serializer = serializer;
+ return this;
+ }
+
/**
* A collection of properties populated by Azure Event Hubs service.
*/
@@ -349,6 +491,18 @@ private Long getSequenceNumber() {
return sequenceNumber;
}
+ private Integer getPublishedSequenceNumber() {
+ return (Integer) this.get(PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
+ }
+
+ private Long getPublishedGroupId() {
+ return (Long) this.get(PRODUCER_ID_ANNOTATION_NAME.getValue());
+ }
+
+ private Short getPublishedOwnerLevel() {
+ return (Short) this.get(PRODUCER_EPOCH_ANNOTATION_NAME.getValue());
+ }
+
@SuppressWarnings("unchecked")
private T removeSystemProperty(final String key) {
if (this.containsKey(key)) {
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java
index e605f0042c5eb..77ebc1addbdd5 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java
@@ -46,8 +46,8 @@
* @see EventHubClientBuilder See EventHubClientBuilder for examples of building an asynchronous or synchronous
* producer.
*/
-public final class EventDataBatch {
- private final ClientLogger logger = new ClientLogger(EventDataBatch.class);
+public class EventDataBatch {
+ private final ClientLogger logger = new ClientLogger(this.getClass());
private final Object lock = new Object();
private final int maxMessageSize;
private final String partitionKey;
@@ -55,13 +55,16 @@ public final class EventDataBatch {
private final List events;
private final byte[] eventBytes;
private final String partitionId;
+ private final boolean isPublishingSequenceNumberRequired;
private int sizeInBytes;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;
+ private Integer startingPublishedSequenceNumber;
- EventDataBatch(int maxMessageSize, String partitionId, String partitionKey, ErrorContextProvider contextProvider,
- TracerProvider tracerProvider, String entityPath, String hostname) {
+ EventDataBatch(int maxMessageSize, String partitionId, String partitionKey,
+ ErrorContextProvider contextProvider, TracerProvider tracerProvider, String entityPath,
+ String hostname) {
this.maxMessageSize = maxMessageSize;
this.partitionKey = partitionKey;
this.partitionId = partitionId;
@@ -72,6 +75,23 @@ public final class EventDataBatch {
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
this.hostname = hostname;
+ this.isPublishingSequenceNumberRequired = false;
+ }
+
+ EventDataBatch(int maxMessageSize, String partitionId, String partitionKey,
+ ErrorContextProvider contextProvider, TracerProvider tracerProvider, String entityPath,
+ String hostname, boolean isPublishingSequenceNumberRequired) {
+ this.maxMessageSize = maxMessageSize;
+ this.partitionKey = partitionKey;
+ this.partitionId = partitionId;
+ this.contextProvider = contextProvider;
+ this.events = new LinkedList<>();
+ this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
+ this.eventBytes = new byte[maxMessageSize];
+ this.tracerProvider = tracerProvider;
+ this.entityPath = entityPath;
+ this.hostname = hostname;
+ this.isPublishingSequenceNumberRequired = isPublishingSequenceNumberRequired;
}
/**
@@ -101,6 +121,20 @@ public int getSizeInBytes() {
return this.sizeInBytes;
}
+ /**
+ * Gets the sequence number of the first event in the batch, if the batch was successfully
+ * published by a sequence-aware producer. If the producer was not configured to apply
+ * sequence numbering or if the batch has not yet been successfully published, this member
+ * will be {@code null}.
+ *
+ * @return the publishing sequence number assigned to the first event in the batch at the time
+ * the batch was successfully published. {@code null} if the producer was not configured to apply
+ * sequence numbering or if the batch has not yet been successfully published.
+ */
+ public Integer getStartingPublishedSequenceNumber() {
+ return this.startingPublishedSequenceNumber;
+ }
+
/**
* Tries to add an {@link EventData event} to the batch.
*
@@ -112,7 +146,7 @@ public int getSizeInBytes() {
*/
public boolean tryAdd(final EventData eventData) {
if (eventData == null) {
- throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null"));
+ throw logger.logExceptionAsWarning(new NullPointerException("eventData cannot be null"));
}
EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData;
@@ -138,13 +172,14 @@ public boolean tryAdd(final EventData eventData) {
return true;
}
+
/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the message.
*
* @param eventData The Event to add tracing span for.
* @return the updated event data object.
*/
- private EventData traceMessageSpan(EventData eventData) {
+ EventData traceMessageSpan(EventData eventData) {
Optional eventContextData = eventData.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
@@ -168,6 +203,10 @@ private EventData traceMessageSpan(EventData eventData) {
return eventData;
}
+ void setStartingPublishedSequenceNumber(Integer startingPublishedSequenceNumber) {
+ this.startingPublishedSequenceNumber = startingPublishedSequenceNumber;
+ }
+
List getEvents() {
return events;
}
@@ -180,10 +219,29 @@ String getPartitionId() {
return partitionId;
}
- private int getSize(final EventData eventData, final boolean isFirst) {
+ int getSize(final EventData eventData, final boolean isFirst) {
Objects.requireNonNull(eventData, "'eventData' cannot be null.");
final Message amqpMessage = createAmqpMessage(eventData, partitionKey);
+ if (isPublishingSequenceNumberRequired) {
+ // Pre-allocate size for system properties "com.microsoft:producer-sequence-number",
+ // "com.microsoft:producer-epoch", and "com.microsoft:producer-producer-id".
+ // EventData doesn't have this system property until it's added just before an idempotent producer
+ // sends the EventData out.
+ final MessageAnnotations messageAnnotations = (amqpMessage.getMessageAnnotations() == null)
+ ? new MessageAnnotations(new HashMap<>())
+ : amqpMessage.getMessageAnnotations();
+ amqpMessage.setMessageAnnotations(messageAnnotations);
+ messageAnnotations.getValue().put(
+ Symbol.getSymbol(
+ AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()), Integer.MAX_VALUE);
+ messageAnnotations.getValue().put(
+ Symbol.getSymbol(
+ AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()), Short.MAX_VALUE);
+ messageAnnotations.getValue().put(
+ Symbol.getSymbol(
+ AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()), Long.MAX_VALUE);
+ }
int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size
eventSize += 16; // data section overhead
@@ -259,16 +317,17 @@ private Message createAmqpMessage(EventData event, String partitionKey) {
case REPLY_TO_GROUP_ID:
message.setReplyToGroupId((String) value);
break;
+ case PRODUCER_EPOCH_ANNOTATION_NAME:
+ case PRODUCER_ID_ANNOTATION_NAME:
+ case PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME:
+ EventHubMessageSerializer.setMessageAnnotation(message, key, value);
+ break;
default:
throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US,
"Property is not a recognized reserved property name: %s", key)));
}
} else {
- final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
- ? new MessageAnnotations(new HashMap<>())
- : message.getMessageAnnotations();
- messageAnnotations.getValue().put(Symbol.getSymbol(key), value);
- message.setMessageAnnotations(messageAnnotations);
+ EventHubMessageSerializer.setMessageAnnotation(message, key, value);
}
});
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
index 3a0598a08da51..fac22122c7ef3 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
@@ -6,13 +6,16 @@
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
+import com.azure.messaging.eventhubs.implementation.PartitionPublishingState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import java.io.Closeable;
+import java.util.Map;
import java.util.Objects;
/**
@@ -32,9 +35,17 @@ class EventHubAsyncClient implements Closeable {
private final boolean isSharedConnection;
private final Runnable onClientClose;
private final TracerProvider tracerProvider;
+ private final boolean isIdempotentPartitionPublishing;
+ private final Map initialPartitionPublishingStates;
+ private final ObjectSerializer serializer;
EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
- MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose) {
+ MessageSerializer messageSerializer, ObjectSerializer serializer, Scheduler scheduler,
+ boolean isSharedConnection, Runnable onClientClose, boolean isIdempotentPartitionPublishing,
+ Map initialPartitionPublishingStates
+ ) {
+
+
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
@@ -43,6 +54,9 @@ class EventHubAsyncClient implements Closeable {
this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
this.isSharedConnection = isSharedConnection;
+ this.isIdempotentPartitionPublishing = isIdempotentPartitionPublishing;
+ this.initialPartitionPublishingStates = initialPartitionPublishingStates;
+ this.serializer = serializer;
}
/**
@@ -104,8 +118,9 @@ Mono getPartitionProperties(String partitionId) {
*/
EventHubProducerAsyncClient createProducer() {
return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
- connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer, scheduler,
- isSharedConnection, onClientClose);
+ connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer,
+ serializer, scheduler, isSharedConnection, onClientClose, isIdempotentPartitionPublishing,
+ initialPartitionPublishingStates);
}
/**
@@ -129,8 +144,8 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou
}
return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
- connectionProcessor, messageSerializer, consumerGroup, prefetchCount, scheduler, isSharedConnection,
- onClientClose);
+ connectionProcessor, messageSerializer, serializer, consumerGroup, prefetchCount, scheduler,
+ isSharedConnection, onClientClose);
}
/**
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index 0ffe23f98ad15..3b311ad356d31 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -25,6 +25,7 @@
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
@@ -32,6 +33,11 @@
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import org.apache.qpid.proton.engine.SslDomain;
+
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+import com.azure.messaging.eventhubs.implementation.PartitionPublishingState;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@@ -44,7 +50,6 @@
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
/**
* This class provides a fluent builder API to aid the instantiation of {@link EventHubProducerAsyncClient}, {@link
@@ -144,6 +149,9 @@ public class EventHubClientBuilder {
private Integer prefetchCount;
private ClientOptions clientOptions;
private SslDomain.VerifyMode verifyMode;
+ private boolean isIdempotentPartitionPublishing;
+ private Map initialPartitionPublishingStates;
+ private ObjectSerializer serializer;
/**
* Keeps track of the open clients that were created from this builder when there is a shared connection.
@@ -388,6 +396,64 @@ public EventHubClientBuilder prefetchCount(int prefetchCount) {
return this;
}
+ /**
+ * Enables idempotent publishing when an {@link EventHubProducerAsyncClient} or {@link EventHubProducerClient}
+ * is built.
+ *
+ * If enabled, the producer will only be able to publish directly to partitions; it will not be able to publish to
+ * the Event Hubs gateway for automatic partition routing nor using a partition key.
+ *
+ * @return The updated {@link EventHubClientBuilder} object.
+ */
+ public EventHubClientBuilder enableIdempotentPartitionPublishing() {
+ this.isIdempotentPartitionPublishing = true;
+ return this;
+ }
+
+ /**
+ * Sets the idempotent publishing options to {@link EventHubProducerAsyncClient} or {@link EventHubProducerClient}
+ * when you build them.
+ *
+ * The set of options that can be specified to influence publishing behavior specific to the configured Event Hub
+ * partition.
+ * These options are not necessary in the majority of scenarios and are intended for use with specialized scenarios,
+ * such as when recovering the state used for idempotent publishing.
+ *
+ * It is highly recommended that these options only be specified if there is a proven need to do so; Incorrectly
+ * configuring these values may result in the built {@link EventHubProducerAsyncClient} or
+ * {@link EventHubProducerClient} instance unable to publish to the Event Hubs.
+ *
+ * These options are ignored when publishing to the Event Hubs gateway for automatic routing or when using a
+ * partition key.
+ *
+ * @param states A {@link Map} of {@link PartitionPublishingProperties} for each partition. The keys of the map
+ * are the partition ids.
+ * @return The updated {@link EventHubClientBuilder} object.
+ */
+ public EventHubClientBuilder initialPartitionPublishingStates(Map states) {
+ if (states != null) {
+ this.initialPartitionPublishingStates = new HashMap<>();
+ states.forEach((partitionId, state) -> {
+ this.initialPartitionPublishingStates.put(partitionId, new PartitionPublishingState(state));
+ });
+ } else {
+ this.initialPartitionPublishingStates = null;
+ }
+ return this;
+ }
+
+ /**
+ * Set ObjectSerializer implementation to be used for creating ObjectBatch.
+ *
+ * @param serializer ObjectSerializer implementation
+ *
+ * @return updated builder instance
+ */
+ public EventHubClientBuilder serializer(ObjectSerializer serializer) {
+ this.serializer = serializer;
+ return this;
+ }
+
/**
* Package-private method that sets the scheduler for the created Event Hub client.
*
@@ -454,6 +520,10 @@ public EventHubConsumerClient buildConsumerClient() {
* proxy is specified but the transport type is not {@link AmqpTransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubProducerAsyncClient buildAsyncProducerClient() {
+ if (initialPartitionPublishingStates != null && !isIdempotentPartitionPublishing) {
+ throw logger.logExceptionAsError(new IllegalArgumentException("'initialPartitionPublishingStates' "
+ + "shouldn't be set if 'idempotentPartitionPublishing' is not true."));
+ }
return buildAsyncClient().createProducer();
}
@@ -527,8 +597,9 @@ EventHubAsyncClient buildAsyncClient() {
final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));
- return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, scheduler,
- isSharedConnection.get(), this::onClientClose);
+ return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, serializer, scheduler,
+ isSharedConnection.get(), this::onClientClose,
+ isIdempotentPartitionPublishing, initialPartitionPublishingStates);
}
/**
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
index ee88f75aa6b0a..d74961075e874 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
@@ -12,6 +12,7 @@
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
@@ -70,6 +71,7 @@ public class EventHubConsumerAsyncClient implements Closeable {
private final String eventHubName;
private final EventHubConnectionProcessor connectionProcessor;
private final MessageSerializer messageSerializer;
+ private final ObjectSerializer serializer;
private final String consumerGroup;
private final int prefetchCount;
private final Scheduler scheduler;
@@ -83,12 +85,15 @@ public class EventHubConsumerAsyncClient implements Closeable {
new ConcurrentHashMap<>();
EventHubConsumerAsyncClient(String fullyQualifiedNamespace, String eventHubName,
- EventHubConnectionProcessor connectionProcessor, MessageSerializer messageSerializer, String consumerGroup,
- int prefetchCount, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClosed) {
+ EventHubConnectionProcessor connectionProcessor,
+ MessageSerializer messageSerializer, ObjectSerializer serializer,
+ String consumerGroup, int prefetchCount, Scheduler scheduler,
+ boolean isSharedConnection, Runnable onClientClosed) {
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
this.eventHubName = eventHubName;
this.connectionProcessor = connectionProcessor;
this.messageSerializer = messageSerializer;
+ this.serializer = serializer;
this.consumerGroup = consumerGroup;
this.prefetchCount = prefetchCount;
this.scheduler = scheduler;
@@ -370,8 +375,8 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
new AmqpReceiveLinkProcessor(prefetchCount, retryPolicy, connectionProcessor));
return new EventHubPartitionAsyncConsumer(linkMessageProcessor, messageSerializer, getFullyQualifiedNamespace(),
- getEventHubName(), consumerGroup, partitionId, initialPosition,
- receiveOptions.getTrackLastEnqueuedEventProperties(), scheduler);
+ getEventHubName(), consumerGroup, partitionId, serializer,
+ initialPosition, receiveOptions.getTrackLastEnqueuedEventProperties(), scheduler);
}
boolean isConnectionClosed() {
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java
index a936ae5ed1db4..41dae16296352 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java
@@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.exception.AzureException;
+import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ManagementChannel;
@@ -227,7 +228,7 @@ private EventData deserializeEventData(Message message) {
}
final EventData.SystemProperties systemProperties = new EventData.SystemProperties(receiveProperties);
- final EventData eventData = new EventData(body, systemProperties, Context.NONE);
+ final EventData eventData = new EventData(BinaryData.fromBytes(body), systemProperties, Context.NONE);
final Map properties = message.getApplicationProperties() == null
? new HashMap<>()
: message.getApplicationProperties().getValue();
@@ -296,7 +297,7 @@ private Instant getDate(Map, ?> amqpBody, String key) {
/*
* Sets AMQP protocol header values on the AMQP message.
*/
- private static void setSystemProperties(EventData eventData, Message message) {
+ static void setSystemProperties(EventData eventData, Message message) {
if (eventData.getSystemProperties() == null || eventData.getSystemProperties().isEmpty()) {
return;
}
@@ -349,6 +350,11 @@ private static void setSystemProperties(EventData eventData, Message message) {
case REPLY_TO_GROUP_ID:
message.setReplyToGroupId((String) value);
break;
+ case PRODUCER_EPOCH_ANNOTATION_NAME:
+ case PRODUCER_ID_ANNOTATION_NAME:
+ case PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME:
+ setMessageAnnotation(message, key, value);
+ break;
default:
throw new IllegalArgumentException(
String.format(
@@ -356,15 +362,19 @@ private static void setSystemProperties(EventData eventData, Message message) {
key));
}
} else {
- final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
- ? new MessageAnnotations(new HashMap<>())
- : message.getMessageAnnotations();
- messageAnnotations.getValue().put(Symbol.getSymbol(key), value);
- message.setMessageAnnotations(messageAnnotations);
+ setMessageAnnotation(message, key, value);
}
});
}
+ static void setMessageAnnotation(Message message, String key, Object value) {
+ final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
+ ? new MessageAnnotations(new HashMap<>())
+ : message.getMessageAnnotations();
+ messageAnnotations.getValue().put(Symbol.getSymbol(key), value);
+ message.setMessageAnnotations(messageAnnotations);
+ }
+
private static int getPayloadSize(Message msg) {
if (msg == null || msg.getBody() == null) {
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java
index d0b070343d4d6..386810b69d85d 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java
@@ -5,6 +5,7 @@
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
@@ -39,13 +40,16 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
private final Scheduler scheduler;
private final EmitterProcessor emitterProcessor;
private final EventPosition initialPosition;
+ private final ObjectSerializer serializer;
private volatile Long currentOffset;
EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor,
- MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName, String consumerGroup,
- String partitionId, AtomicReference> currentEventPosition,
- boolean trackLastEnqueuedEventProperties, Scheduler scheduler) {
+ MessageSerializer messageSerializer, String fullyQualifiedNamespace,
+ String eventHubName, String consumerGroup, String partitionId,
+ ObjectSerializer serializer,
+ AtomicReference> currentEventPosition,
+ boolean trackLastEnqueuedEventProperties, Scheduler scheduler) {
this.initialPosition = Objects.requireNonNull(currentEventPosition.get().get(),
"'currentEventPosition.get().get()' cannot be null.");
this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor;
@@ -54,6 +58,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
this.eventHubName = eventHubName;
this.consumerGroup = consumerGroup;
this.partitionId = partitionId;
+ this.serializer = serializer;
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
@@ -136,6 +141,8 @@ private PartitionEvent onMessageReceived(Message message) {
}
}
+ event.setSerializer(this.serializer);
+
final PartitionContext partitionContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
consumerGroup, partitionId);
return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get());
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
index 760b7d6b135d6..442383424f726 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
@@ -18,10 +18,13 @@
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
+import com.azure.messaging.eventhubs.implementation.PartitionPublishingState;
+import com.azure.messaging.eventhubs.implementation.PartitionPublishingUtils;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -33,6 +36,7 @@
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -112,15 +116,21 @@ public class EventHubProducerAsyncClient implements Closeable {
private final Scheduler scheduler;
private final boolean isSharedConnection;
private final Runnable onClientClose;
+ private final boolean isIdempotentPartitionPublishing;
+ private final Map partitionPublishingStates;
+ private final ObjectSerializer serializer;
/**
* Creates a new instance of this {@link EventHubProducerAsyncClient} that can send messages to a single partition
* when {@link CreateBatchOptions#getPartitionId()} is not null or an empty string. Otherwise, allows the service to
* load balance the messages amongst available partitions.
*/
- EventHubProducerAsyncClient(String fullyQualifiedNamespace, String eventHubName,
+ EventHubProducerAsyncClient(
+ String fullyQualifiedNamespace, String eventHubName,
EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider,
- MessageSerializer messageSerializer, Scheduler scheduler, boolean isSharedConnection, Runnable onClientClose) {
+ MessageSerializer messageSerializer, ObjectSerializer serializer, Scheduler scheduler,
+ boolean isSharedConnection, Runnable onClientClose, boolean isIdempotentPartitionPublishing,
+ Map initialPartitionPublishingStates) {
this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
@@ -134,6 +144,26 @@ public class EventHubProducerAsyncClient implements Closeable {
this.retryPolicy = getRetryPolicy(retryOptions);
this.scheduler = scheduler;
this.isSharedConnection = isSharedConnection;
+ this.serializer = serializer;
+ this.isIdempotentPartitionPublishing = isIdempotentPartitionPublishing;
+ if (isIdempotentPartitionPublishing) {
+ if (initialPartitionPublishingStates == null) {
+ this.partitionPublishingStates = new HashMap<>();
+ } else {
+ this.partitionPublishingStates = initialPartitionPublishingStates;
+ }
+ } else {
+ this.partitionPublishingStates = null;
+ }
+ }
+
+ EventHubProducerAsyncClient(String fullyQualifiedNamespace, String eventHubName,
+ EventHubConnectionProcessor connectionProcessor, AmqpRetryOptions retryOptions, TracerProvider tracerProvider,
+ MessageSerializer messageSerializer, ObjectSerializer serializer, Scheduler scheduler,
+ boolean isSharedConnection, Runnable onClientClose) {
+ this(fullyQualifiedNamespace, eventHubName, connectionProcessor, retryOptions, tracerProvider,
+ messageSerializer, serializer, scheduler, isSharedConnection, onClientClose,
+ false, null);
}
/**
@@ -190,6 +220,43 @@ public Mono getPartitionProperties(String partitionId) {
.flatMap(node -> node.getPartitionProperties(partitionId));
}
+ /**
+ * Get the idempotent producer's publishing state of a partition.
+ * @param partitionId The partition id of the publishing state
+ * @return A mono that has the {@link PartitionPublishingProperties}.
+ * {@code null} if the partition doesn't have any state yet.
+ * @throws UnsupportedOperationException if this producer isn't an idempotent producer.
+ */
+ Mono getPartitionPublishingProperties(String partitionId) {
+ PartitionPublishingState publishingState = getClientPartitionPublishingState(partitionId);
+ if (publishingState.isFromLink()) {
+ return Mono.defer(() -> Mono.just(publishingState.toPartitionPublishingProperties()));
+ } else {
+ return withRetry(getSendLink(partitionId).flatMap(amqpSendLink ->
+ Mono.just(getClientPartitionPublishingState(partitionId))),
+ retryOptions.getTryTimeout(), retryPolicy).map(
+ PartitionPublishingState::toPartitionPublishingProperties);
+ }
+ }
+
+ /**
+ * Get the idempotent producer's publishing state of a partition.
+ * @param partitionId The partition id of the publishing state
+ * @return A mono that has the {@link PartitionPublishingState}.
+ * {@code null} if the partition doesn't have any state yet.
+ * @throws UnsupportedOperationException if this producer isn't an idempotent producer.
+ */
+ Mono getPartitionPublishingState(String partitionId) {
+ PartitionPublishingState publishingState = getClientPartitionPublishingState(partitionId);
+ if (publishingState.isFromLink()) {
+ return Mono.defer(() -> Mono.just(publishingState));
+ } else {
+ return withRetry(getSendLink(partitionId).flatMap(amqpSendLink ->
+ Mono.just(getClientPartitionPublishingState(partitionId))),
+ retryOptions.getTryTimeout(), retryPolicy);
+ }
+ }
+
/**
* Creates an {@link EventDataBatch} that can fit as many events as the transport allows.
*
@@ -212,6 +279,15 @@ public Mono createBatch(CreateBatchOptions options) {
if (options == null) {
return monoError(logger, new NullPointerException("'options' cannot be null."));
}
+ if (isIdempotentPartitionPublishing && CoreUtils.isNullOrEmpty(options.getPartitionId())) {
+ return monoError(logger, new IllegalArgumentException(
+ "An idempotent producer can not create an EventDataBatch without partition id"));
+ }
+
+ Mono optionsError = validateBatchOptions(options);
+ if (optionsError != null) {
+ return optionsError;
+ }
final String partitionKey = options.getPartitionKey();
final String partitionId = options.getPartitionId();
@@ -236,7 +312,6 @@ public Mono createBatch(CreateBatchOptions options) {
final int maximumLinkSize = size > 0
? size
: MAX_MESSAGE_LENGTH_BYTES;
-
if (batchMaxSize > maximumLinkSize) {
return monoError(logger,
new IllegalArgumentException(String.format(Locale.US,
@@ -249,10 +324,12 @@ public Mono createBatch(CreateBatchOptions options) {
: maximumLinkSize;
return Mono.just(new EventDataBatch(batchSize, partitionId, partitionKey, link::getErrorContext,
- tracerProvider, link.getEntityPath(), link.getHostname()));
+ tracerProvider, link.getEntityPath(), link.getHostname(),
+ isIdempotentPartitionPublishing));
}));
}
+
/**
* Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
* allowed, an exception will be triggered and the send will fail.
@@ -383,22 +460,36 @@ Mono send(Flux events, SendOptions options) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
} else if (options == null) {
return monoError(logger, new NullPointerException("'options' cannot be null."));
+ } else if (options.getPartitionId() == null && isIdempotentPartitionPublishing) {
+ return monoError(logger, new IllegalArgumentException("Please set the partition id in `options` "
+ + "because this producer client is an idempotent producer"));
}
return sendInternal(events, options).publishOn(scheduler);
}
/**
- * Sends the batch to the associated Event Hub.
+ * Sends the event data batch to the associated Event Hub.
*
* @param batch The batch to send to the service.
* @return A {@link Mono} that completes when the batch is pushed to the service.
- * @throws NullPointerException if {@code batch} is {@code null}.
+ * @throws NullPointerException if {@code eventDataBatch} is {@code null}.
* @see EventHubProducerAsyncClient#createBatch()
* @see EventHubProducerAsyncClient#createBatch(CreateBatchOptions)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono send(EventDataBatch batch) {
+ return this.sendInternal(batch);
+ }
+
+ /**
+ * Internal batch send for EventDataBatchBase implementations.
+ *
+ * @param batch The batch to send to the service.
+ * @return A {@link Mono} that completes when the batch is pushed to the service.
+ * @throws NullPointerException if {@code batch} is {@code null}.
+ */
+ private Mono sendInternal(EventDataBatch batch) {
if (batch == null) {
return monoError(logger, new NullPointerException("'batch' cannot be null."));
} else if (batch.getEvents().isEmpty()) {
@@ -434,16 +525,9 @@ public Mono send(EventDataBatch batch) {
}
tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, event.getContext()));
}
- final Message message = messageSerializer.serialize(event);
-
- if (!CoreUtils.isNullOrEmpty(partitionKey)) {
- final MessageAnnotations messageAnnotations = message.getMessageAnnotations() == null
- ? new MessageAnnotations(new HashMap<>())
- : message.getMessageAnnotations();
- messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
- message.setMessageAnnotations(messageAnnotations);
+ if (!isIdempotentPartitionPublishing) {
+ messages.add(createMessageFromEvent(event, partitionKey));
}
- messages.add(message);
}
if (isTracingEnabled) {
@@ -456,18 +540,65 @@ public Mono send(EventDataBatch batch) {
// Start send span and store updated context
parentContext.set(tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, finalSharedContext, ProcessKind.SEND));
}
-
- return withRetry(getSendLink(batch.getPartitionId())
- .flatMap(link ->
- messages.size() == 1
+ if (isIdempotentPartitionPublishing) {
+ PartitionPublishingState publishingState = getClientPartitionPublishingState(batch.getPartitionId());
+ return Mono.fromRunnable(() -> {
+ publishingState.getSemaphore().acquireUninterruptibly();
+ int seqNumber = publishingState.getSequenceNumber();
+ for (EventData eventData : batch.getEvents()) {
+ eventData.setProducerGroupIdInSysProperties(publishingState.getProducerGroupId());
+ eventData.setPublishedSequenceNumberInSysProperties(seqNumber);
+ eventData.setProducerOwnerLevelInSysProperties(publishingState.getOwnerLevel());
+ seqNumber = PartitionPublishingUtils.incrementSequenceNumber(seqNumber);
+ messages.add(createMessageFromEvent(eventData, partitionKey));
+ }
+ }).then(
+ withRetry(getSendLink(batch.getPartitionId())
+ .flatMap(
+ link -> messages.size() == 1
? link.send(messages.get(0))
- : link.send(messages)), retryOptions.getTryTimeout(), retryPolicy)
- .publishOn(scheduler)
- .doOnEach(signal -> {
+ : link.send(messages)),
+ retryOptions.getTryTimeout(), retryPolicy
+ )).publishOn(scheduler).doOnEach(signal -> {
if (isTracingEnabled) {
tracerProvider.endSpan(parentContext.get(), signal);
}
- });
+ }).thenEmpty(Mono.fromRunnable(() -> {
+ // Update back if send is successful
+ batch.setStartingPublishedSequenceNumber(publishingState.getSequenceNumber());
+ for (EventData eventData : batch.getEvents()) {
+ eventData.commitProducerDataFromSysProperties();
+ }
+ publishingState.incrementSequenceNumber(batch.getCount());
+ })).doFinally(// Release the partition state semaphore
+ signalType -> {
+ publishingState.getSemaphore().release();
+ }
+ );
+ } else {
+ return withRetry(getSendLink(batch.getPartitionId())
+ .flatMap(link -> messages.size() == 1
+ ? link.send(messages.get(0))
+ : link.send(messages)), retryOptions.getTryTimeout(), retryPolicy)
+ .publishOn(scheduler)
+ .doOnEach(signal -> {
+ if (isTracingEnabled) {
+ tracerProvider.endSpan(parentContext.get(), signal);
+ }
+ });
+ }
+ }
+
+ private Message createMessageFromEvent(EventData event, String partitionKey) {
+ final Message message = messageSerializer.serialize(event);
+ if (!CoreUtils.isNullOrEmpty(partitionKey)) {
+ final MessageAnnotations messageAnnotations = message.getMessageAnnotations() == null
+ ? new MessageAnnotations(new HashMap<>())
+ : message.getMessageAnnotations();
+ messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
+ message.setMessageAnnotations(messageAnnotations);
+ }
+ return message;
}
private Mono sendInternal(Flux events, SendOptions options) {
@@ -491,7 +622,8 @@ private Mono sendInternal(Flux events, SendOptions options) {
.setPartitionId(options.getPartitionId())
.setMaximumSizeInBytes(batchSize);
return events.collect(new EventDataCollector(batchOptions, 1, link::getErrorContext,
- tracerProvider, link.getEntityPath(), link.getHostname()));
+ tracerProvider, link.getEntityPath(), link.getHostname(),
+ isIdempotentPartitionPublishing));
})
.flatMap(list -> sendInternal(Flux.fromIterable(list))));
}
@@ -511,12 +643,85 @@ private String getEntityPath(String partitionId) {
: String.format(Locale.US, SENDER_ENTITY_PATH_FORMAT, eventHubName, partitionId);
}
+ private Mono updatePublishingState(String partitionId, AmqpSendLink amqpSendLink) {
+ if (isIdempotentPartitionPublishing) {
+ return amqpSendLink.getRemoteProperties().map(properties -> {
+ setPartitionPublishingState(
+ partitionId, (Long) properties.get(ClientConstants.PRODUCER_ID),
+ (Short) properties.get(ClientConstants.PRODUCER_EPOCH),
+ (Integer) properties.get(ClientConstants.PRODUCER_SEQUENCE_NUMBER)
+ );
+ return amqpSendLink;
+ });
+ } else {
+ return Mono.just(amqpSendLink);
+ }
+ }
+
+ /**
+ * Get the idempotent producer's publishing state of a partition from the client side maintained state.
+ * It doesn't create a link to get state from the service.
+ */
+ private PartitionPublishingState getClientPartitionPublishingState(String partitionId) {
+ if (!isIdempotentPartitionPublishing) {
+ throw logger.logExceptionAsWarning(
+ new IllegalStateException("getPartitionPublishingState() shouldn't be called if the producer"
+ + " is not an idempotent producer."));
+ }
+ if (partitionPublishingStates.containsKey(partitionId)) {
+ return partitionPublishingStates.get(partitionId);
+ } else {
+ synchronized (partitionPublishingStates) {
+ if (partitionPublishingStates.containsKey(partitionId)) { // recheck after locked.
+ return partitionPublishingStates.get(partitionId);
+ }
+ PartitionPublishingState state = new PartitionPublishingState();
+ partitionPublishingStates.put(partitionId, state);
+ return state;
+ }
+ }
+ }
+
+ private void setPartitionPublishingState(
+ String partitionId, Long producerGroupId, Short ownerLevel, Integer sequenceNumber) {
+ PartitionPublishingState publishingState = getClientPartitionPublishingState(partitionId);
+ if (publishingState != null
+ && (publishingState.getSequenceNumber() == null || publishingState.getSequenceNumber() <= sequenceNumber)) {
+ publishingState.setOwnerLevel(ownerLevel);
+ publishingState.setProducerGroupId(producerGroupId);
+ publishingState.setSequenceNumber(sequenceNumber);
+ publishingState.setFromLink(true);
+ }
+ }
+
private Mono getSendLink(String partitionId) {
final String entityPath = getEntityPath(partitionId);
final String linkName = getEntityPath(partitionId);
return connectionProcessor
- .flatMap(connection -> connection.createSendLink(linkName, entityPath, retryOptions));
+ .flatMap(connection -> isIdempotentPartitionPublishing
+ ? connection.createSendLink(
+ linkName, entityPath, retryOptions, true, getClientPartitionPublishingState(partitionId))
+ : connection.createSendLink(
+ linkName, entityPath, retryOptions))
+ .flatMap(amqpSendLink ->
+ updatePublishingState(partitionId, amqpSendLink));
+ }
+
+ private Mono validateBatchOptions(CreateBatchOptions options) {
+ if (!CoreUtils.isNullOrEmpty(options.getPartitionKey())
+ && !CoreUtils.isNullOrEmpty(options.getPartitionId())) {
+ return monoError(logger, new IllegalArgumentException(String.format(Locale.US,
+ "CreateBatchOptions.getPartitionKey() and CreateBatchOptions.getPartitionId() are both set. "
+ + "Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'",
+ options.getPartitionKey(), options.getPartitionId())));
+ } else if (!CoreUtils.isNullOrEmpty(options.getPartitionKey())
+ && options.getPartitionKey().length() > MAX_PARTITION_KEY_LENGTH) {
+ return monoError(logger, new IllegalArgumentException(String.format(Locale.US,
+ "Partition key '%s' exceeds the maximum allowed length: '%s'.", options.getPartitionKey(),
+ MAX_PARTITION_KEY_LENGTH)));
+ }
+ return null;
}
/**
@@ -552,11 +757,12 @@ private static class EventDataCollector implements Collector 0
? options.getMaximumSizeInBytes()
@@ -567,9 +773,10 @@ private static class EventDataCollector implements Collector, EventData> accumulator() {
}
currentBatch = new EventDataBatch(maxMessageSize, partitionId, partitionKey, contextProvider,
- tracerProvider, entityPath, hostname);
+ tracerProvider, entityPath, hostname, this.isCreatedByIdempotentProducer);
currentBatch.tryAdd(event);
list.add(batch);
};
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
index f805d67b2c256..597fc85629553 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
@@ -118,6 +118,17 @@ public PartitionProperties getPartitionProperties(String partitionId) {
return producer.getPartitionProperties(partitionId).block(tryTimeout);
}
+ /**
+ * Get the idempotent producer's publishing state of a partition.
+ * @param partitionId The partition id of the publishing state
+ * @return A mono that has the {@link PartitionPublishingProperties}.
+ * {@code null} if the partition doesn't have any state yet.
+ * @throws UnsupportedOperationException if this producer isn't an idempotent producer.
+ */
+ public PartitionPublishingProperties getPartitionPublishingProperties(String partitionId) {
+ return producer.getPartitionPublishingProperties(partitionId).block();
+ }
+
/**
* Creates an {@link EventDataBatch} that can fit as many events as the transport allows.
*
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
index a837989ae16a7..213e8638c676c 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
@@ -12,6 +12,7 @@
import com.azure.core.exception.AzureException;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.models.CloseContext;
@@ -177,6 +178,18 @@ public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, St
return this;
}
+ /**
+ * Set ObjectSerializer implementation to be used for creating ObjectBatch.
+ *
+ * @param serializer ObjectSerializer implementation
+ *
+ * @return updated builder instance
+ */
+ public EventProcessorClientBuilder serializer(ObjectSerializer serializer) {
+ eventHubClientBuilder.serializer(serializer);
+ return this;
+ }
+
/**
* Sets the proxy configuration to use for {@link EventHubAsyncClient}. When a proxy is configured, {@link
* AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPublishingProperties.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPublishingProperties.java
new file mode 100644
index 0000000000000..bad180cc5cbe9
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPublishingProperties.java
@@ -0,0 +1,141 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.annotation.Fluent;
+
+/**
+ * Store the starting and running state of a partition, which an idempotent producer sends events to.
+ */
+
+@Fluent
+public final class PartitionPublishingProperties {
+ private Short ownerLevel;
+ private Long producerGroupId;
+ private Integer sequenceNumber;
+
+ /**
+ * Create a PartitionPublishingState with producer group id, owner level and starting sequence number
+ * being {@code null}.
+ */
+ PartitionPublishingProperties() {
+ }
+
+ /**
+ * Create a PartitionPublishingState with the producer group id, owner level and starting sequence number.
+ *
+ * @param producerGroupId See {@link #getProducerGroupId()}}
+ * @param ownerLevel See {@link #getOwnerLevel()}
+ * @param sequenceNumber See {@link #getSequenceNumber()} ()}
+ */
+ public PartitionPublishingProperties(Long producerGroupId, Short ownerLevel, Integer sequenceNumber) {
+ this.ownerLevel = ownerLevel;
+ this.producerGroupId = producerGroupId;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ /**
+ * Gets the owner level that indicates a publishing is intended to be performed exclusively for events in the
+ * requested partition in the context of the associated producer group. To do so, publishing will attempt to assert
+ * ownership over the partition; in the case where more than one publisher in the producer group attempts to assert
+ * ownership for the same partition, the one having a larger owner level value will "win".
+ *
+ * When an owner level is specified, other exclusive publishers which have a lower owner level within the context of
+ * the same producer group will either not be allowed to be created or, if they already exist, will encounter an
+ * exception during the next attempted operation. Should there be multiple producers in the producer group with the
+ * same owner level, each of them will be able to publish to the partition.
+ *
+ * Producers with no owner level or which belong to a different producer group are permitted to publish to the
+ * associated partition without restriction or awareness of other exclusive producers.
+ *
+ * The owner level is only recognized and relevant when certain features of the producer are enabled. For example,
+ * it is used by idempotent publishing.
+ *
+ * An {@link com.azure.core.amqp.exception.AmqpException} will occur if an {@link EventHubProducerAsyncClient} or
+ * {@link EventHubProducerClient} is unable to publish events to the
+ * Event Hub partition for the given producer group id. In this case, the errorCondition of
+ * {@link com.azure.core.amqp.exception.AmqpException} will be set to
+ * {@link com.azure.core.amqp.exception.AmqpErrorCondition#PRODUCER_EPOCH_STOLEN}.
+ *
+ * @see EventHubClientBuilder#enableIdempotentPartitionPublishing() ()
+ *
+ * @return The relative priority to associate with an exclusive publisher; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+ public Short getOwnerLevel() {
+ return ownerLevel;
+ }
+
+ /**
+ * Gets the identifier of the producer group that this producer is associated with when publishing to the
+ * associated partition. Events will be published in the context of this group.
+ *
+ * The producer group is only recognized and relevant when certain features of the producer are enabled.
+ * For example, it is used by idempotent publishing.
+ *
+ * @see EventHubClientBuilder#enableIdempotentPartitionPublishing() ()
+ *
+ * @return The identifier of the producer group to associate with the partition; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+ public Long getProducerGroupId() {
+ return producerGroupId;
+ }
+
+ /**
+ * Get the starting number that should be used for the automatic sequencing of events for the associated partition,
+ * when published by this producer.
+ *
+ * The starting sequence number is only recognized and relevant when certain features of the producer are enabled.
+ * For example, it is used by idempotent publishing.
+ *
+ * @see EventHubClientBuilder#enableIdempotentPartitionPublishing() ()
+ *
+ * @return The starting sequence number to associate with the partition; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+ public Integer getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ /**
+ * Set the owner level.
+ * @param ownerLevel The owner level of the idempotent producer.
+ * @return The updated {@link PartitionPublishingProperties} object.
+ */
+ public PartitionPublishingProperties setOwnerLevel(Short ownerLevel) {
+ this.ownerLevel = ownerLevel;
+ return this;
+ }
+
+ /**
+ * Set the producer group id.
+ * @param producerGroupId The producer group id of the idempotent producer.
+ * @return The updated {@link PartitionPublishingProperties} object.
+ */
+ public PartitionPublishingProperties setProducerGroupId(Long producerGroupId) {
+ this.producerGroupId = producerGroupId;
+ return this;
+ }
+
+ /**
+ * Set the sequence number.
+ * @param sequenceNumber The next publishing sequence number of a partition when an idempotent producer send
+ * an {@link EventData} to.
+ * @return The updated {@link PartitionPublishingProperties} object.
+ */
+ public PartitionPublishingProperties setSequenceNumber(Integer sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionPublishingProperties{"
+ + "ownerLevel=" + ownerLevel
+ + ", producerGroupId=" + producerGroupId
+ + ", sequenceNumber=" + sequenceNumber
+ + '}';
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
index b4afd24461d57..c0b78c90139ce 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
@@ -3,8 +3,13 @@
package com.azure.messaging.eventhubs.implementation;
+import com.azure.core.amqp.AmqpMessageConstant;
+import org.apache.qpid.proton.amqp.Symbol;
+
import java.time.Duration;
+import static com.azure.core.amqp.implementation.AmqpConstants.VENDOR;
+
public final class ClientConstants {
public static final String AZURE_ACTIVE_DIRECTORY_SCOPE = "https://eventhubs.azure.net/.default";
// Please see here
@@ -23,4 +28,17 @@ public final class ClientConstants {
*/
public static final String ENDPOINT_FORMAT = "sb://%s.%s";
public static final String AZ_TRACING_SERVICE_NAME = "EventHubs.";
+
+
+ // Symbols used on links
+ public static final Symbol EPOCH = Symbol.valueOf(VENDOR + ":epoch");
+ public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(
+ VENDOR + ":enable-receiver-runtime-metric");
+ public static final Symbol ENABLE_IDEMPOTENT_PRODUCER = Symbol.valueOf(VENDOR + ":idempotent-producer");
+
+ public static final Symbol PRODUCER_EPOCH = Symbol.valueOf(
+ AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue());
+ public static final Symbol PRODUCER_ID = Symbol.valueOf(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue());
+ public static final Symbol PRODUCER_SEQUENCE_NUMBER = Symbol.valueOf(
+ AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue());
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubAmqpConnection.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubAmqpConnection.java
index e992419c41ba5..7b72b27f5a3bd 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubAmqpConnection.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubAmqpConnection.java
@@ -29,8 +29,13 @@ public interface EventHubAmqpConnection extends AmqpConnection {
* @param linkName The name of the link.
* @param entityPath The remote address to connect to for the message broker.
* @param retryOptions Options to use when creating the link.
+ * @param idempotentPartitionPublishing Enable the idempotent producer feature when creating the link.
+ * @param publishingState Set the starting state
* @return A new or existing send link that is connected to the given {@code entityPath}.
*/
+ Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions,
+ boolean idempotentPartitionPublishing, PartitionPublishingState publishingState);
+
Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions);
/**
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java
index 77466160c6086..c83d3b63acd19 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.java
@@ -96,14 +96,24 @@ public Mono getManagementNode() {
* @return A new or existing send link that is connected to the given {@code entityPath}.
*/
@Override
- public Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions) {
- return createSession(entityPath).flatMap(session -> {
- logger.verbose("Get or create producer for path: '{}'", entityPath);
- final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
-
- return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy)
- .cast(AmqpSendLink.class);
- });
+ public Mono createSendLink(
+ String linkName, String entityPath, AmqpRetryOptions retryOptions,
+ boolean idempotentPartitionPublishing, PartitionPublishingState publishingState
+ ) {
+ return createSession(entityPath).cast(EventHubSession.class)
+ .flatMap(session -> {
+ logger.verbose("Get or create producer for path: '{}'", entityPath);
+ final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
+ return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), retryPolicy,
+ idempotentPartitionPublishing, publishingState);
+ });
+ }
+
+ @Override
+ public Mono createSendLink(
+ String linkName, String entityPath, AmqpRetryOptions retryOptions
+ ) {
+ return this.createSendLink(linkName, entityPath, retryOptions, false, null);
}
/**
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java
index e0dd0c68a3bd8..40238e5871523 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubReactorSession.java
@@ -7,6 +7,7 @@
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
+import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
@@ -33,16 +34,11 @@
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME;
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
-import static com.azure.core.amqp.implementation.AmqpConstants.VENDOR;
/**
* An AMQP session for Event Hubs.
*/
class EventHubReactorSession extends ReactorSession implements EventHubSession {
- private static final Symbol EPOCH = Symbol.valueOf(VENDOR + ":epoch");
- private static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME =
- Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
-
private final ClientLogger logger = new ClientLogger(EventHubReactorSession.class);
/**
@@ -68,6 +64,32 @@ class EventHubReactorSession extends ReactorSession implements EventHubSession {
messageSerializer, openTimeout, retryPolicy);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono createProducer(String linkName, String entityPath, Duration timeout,
+ AmqpRetryPolicy retry, boolean idempotentPartitionPublishing, PartitionPublishingState publishingState) {
+
+ Objects.requireNonNull(linkName, "'linkName' cannot be null.");
+ Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
+ Objects.requireNonNull(timeout, "'timeout' cannot be null.");
+ Objects.requireNonNull(retry, "'retry' cannot be null.");
+
+ Symbol[] desiredCapabilities = null;
+ Map properties = null;
+ if (idempotentPartitionPublishing) {
+ desiredCapabilities = new Symbol[]{ClientConstants.ENABLE_IDEMPOTENT_PRODUCER};
+
+ properties = new HashMap<>();
+ properties.put(ClientConstants.PRODUCER_EPOCH, publishingState.getOwnerLevel());
+ properties.put(ClientConstants.PRODUCER_ID, publishingState.getProducerGroupId());
+ properties.put(ClientConstants.PRODUCER_SEQUENCE_NUMBER, publishingState.getSequenceNumber());
+ }
+ return createProducer(linkName, entityPath, timeout, retry, properties, desiredCapabilities)
+ .cast(AmqpSendLink.class);
+ }
+
/**
* {@inheritDoc}
*/
@@ -88,11 +110,11 @@ public Mono createConsumer(String linkName, String entityPath,
final Map properties = new HashMap<>();
if (options.getOwnerLevel() != null) {
- properties.put(EPOCH, options.getOwnerLevel());
+ properties.put(ClientConstants.EPOCH, options.getOwnerLevel());
}
final Symbol[] desiredCapabilities = options.getTrackLastEnqueuedEventProperties()
- ? new Symbol[]{ENABLE_RECEIVER_RUNTIME_METRIC_NAME}
+ ? new Symbol[]{ClientConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME}
: null;
// Use explicit settlement via dispositions (not pre-settled)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java
index f609170d0c3c8..da771fc05af64 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java
@@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
+import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
@@ -35,4 +36,17 @@ public interface EventHubSession extends AmqpSession {
*/
Mono createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry,
EventPosition eventPosition, ReceiveOptions options);
+
+ /**
+ * Create a new AMQP producer.
+ * @param linkName Name of the sender link.
+ * @param entityPath The entity path this link connects to receive events.
+ * @param timeout Timeout required for creating and opening AMQP link.
+ * @param retry The retry policy to use when sending messages.
+ * @param publishingState Options to use when creating the producer.
+ * @return A newly created AMQP link.
+ */
+ Mono createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry,
+ boolean idempotentPartitionPublishing, PartitionPublishingState publishingState);
+
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingState.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingState.java
new file mode 100644
index 0000000000000..f730df02cc67b
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingState.java
@@ -0,0 +1,188 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs.implementation;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.PartitionPublishingProperties;
+
+import java.util.concurrent.Semaphore;
+
+
+/**
+ * Store the starting and running state of a partition, which an idempotent producer sends events to.
+ */
+public final class PartitionPublishingState {
+ private Short ownerLevel;
+ private Long producerGroupId;
+ private Integer sequenceNumber;
+
+ /**
+ * Indicate whether the state has ever been retrieved from the link, or it's just a pure client created object.
+ * Once the state is retrieved from the link, this value is always true.
+ */
+ private boolean fromLink = false;
+ // Idempotent producer requires all event data batches of a partition are sent out sequentially.
+ private final Semaphore semaphore = new Semaphore(1);
+
+ /**
+ * Create a PartitionPublishingState with producer group id, owner level and starting sequence number
+ * being {@code null}.
+ */
+ public PartitionPublishingState() {
+ }
+
+ public PartitionPublishingState(PartitionPublishingProperties partitionPublishingProperties) {
+ this(partitionPublishingProperties.getProducerGroupId(),
+ partitionPublishingProperties.getOwnerLevel(), partitionPublishingProperties.getSequenceNumber());
+ }
+
+ public PartitionPublishingState(PartitionPublishingState that) {
+ this(that.getProducerGroupId(), that.getOwnerLevel(), that.getSequenceNumber());
+ }
+
+ /**
+ * Create a PartitionPublishingState with the producer group id, owner level and starting sequence number.
+ *
+ * @param producerGroupId See {@link #getProducerGroupId()}}
+ * @param ownerLevel See {@link #getOwnerLevel()}
+ * @param sequenceNumber See {@link #getSequenceNumber()} ()}
+ */
+ public PartitionPublishingState(Long producerGroupId, Short ownerLevel, Integer sequenceNumber) {
+ this.ownerLevel = ownerLevel;
+ this.producerGroupId = producerGroupId;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ /**
+ * Gets the owner level that indicates a publishing is intended to be performed exclusively for events in the
+ * requested partition in the context of the associated producer group. To do so, publishing will attempt to assert
+ * ownership over the partition; in the case where more than one publisher in the producer group attempts to assert
+ * ownership for the same partition, the one having a larger owner level value will "win".
+ *
+ * When an owner level is specified, other exclusive publishers which have a lower owner level within the context of
+ * the same producer group will either not be allowed to be created or, if they already exist, will encounter an
+ * exception during the next attempted operation. Should there be multiple producers in the producer group with the
+ * same owner level, each of them will be able to publish to the partition.
+ *
+ * Producers with no owner level or which belong to a different producer group are permitted to publish to the
+ * associated partition without restriction or awareness of other exclusive producers.
+ *
+ * The owner level is only recognized and relevant when certain features of the producer are enabled. For example,
+ * it is used by idempotent publishing.
+ *
+ * An {@link com.azure.core.amqp.exception.AmqpException} will occur if an {@link EventHubProducerAsyncClient} or
+ * {@link EventHubProducerClient} is unable to publish events to the
+ * Event Hub partition for the given producer group id. In this case, the errorCondition of
+ * {@link com.azure.core.amqp.exception.AmqpException} will be set to
+ * {@link com.azure.core.amqp.exception.AmqpErrorCondition#PRODUCER_EPOCH_STOLEN}.
+ *
+ * @see EventHubClientBuilder#idempotentPartitionPublishing()
+ *
+ * @return The relative priority to associate with an exclusive publisher; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+
+ public PartitionPublishingProperties toPartitionPublishingProperties() {
+ return new PartitionPublishingProperties(producerGroupId, ownerLevel, sequenceNumber);
+ }
+
+ public Short getOwnerLevel() {
+ return ownerLevel;
+ }
+
+ /**
+ * Gets the identifier of the producer group that this producer is associated with when publishing to the
+ * associated partition. Events will be published in the context of this group.
+ *
+ * The producer group is only recognized and relevant when certain features of the producer are enabled.
+ * For example, it is used by idempotent publishing.
+ *
+ * @see EventHubClientBuilder#idempotentPartitionPublishing()
+ *
+ * @return The identifier of the producer group to associate with the partition; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+ public Long getProducerGroupId() {
+ return producerGroupId;
+ }
+
+ /**
+ * Get the starting number that should be used for the automatic sequencing of events for the associated partition,
+ * when published by this producer.
+ *
+ * The starting sequence number is only recognized and relevant when certain features of the producer are enabled.
+ * For example, it is used by idempotent publishing.
+ *
+ * @see EventHubClientBuilder#enableIdempotentPartitionPublishing() ()
+ *
+ * @return The starting sequence number to associate with the partition; if {@code null},
+ * the Event Hubs service will control the value.
+ */
+ public Integer getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ /**
+ * Set the owner level.
+ * @param ownerLevel The owner level of the idempotent producer.
+ */
+ public void setOwnerLevel(Short ownerLevel) {
+ this.ownerLevel = ownerLevel;
+ }
+
+ /**
+ * Set the producer group id.
+ * @param producerGroupId The producer group id of the idempotent producer.
+ */
+ public void setProducerGroupId(Long producerGroupId) {
+ this.producerGroupId = producerGroupId;
+ }
+
+ /**
+ * Set the sequence number.
+ * @param sequenceNumber The next publishing sequence number of a partition when an idempotent producer send
+ * an {@link EventData} to.
+ */
+ public void setSequenceNumber(Integer sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+
+ public void incrementSequenceNumber(int delta) {
+ this.setSequenceNumber(PartitionPublishingUtils.incrementSequenceNumber(this.sequenceNumber, delta));
+ }
+
+
+ /**
+ * An idempotent producer must sequentially send events to an EventHubs partition. This {@link Semaphore}
+ * is used to send sequentially.
+ * @return The {@link Semaphore} used to ensure the send to the partition sequentially.
+ */
+ public Semaphore getSemaphore() {
+ return semaphore;
+ }
+
+ /**
+ * @return whether the state has ever been retrieved from the link, or it's just a pure client created object.
+ */
+ public boolean isFromLink() {
+ return fromLink;
+ }
+
+ public void setFromLink(boolean fromLink) {
+ this.fromLink = fromLink;
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionPublishingState{"
+ + "ownerLevel=" + ownerLevel
+ + ", producerGroupId=" + producerGroupId
+ + ", sequenceNumber=" + sequenceNumber
+ + '}';
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingUtils.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingUtils.java
new file mode 100644
index 0000000000000..b7421c99803a5
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/PartitionPublishingUtils.java
@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs.implementation;
+
+/**
+ * A util class for idempotent producer partition publishing.
+ */
+public class PartitionPublishingUtils {
+ /**
+ * Increase an int value. If the increased value is over {@link Integer#MAX_VALUE}, restart from 0.
+ * @param value The number to be incremented.
+ * @param delta The number is to be incremented by delta.
+ * @return The incremented value.
+ */
+ public static int incrementSequenceNumber(int value, int delta) {
+ if (Integer.MAX_VALUE - delta >= value) {
+ return value + delta;
+ } else {
+ return delta - (Integer.MAX_VALUE - value) - 1;
+ }
+ }
+
+ /**
+ * Increase an int value by 1. If the increased value is over {@link Integer#MAX_VALUE}, restart from 0.
+ * @param value The number to be incremented.
+ * @return The incremented value.
+ */
+ public static int incrementSequenceNumber(int value) {
+ return incrementSequenceNumber(value, 1);
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java
index 838025b1eff4e..7ef112501d8ea 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/PartitionEvent.java
@@ -27,7 +27,7 @@ public class PartitionEvent {
* @throws NullPointerException if {@code partitionContext} or {@code eventData} is {@code null}.
*/
public PartitionEvent(final PartitionContext partitionContext, final EventData eventData,
- LastEnqueuedEventProperties lastEnqueuedEventProperties) {
+ LastEnqueuedEventProperties lastEnqueuedEventProperties) {
this.partitionContext = Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null");
this.eventData = Objects.requireNonNull(eventData, "'eventData' cannot be null");
this.lastEnqueuedEventProperties = lastEnqueuedEventProperties;
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java
index 0386dee075a72..647e65473054c 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/module-info.java
@@ -4,6 +4,7 @@
module com.azure.messaging.eventhubs {
requires transitive com.azure.core;
requires transitive com.azure.core.amqp;
+ requires transitive com.azure.core.experimental;
requires com.microsoft.azure.qpid.protonj.extensions;
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java
index ea85afedd71f0..c8e52a08f9bb6 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java
@@ -33,7 +33,7 @@ public void setup() {
@Test
public void nullEventData() {
- assertThrows(IllegalArgumentException.class, () -> {
+ assertThrows(NullPointerException.class, () -> {
final EventDataBatch batch = new EventDataBatch(1024, null, PARTITION_KEY, null, null, null, null);
batch.tryAdd(null);
});
@@ -90,4 +90,21 @@ public void setsPartitionId() {
Assertions.assertEquals(partitionId, batch.getPartitionId());
Assertions.assertEquals(0, batch.getEvents().size());
}
+
+ /**
+ * Verify that a batch created by an idempotent producer has larger size than a normal batch
+ */
+ @Test
+ public void preAllocateForIdempotentProducer() {
+ final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY,
+ null, new TracerProvider(Collections.emptyList()), null, null);
+ batch.tryAdd(new EventData(new byte[1024]));
+ int size = batch.getSizeInBytes();
+
+ final EventDataBatch batchIdempotent = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY,
+ null, new TracerProvider(Collections.emptyList()), null, null, true);
+ batchIdempotent.tryAdd(new EventData(new byte[1024]));
+
+ Assertions.assertTrue(batchIdempotent.getSizeInBytes() > batch.getSizeInBytes());
+ }
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java
index 957fa0346e5ff..0dd477ef743a6 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClientTest.java
@@ -139,7 +139,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS
.subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
"event-hub-name", connectionOptions.getRetry()));
- consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer,
+ consumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null,
CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed);
}
@@ -159,7 +159,7 @@ void teardown() {
@Test
void lastEnqueuedEventInformationIsNull() {
final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, parallelScheduler, false, onClientClosed);
+ connectionProcessor, messageSerializer, null, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, parallelScheduler, false, onClientClosed);
final int numberOfEvents = 10;
when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents);
final int numberToReceive = 3;
@@ -182,7 +182,7 @@ void lastEnqueuedEventInformationIsNull() {
void lastEnqueuedEventInformationCreated() {
// Arrange
final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, messageSerializer, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, Schedulers.parallel(), false, onClientClosed);
+ connectionProcessor, messageSerializer, null, CONSUMER_GROUP, DEFAULT_PREFETCH_COUNT, Schedulers.parallel(), false, onClientClosed);
final int numberOfEvents = 10;
final ReceiveOptions receiveOptions = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true);
when(amqpReceiveLink.getCredits()).thenReturn(numberOfEvents);
@@ -235,7 +235,7 @@ void receivesNumberOfEventsAllowsBlock() throws InterruptedException {
// Scheduling on elastic to simulate a user passed in scheduler (this is the default in EventHubClientBuilder).
final EventHubConsumerAsyncClient myConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.elastic(), false, onClientClosed);
+ connectionProcessor, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.elastic(), false, onClientClosed);
final Flux eventsFlux = myConsumer.receiveFromPartition(PARTITION_ID, EventPosition.earliest())
.take(numberOfEvents);
@@ -297,7 +297,7 @@ void returnsNewListener() {
any(ReceiveOptions.class))).thenReturn(Mono.just(link2), Mono.just(link3));
EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, testScheduler, false, onClientClosed);
+ eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, testScheduler, false, onClientClosed);
// Act & Assert
StepVerifier.create(asyncClient.receiveFromPartition(PARTITION_ID, EventPosition.earliest()).take(numberOfEvents))
@@ -520,7 +520,7 @@ void receivesMultiplePartitions() {
.thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions)));
EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed);
+ eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, parallelScheduler, false, onClientClosed);
EmitterProcessor processor2 = EmitterProcessor.create();
FluxSink processor2sink = processor2.sink();
@@ -595,7 +595,7 @@ void receivesMultiplePartitionsWhenOneCloses() {
.thenReturn(Mono.just(new EventHubProperties(EVENT_HUB_NAME, Instant.EPOCH, partitions)));
EventHubConsumerAsyncClient asyncClient = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
+ eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
EmitterProcessor processor2 = EmitterProcessor.create();
FluxSink processor2sink = processor2.sink();
@@ -655,7 +655,7 @@ void doesNotCloseSharedConnection() {
EventHubConnectionProcessor eventHubConnection = Flux.create(sink -> sink.next(connection1))
.subscribeWith(new EventHubConnectionProcessor(HOSTNAME, EVENT_HUB_NAME, retryOptions));
EventHubConsumerAsyncClient sharedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- eventHubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), true, onClientClosed);
+ eventHubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), true, onClientClosed);
// Act
sharedConsumer.close();
@@ -675,7 +675,7 @@ void closesDedicatedConnection() {
// Arrange
EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class);
EventHubConsumerAsyncClient dedicatedConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- hubConnection, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
+ hubConnection, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
// Act
dedicatedConsumer.close();
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java
index def755ce76841..ce0c2ff564dd7 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerClientTest.java
@@ -121,7 +121,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS
}));
asyncConsumer = new EventHubConsumerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
+ connectionProcessor, messageSerializer, null, CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
consumer = new EventHubConsumerClient(asyncConsumer, Duration.ofSeconds(10));
}
@@ -145,8 +145,8 @@ public static void dispose() {
public void lastEnqueuedEventInformationIsNull() {
// Arrange
final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(
- HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP,
- PREFETCH, Schedulers.parallel(), false, onClientClosed);
+ HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null,
+ CONSUMER_GROUP, PREFETCH, Schedulers.parallel(), false, onClientClosed);
final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5));
final int numberOfEvents = 10;
sendMessages(sink, numberOfEvents, PARTITION_ID);
@@ -175,8 +175,8 @@ public void lastEnqueuedEventInformationCreated() {
// Arrange
final ReceiveOptions options = new ReceiveOptions().setTrackLastEnqueuedEventProperties(true);
final EventHubConsumerAsyncClient runtimeConsumer = new EventHubConsumerAsyncClient(
- HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, CONSUMER_GROUP, PREFETCH,
- Schedulers.parallel(), false, onClientClosed);
+ HOSTNAME, EVENT_HUB_NAME, connectionProcessor, messageSerializer, null, CONSUMER_GROUP,
+ PREFETCH, Schedulers.parallel(), false, onClientClosed);
final EventHubConsumerClient consumer = new EventHubConsumerClient(runtimeConsumer, Duration.ofSeconds(5));
final int numberOfEvents = 10;
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java
index 3510200950493..051105680623d 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java
@@ -8,8 +8,11 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
+import com.azure.core.experimental.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.serializer.ObjectSerializer;
+import com.azure.core.util.serializer.TypeReference;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
@@ -30,9 +33,12 @@
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
@@ -117,7 +123,7 @@ void receivesMessages(boolean trackLastEnqueuedProperties) {
// Arrange
linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection));
consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME,
- CONSUMER_GROUP, PARTITION_ID, currentPosition, trackLastEnqueuedProperties, Schedulers.parallel());
+ CONSUMER_GROUP, PARTITION_ID, null, currentPosition, trackLastEnqueuedProperties, Schedulers.parallel());
final EventData event1 = new EventData("Foo");
final EventData event2 = new EventData("Bar");
@@ -165,14 +171,17 @@ void receiveMultipleTimes() {
// Arrange
linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection));
consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME,
- CONSUMER_GROUP, PARTITION_ID, currentPosition, false, Schedulers.parallel());
+ CONSUMER_GROUP, PARTITION_ID, null, currentPosition, false, Schedulers.parallel());
final Message message3 = mock(Message.class);
final String secondOffset = "54";
final String lastOffset = "65";
- final EventData event1 = new EventData("Foo".getBytes(), getSystemProperties("25", 14), Context.NONE);
- final EventData event2 = new EventData("Bar".getBytes(), getSystemProperties(secondOffset, 21), Context.NONE);
- final EventData event3 = new EventData("Baz".getBytes(), getSystemProperties(lastOffset, 53), Context.NONE);
+ final EventData event1 = new EventData(BinaryData.fromBytes("Foo".getBytes()), getSystemProperties("25", 14),
+ Context.NONE);
+ final EventData event2 = new EventData(BinaryData.fromBytes("Bar".getBytes()),
+ getSystemProperties(secondOffset, 21), Context.NONE);
+ final EventData event3 = new EventData(BinaryData.fromBytes("Baz".getBytes()),
+ getSystemProperties(lastOffset, 53), Context.NONE);
when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event1);
when(messageSerializer.deserialize(same(message2), eq(EventData.class))).thenReturn(event2);
@@ -224,6 +233,70 @@ void receiveMultipleTimes() {
Assertions.assertFalse(actual.isInclusive());
}
+ @Test
+ void receiveAndDeserialize() {
+ // just a test value
+ Object o = 0;
+ ObjectSerializer testSerializer = new ObjectSerializer() {
+ @Override
+ public T deserialize(InputStream inputStream, TypeReference typeReference) {
+ return deserializeAsync(inputStream, typeReference).block();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Mono deserializeAsync(InputStream inputStream, TypeReference typeReference) {
+ if (typeReference.getJavaType().getTypeName().equals(o.getClass().getTypeName())) {
+ return Mono.just((T) o);
+ }
+ return null;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream, Object o) {
+ this.serializeAsync(outputStream, o).block();
+ }
+
+ @Override
+ public Mono serializeAsync(OutputStream outputStream, Object o) {
+ return Mono.empty();
+ }
+
+ };
+
+ // Arrange
+ linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection));
+ consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME,
+ CONSUMER_GROUP, PARTITION_ID, testSerializer, currentPosition, false, Schedulers.parallel());
+
+ final EventData event = new EventData("Foo");
+ final LastEnqueuedEventProperties last = new LastEnqueuedEventProperties(10L, 15L,
+ Instant.ofEpochMilli(1243454), Instant.ofEpochMilli(1240004));
+
+ when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event);
+ when(messageSerializer.deserialize(same(message1), eq(LastEnqueuedEventProperties.class))).thenReturn(last);
+
+ // Act & Assert
+ StepVerifier.create(consumer.receive())
+ .then(() -> {
+ messageProcessorSink.next(message1);
+ })
+ .assertNext(partitionEvent -> {
+ verifyPartitionContext(partitionEvent.getPartitionContext());
+ verifyLastEnqueuedInformation(false, last,
+ partitionEvent.getLastEnqueuedEventProperties());
+ Assertions.assertSame(event, partitionEvent.getData());
+ Assertions.assertSame(Integer.class.cast(o),
+ partitionEvent.getData().getDeserializedObject(Integer.class));
+ })
+ .thenCancel()
+ .verify();
+
+ // The emitter processor is not closed until the partition consumer is.
+ Assertions.assertFalse(linkProcessor.isTerminated());
+ Assertions.assertSame(originalPosition, currentPosition.get().get());
+ }
+
/**
* Verifies that the consumer closes and completes any listeners on a shutdown signal.
@@ -233,14 +306,17 @@ void listensToShutdownSignals() throws InterruptedException {
// Arrange
linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection));
consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME,
- CONSUMER_GROUP, PARTITION_ID, currentPosition, false, Schedulers.parallel());
+ CONSUMER_GROUP, PARTITION_ID, null, currentPosition, false, Schedulers.parallel());
final Message message3 = mock(Message.class);
final String secondOffset = "54";
final String lastOffset = "65";
- final EventData event1 = new EventData("Foo".getBytes(), getSystemProperties("25", 14), Context.NONE);
- final EventData event2 = new EventData("Bar".getBytes(), getSystemProperties(secondOffset, 21), Context.NONE);
- final EventData event3 = new EventData("Baz".getBytes(), getSystemProperties(lastOffset, 53), Context.NONE);
+ final EventData event1 = new EventData(BinaryData.fromBytes("Foo".getBytes()), getSystemProperties("25", 14),
+ Context.NONE);
+ final EventData event2 = new EventData(BinaryData.fromBytes("Bar".getBytes()), getSystemProperties(secondOffset,
+ 21), Context.NONE);
+ final EventData event3 = new EventData(BinaryData.fromBytes("Baz".getBytes()), getSystemProperties(lastOffset,
+ 53), Context.NONE);
when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event1);
when(messageSerializer.deserialize(same(message2), eq(EventData.class))).thenReturn(event2);
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIdempotentTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIdempotentTest.java
new file mode 100644
index 0000000000000..5e40133509f44
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIdempotentTest.java
@@ -0,0 +1,438 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.AmqpEndpointState;
+import com.azure.core.amqp.AmqpMessageConstant;
+import com.azure.core.amqp.AmqpRetryMode;
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.amqp.ProxyOptions;
+import com.azure.core.amqp.implementation.AmqpSendLink;
+import com.azure.core.amqp.implementation.CbsAuthorizationType;
+import com.azure.core.amqp.implementation.ConnectionOptions;
+import com.azure.core.amqp.implementation.MessageSerializer;
+import com.azure.core.amqp.implementation.TracerProvider;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.experimental.util.BinaryData;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.serializer.ObjectSerializer;
+import com.azure.core.util.serializer.TypeReference;
+import com.azure.messaging.eventhubs.implementation.ClientConstants;
+import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
+import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
+import com.azure.messaging.eventhubs.implementation.PartitionPublishingState;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.message.Message;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.test.StepVerifier;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+class EventHubProducerAsyncClientIdempotentTest {
+ private static final String HOSTNAME = "something.servicebus.windows.net";
+ private static final String EVENT_HUB_NAME = "anEventHub";
+ private static final String PARTITION_0 = "0";
+ private static final String PARTITION_1 = "1";
+ private static final String ENTITY_PATH_0 = EVENT_HUB_NAME + "/Partitions/" + PARTITION_0;
+ private static final String ENTITY_PATH_1 = EVENT_HUB_NAME + "/Partitions/" + PARTITION_1;
+ private static final String TEST_CONNECTION_STRING = "Endpoint=sb://something.servicebus.windows.net/;"
+ + "SharedAccessKeyName=anAccessKeyName;"
+ + "SharedAccessKey=anAccessKey;EntityPath=anEventHub";
+
+ private static final Long PRODUCER_GROUP_ID = 1L;
+ private static final Short PRODUCER_OWNER_LEVEL = (short) 10;
+ private static final Integer PRODUCER_SEQ_NUMBER = 100;
+
+ @Mock
+ private AmqpSendLink sendLink;
+
+ @Mock
+ private AmqpSendLink sendLink2;
+
+ @Mock
+ private EventHubAmqpConnection connection;
+
+ @Mock
+ private TokenCredential tokenCredential;
+ @Mock
+ private Runnable onClientClosed;
+
+ private final MessageSerializer messageSerializer = new EventHubMessageSerializer();
+ private final AmqpRetryOptions retryOptions = new AmqpRetryOptions()
+ .setDelay(Duration.ofMillis(500))
+ .setMode(AmqpRetryMode.FIXED)
+ .setTryTimeout(Duration.ofSeconds(5))
+ .setMaxRetries(2);
+ private final DirectProcessor endpointProcessor = DirectProcessor.create();
+ private final FluxSink endpointSink = endpointProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+ private EventHubProducerAsyncClient producer;
+ private EventHubConnectionProcessor connectionProcessor;
+ private TracerProvider tracerProvider;
+ private ConnectionOptions connectionOptions;
+ private final Scheduler testScheduler = Schedulers.newElastic("test");
+
+ private PartitionPublishingProperties partition0InitialState;
+ private Map initialStates = new HashMap<>();
+ private ObjectSerializer objectSerializer;
+
+ @BeforeAll
+ static void beforeAll() {
+ StepVerifier.setDefaultTimeout(Duration.ofSeconds(30));
+ }
+
+ @AfterAll
+ static void afterAll() {
+ StepVerifier.resetDefaultTimeout();
+ }
+
+ @BeforeEach
+ void setup(TestInfo testInfo) {
+ MockitoAnnotations.initMocks(this);
+
+ tracerProvider = new TracerProvider(Collections.emptyList());
+ connectionOptions = new ConnectionOptions(HOSTNAME, tokenCredential,
+ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, retryOptions,
+ ProxyOptions.SYSTEM_DEFAULTS, testScheduler, new ClientOptions(), SslDomain.VerifyMode.VERIFY_PEER);
+
+ when(connection.getEndpointStates()).thenReturn(endpointProcessor);
+ endpointSink.next(AmqpEndpointState.ACTIVE);
+
+ connectionProcessor = Mono.fromCallable(() -> connection).repeat(10).subscribeWith(
+ new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
+ EVENT_HUB_NAME, connectionOptions.getRetry()));
+
+ partition0InitialState = new PartitionPublishingProperties(PRODUCER_GROUP_ID, PRODUCER_OWNER_LEVEL, PRODUCER_SEQ_NUMBER);
+ initialStates = new HashMap<>();
+ initialStates.put(PARTITION_0, partition0InitialState);
+
+ Map internalStates = new HashMap<>();
+ initialStates.forEach((k, v) -> internalStates.put(k, new PartitionPublishingState(v)));
+ objectSerializer = new ObjectSerializer() {
+ private Object o = new Object();
+ @Override
+ public T deserialize(InputStream inputStream, TypeReference typeReference) {
+ return deserializeAsync(inputStream, typeReference).block();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Mono deserializeAsync(InputStream inputStream, TypeReference typeReference) {
+ if (typeReference.getJavaType().getTypeName().equals(o.getClass().getTypeName())) {
+ return Mono.just((T) o);
+ }
+ return null;
+ }
+
+ @Override
+ public void serialize(OutputStream outputStream, Object o) {
+ this.serializeAsync(outputStream, o).block();
+ }
+
+ @Override
+ public Mono serializeAsync(OutputStream outputStream, Object o) {
+ return Mono.empty();
+ }
+
+ };
+ producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
+ tracerProvider, messageSerializer, objectSerializer, testScheduler, false, onClientClosed,
+ true, internalStates);
+
+ Map remoteProperties = new HashMap<>();
+ remoteProperties.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getOwnerLevel());
+ remoteProperties.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getSequenceNumber());
+ remoteProperties.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getProducerGroupId());
+
+ when(connection.createSendLink(eq(ENTITY_PATH_0), eq(ENTITY_PATH_0),
+ eq(retryOptions), eq(true), any(PartitionPublishingState.class))).thenReturn(Mono.just(sendLink));
+ when(sendLink.getRemoteProperties()).thenReturn(
+ Mono.just(remoteProperties));
+ when(sendLink.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
+ when(sendLink.send(any(Message.class))).thenReturn(Mono.empty());
+ when(sendLink.send(anyList())).thenReturn(Mono.empty());
+ }
+
+ @AfterEach
+ void teardown(TestInfo testInfo) {
+ testScheduler.dispose();
+ Mockito.framework().clearInlineMocks();
+ Mockito.reset(sendLink, connection);
+ }
+
+ @Test
+ void buildClientIllegalArgument() {
+ assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
+ .connectionString(TEST_CONNECTION_STRING)
+ .initialPartitionPublishingStates(initialStates) // Not an idempotent producer. Shouldn't set.
+ .buildAsyncProducerClient());
+ }
+
+ @Test
+ void getPartitionPublishingProperties() {
+ StepVerifier.create(producer.getPartitionPublishingProperties(PARTITION_0))
+ .assertNext(properties -> {
+ assertEquals(properties.getOwnerLevel(), partition0InitialState.getOwnerLevel());
+ assertEquals(properties.getProducerGroupId(), partition0InitialState.getProducerGroupId());
+ assertEquals(properties.getSequenceNumber(), partition0InitialState.getSequenceNumber());
+ })
+ .verifyComplete();
+ }
+
+ @Test
+ void createEventDataBatch() {
+ CreateBatchOptions options = new CreateBatchOptions();
+ options.setPartitionId(PARTITION_0);
+ StepVerifier.create(producer.createBatch(options))
+ .assertNext(eventDataBatch -> {
+ assertNull(eventDataBatch.getStartingPublishedSequenceNumber());
+ })
+ .verifyComplete();
+ }
+
+ @Test
+ void createBatchWithoutPartitionId() {
+ StepVerifier.create(producer.createBatch())
+ .expectError(IllegalArgumentException.class)
+ .verify();
+ }
+
+ @Test
+ void sendEventDataBatch() {
+ CreateBatchOptions options = new CreateBatchOptions();
+ options.setPartitionId(PARTITION_0);
+ EventDataBatch batch = producer.createBatch(options).block();
+ assertNotNull(batch);
+ EventData eventData = new EventData("This is a test event");
+ batch.tryAdd(eventData);
+ StepVerifier.create(producer.send(batch)).verifyComplete();
+ assertEquals(eventData.getSystemProperties().get(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()),
+ PRODUCER_GROUP_ID);
+ assertEquals(eventData.getSystemProperties().get(AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()),
+ PRODUCER_OWNER_LEVEL);
+ assertEquals(eventData.getSystemProperties().get(
+ AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()),
+ PRODUCER_SEQ_NUMBER);
+ assertEquals(batch.getStartingPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+
+ StepVerifier.create(producer.getPartitionPublishingState(PARTITION_0))
+ .assertNext(state -> {
+ assertEquals(state.getSequenceNumber(), PRODUCER_SEQ_NUMBER + batch.getCount());
+ }).verifyComplete();
+ }
+
+ @Test
+ void sendEventList() {
+ EventData eventData = new EventData("This is a test event");
+ List eventDataList = new ArrayList<>();
+ eventDataList.add(eventData);
+ StepVerifier.create(producer.send(eventDataList, new SendOptions().setPartitionId(PARTITION_0))).verifyComplete();
+ assertEquals(eventData.getSystemProperties().get(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()),
+ PRODUCER_GROUP_ID);
+ assertEquals(eventData.getSystemProperties().get(AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()),
+ PRODUCER_OWNER_LEVEL);
+ assertEquals(eventData.getSystemProperties().get(
+ AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()),
+ PRODUCER_SEQ_NUMBER);
+
+ StepVerifier.create(producer.getPartitionPublishingState(PARTITION_0))
+ .assertNext(state -> {
+ assertEquals(state.getSequenceNumber(), PRODUCER_SEQ_NUMBER + eventDataList.size());
+ }).verifyComplete();
+ }
+
+ @Test
+ void sendEventDataListFail() {
+ Map remoteProperties1 = new HashMap<>();
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getOwnerLevel());
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getSequenceNumber());
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getProducerGroupId());
+
+ when(connection.createSendLink(eq(ENTITY_PATH_1), eq(ENTITY_PATH_1),
+ eq(retryOptions), eq(true), any(PartitionPublishingState.class))).thenReturn(Mono.just(sendLink2));
+ when(sendLink2.getRemoteProperties()).thenReturn(
+ Mono.just(remoteProperties1));
+ when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
+ when(sendLink2.send(any(Message.class))).thenReturn(Mono.error(new RuntimeException("simulated error")));
+ when(sendLink2.send(anyList())).thenReturn(Mono.error(new RuntimeException("simulated error")));
+
+ EventData eventData = new EventData("This is a test event");
+ List eventDataList = new ArrayList<>();
+ eventDataList.add(eventData);
+ StepVerifier.create(producer.send(eventDataList)).expectError(RuntimeException.class).verify();
+ StepVerifier.create(producer.getPartitionPublishingProperties("1"))
+ .assertNext(publishingProperties -> {
+ assertEquals(publishingProperties.getProducerGroupId(), 1);
+ assertEquals(publishingProperties.getOwnerLevel(), (short) 10);
+ assertEquals(publishingProperties.getSequenceNumber(), 100);
+ })
+ .verifyComplete();
+ assertNull(eventData.getPublishedGroupId());
+ assertNull(eventData.getPublishedOwnerLevel());
+ assertNull(eventData.getPublishedSequenceNumber());
+ }
+
+ @Test
+ void sendEventDataListWithoutPartition() {
+ EventData eventData = new EventData("This is a test event");
+ List eventDataList = new ArrayList<>();
+ eventDataList.add(eventData);
+ StepVerifier.create(producer.send(eventDataList)).verifyError(IllegalArgumentException.class);
+ }
+
+ @Test
+ void sendEventBatchesToSamePartitionConcurrently() {
+ CreateBatchOptions options = new CreateBatchOptions();
+ options.setPartitionId(PARTITION_0);
+ EventDataBatch batch1 = producer.createBatch(options).block();
+ assertNotNull(batch1);
+ EventData eventData1 = new EventData("This is a test event");
+ batch1.tryAdd(eventData1);
+
+ EventDataBatch batch2 = producer.createBatch(options).block();
+ assertNotNull(batch2);
+ EventData eventData2 = new EventData("This is a test event");
+ batch2.tryAdd(eventData2);
+
+ assertNull(eventData1.getPublishedSequenceNumber());
+ assertNull(eventData1.getPublishedGroupId());
+ assertNull(eventData1.getPublishedOwnerLevel());
+ assertNull(batch1.getStartingPublishedSequenceNumber());
+
+ StepVerifier.create(Mono.when(producer.send(batch1), producer.send(batch2))).verifyComplete();
+
+ assertEquals(eventData1.getPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+ assertEquals(eventData1.getPublishedGroupId(), PRODUCER_GROUP_ID);
+ assertEquals(eventData1.getPublishedOwnerLevel(), PRODUCER_OWNER_LEVEL);
+ assertEquals(batch1.getStartingPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+
+ assertEquals(eventData2.getPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER + 1);
+ assertEquals(eventData2.getPublishedGroupId(), PRODUCER_GROUP_ID);
+ assertEquals(eventData2.getPublishedOwnerLevel(), PRODUCER_OWNER_LEVEL);
+ assertEquals(batch2.getStartingPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER + 1);
+
+ StepVerifier.create(producer.getPartitionPublishingState(PARTITION_0))
+ .assertNext(state -> {
+ assertEquals(state.getSequenceNumber(), PRODUCER_SEQ_NUMBER + batch1.getCount() + batch2.getCount());
+ }).verifyComplete();
+ }
+
+ @Test
+ void sendEventBatchesToTwoPartitionsConcurrently() {
+ Map remoteProperties1 = new HashMap<>();
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_EPOCH_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getOwnerLevel());
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getSequenceNumber());
+ remoteProperties1.put(
+ Symbol.getSymbol(AmqpMessageConstant.PRODUCER_ID_ANNOTATION_NAME.getValue()),
+ partition0InitialState.getProducerGroupId());
+
+ when(connection.createSendLink(eq(ENTITY_PATH_1), eq(ENTITY_PATH_1),
+ eq(retryOptions), eq(true), any(PartitionPublishingState.class))).thenReturn(Mono.just(sendLink2));
+ when(sendLink2.getRemoteProperties()).thenReturn(
+ Mono.just(remoteProperties1));
+ when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
+ when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());
+ when(sendLink2.send(anyList())).thenReturn(Mono.empty());
+
+ CreateBatchOptions options = new CreateBatchOptions();
+ options.setPartitionId(PARTITION_0);
+ EventDataBatch batch1 = producer.createBatch(options).block();
+ assertNotNull(batch1);
+ EventData eventData1 = new EventData("This is a test event");
+ batch1.tryAdd(eventData1);
+ try {
+ EventDataBatch batch2 = producer.createBatch(new CreateBatchOptions()
+ .setPartitionId(PARTITION_1)).block();
+ assertNotNull(batch2);
+ EventData eventData2 = new EventData("This is a test event");
+ batch2.tryAdd(eventData2);
+
+ StepVerifier.create(Mono.when(producer.send(batch1), producer.send(batch2)))
+ .verifyComplete();
+
+ assertEquals(eventData1.getPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+ assertEquals(eventData2.getPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+
+ StepVerifier.create(producer.getPartitionPublishingState(PARTITION_0))
+ .assertNext(state -> {
+ assertEquals(state.getSequenceNumber(), PRODUCER_SEQ_NUMBER + batch1.getCount());
+ }).verifyComplete();
+
+ StepVerifier.create(producer.getPartitionPublishingState(PARTITION_1))
+ .assertNext(state -> {
+ assertEquals(state.getSequenceNumber(), PRODUCER_SEQ_NUMBER + batch2.getCount());
+ }).verifyComplete();
+ } finally {
+ Mockito.reset(sendLink2);
+ }
+ }
+
+ @Test
+ void sendObjectBatch() {
+
+ EventDataBatch eventDataBatch = producer.createBatch(new CreateBatchOptions().setPartitionId(PARTITION_0))
+ .block();
+
+ eventDataBatch.tryAdd(new EventData(BinaryData.fromObject(new Object(), objectSerializer)));
+ eventDataBatch.tryAdd(new EventData(BinaryData.fromObject(new Object(), objectSerializer)));
+ StepVerifier.create(producer.send(eventDataBatch))
+ .verifyComplete();
+ assertEquals(eventDataBatch.getStartingPublishedSequenceNumber(), PRODUCER_SEQ_NUMBER);
+
+ StepVerifier.create(producer.getPartitionPublishingProperties(PARTITION_0))
+ .assertNext(properties -> {
+ assertEquals(properties.getOwnerLevel(), PRODUCER_OWNER_LEVEL);
+ assertEquals(properties.getProducerGroupId(), PRODUCER_GROUP_ID);
+ assertEquals(properties.getSequenceNumber(), PRODUCER_SEQ_NUMBER + eventDataBatch.getCount());
+ }).verifyComplete();
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java
index 792b814ce6b06..f3df1abca0c11 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java
@@ -149,7 +149,7 @@ void setup(TestInfo testInfo) {
new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
"event-hub-path", connectionOptions.getRetry()));
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, testScheduler, false, onClientClosed);
+ tracerProvider, messageSerializer, null, testScheduler, false, onClientClosed);
when(sendLink.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
when(sendLink2.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
@@ -241,7 +241,7 @@ void sendSingleMessageWithBlock() throws InterruptedException {
final Semaphore semaphore = new Semaphore(1);
// In our actual client builder, we allow this.
final EventHubProducerAsyncClient flexibleProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, testScheduler,
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, testScheduler,
false, onClientClosed);
// EC is the prefix they use when creating a link that sends to the service round-robin.
@@ -322,7 +322,7 @@ void sendStartSpanSingleMessage() {
final SendOptions sendOptions = new SendOptions()
.setPartitionId(partitionId);
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
when(connection.createSendLink(
@@ -380,7 +380,7 @@ void sendMessageRetrySpanTest() {
TracerProvider tracerProvider = new TracerProvider(tracers);
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final String failureKey = "fail";
final EventData testData = new EventData("test");
@@ -519,7 +519,7 @@ void startMessageSpansOnCreateBatch() {
final List tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
final AmqpSendLink link = mock(AmqpSendLink.class);
@@ -825,7 +825,7 @@ void doesNotCloseSharedConnection() {
// Arrange
EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class);
EventHubProducerAsyncClient sharedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
true, onClientClosed);
// Act
@@ -844,7 +844,7 @@ void closesDedicatedConnection() {
// Arrange
EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class);
EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
// Act
@@ -863,7 +863,7 @@ void closesDedicatedConnectionOnlyOnce() {
// Arrange
EventHubConnectionProcessor hubConnection = mock(EventHubConnectionProcessor.class);
EventHubProducerAsyncClient dedicatedProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- hubConnection, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ hubConnection, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
// Act
@@ -900,7 +900,7 @@ void reopensOnFailure() {
new EventHubConnectionProcessor(EVENT_HUB_NAME, connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getRetry()));
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final int count = 4;
final byte[] contents = TEST_CONTENTS.getBytes(UTF_8);
@@ -975,7 +975,7 @@ void closesOnNonTransientFailure() {
new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
EVENT_HUB_NAME, connectionOptions.getRetry()));
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final int count = 4;
final byte[] contents = TEST_CONTENTS.getBytes(UTF_8);
@@ -1051,7 +1051,7 @@ void resendMessageOnTransientLinkFailure() {
new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
EVENT_HUB_NAME, connectionOptions.getRetry()));
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final int count = 4;
final byte[] contents = TEST_CONTENTS.getBytes(UTF_8);
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java
index 54232602fa900..27a13cb22d053 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java
@@ -108,7 +108,7 @@ public void setup() {
.subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
"event-hub-path", connectionOptions.getRetry()));
asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
- tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
}
@@ -160,7 +160,7 @@ public void sendStartSpanSingleMessage() {
final List tracers = Collections.singletonList(tracer1);
final TracerProvider tracerProvider = new TracerProvider(tracers);
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));
@@ -222,7 +222,7 @@ public void sendMessageRetrySpanTest() {
.thenReturn(Mono.just(sendLink));
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventData eventData = new EventData("hello-world".getBytes(UTF_8))
.addContext(SPAN_CONTEXT_KEY, Context.NONE);
@@ -270,7 +270,7 @@ public void sendEventsExceedsBatchSize() {
when(sendLink.getLinkSize()).thenReturn(Mono.just(1024));
TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(), false, onClientClosed);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
//Act & Assert
@@ -372,7 +372,7 @@ public void startsMessageSpanOnEventBatch() {
final List tracers = Collections.singletonList(tracer1);
final TracerProvider tracerProvider = new TracerProvider(tracers);
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
- connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(),
+ connectionProcessor, retryOptions, tracerProvider, messageSerializer, null, Schedulers.parallel(),
false, onClientClosed);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/PartitionPublishigUtilsTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/PartitionPublishigUtilsTest.java
new file mode 100644
index 0000000000000..e3ba7cc26502e
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/PartitionPublishigUtilsTest.java
@@ -0,0 +1,43 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs.implementation;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PartitionPublishigUtilsTest {
+ @ParameterizedTest
+ @MethodSource("increaseNumberParameterProvider")
+ void increaseNumber(int value, int delta, int expected) {
+ assertEquals(PartitionPublishingUtils.incrementSequenceNumber(value, delta), expected);
+ }
+
+ static Stream increaseNumberParameterProvider() {
+ return Stream.of(
+ Arguments.arguments(0, 10, 10),
+ Arguments.arguments(Integer.MAX_VALUE, 10, 9),
+ Arguments.arguments(Integer.MAX_VALUE - 3, 10, 6),
+ Arguments.arguments(0, 0, 0)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("increaseNumberByOneParameterProvider")
+ void increaseNumberByOne(int value, int expected) {
+ assertEquals(PartitionPublishingUtils.incrementSequenceNumber(value), expected);
+ }
+
+ static Stream increaseNumberByOneParameterProvider() {
+ return Stream.of(
+ Arguments.arguments(0, 1),
+ Arguments.arguments(Integer.MAX_VALUE, 0),
+ Arguments.arguments(Integer.MAX_VALUE - 1, Integer.MAX_VALUE)
+ );
+ }
+}
diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml
index 8852a04531fd5..999ee1b508284 100644
--- a/sdk/servicebus/azure-messaging-servicebus/pom.xml
+++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml
@@ -52,7 +52,7 @@
com.azure
azure-core-amqp
- 1.7.0-beta.1
+ 1.7.0-beta.2
com.azure