Skip to content

Commit

Permalink
Preparing Event Hubs 5.4.0-beta.1 release (#17502)
Browse files Browse the repository at this point in the history
  • Loading branch information
srnagar authored Nov 13, 2020
1 parent 6e39307 commit a1fa175
Show file tree
Hide file tree
Showing 45 changed files with 1,781 additions and 149 deletions.
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
4 changes: 1 addition & 3 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ com.azure:azure-communication-common;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-administration;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-sms;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.10.0;1.11.0-beta.1
com.azure:azure-core-amqp;1.6.0;1.7.0-beta.2
com.azure:azure-core-amqp;1.7.0-beta.2;1.7.0-beta.3
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.8;1.0.0-beta.9
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down Expand Up @@ -178,7 +178,6 @@ com.microsoft:microsoft-opentelemetry-exporter-azuremonitor;1.0.0-beta.1;1.0.0-b
# note: The unreleased dependencies will not be manipulated with the automatic PR creation code.
unreleased_com.azure:azure-core-experimental;1.0.0-beta.9
unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.7
unreleased_com.azure:azure-messaging-eventhubs;5.3.0

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
Expand All @@ -189,4 +188,3 @@ unreleased_com.azure:azure-messaging-eventhubs;5.3.0
# note: Released beta versions will not be manipulated with the automatic PR creation code.
beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.1
beta_com.azure:azure-storage-common;12.9.0-beta.1
beta_com.azure:azure-core-amqp;1.7.0-beta.1
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
7 changes: 6 additions & 1 deletion sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History
## 1.7.0-beta.2 (Unreleased)
## 1.7.0-beta.3 (Unreleased)

## 1.7.0-beta.2 (2020-11-10)
### New Features
- Optionally enable idempotency of a send link to send AMQP messages with producer group id, producer owner level and
producer sequence number in the message annotations.

## 1.7.0-beta.1 (2020-11-03)
### Dependency Updates
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public enum AmqpMessageConstant {
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),

/**
* The name representing scheduled enqueue time.
*/
Expand Down Expand Up @@ -122,7 +123,22 @@ public enum AmqpMessageConstant {
/**
* The identifier for deadletter reason.
*/
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason"),

/**
* The published sequence number when a message was sent from an idempotent producer.
*/
PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME("com.microsoft:producer-sequence-number"),

/**
* The published epoch when a message was sent from an idempotent producer.
*/
PRODUCER_EPOCH_ANNOTATION_NAME("com.microsoft:producer-epoch"),

/**
* The published producer id when a message was sent from an idempotent producer.
*/
PRODUCER_ID_ANNOTATION_NAME("com.microsoft:producer-id");

private static final Map<String, AmqpMessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,17 @@ public enum AmqpErrorCondition {
/**
* Error condition when a subscription client tries to create a rule with the name of an already existing rule.
*/
ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists");
ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists"),

/**
* A producer is disconnected because another higher epoc producer connects to the service.
*/
PRODUCER_EPOCH_STOLEN("com.microsoft:producer-epoch-stolen"),

/**
* An idempotent producer is sending an event without a consecutive producer sequence number.
*/
OUT_OF_ORDER_SEQUENCE("com.microsoft:out-of-order-sequence");

private static final Map<String, AmqpErrorCondition> ERROR_CONSTANT_MAP = new HashMap<>();
private final String errorCondition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.engine.Delivery;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;

/**
* An AMQP link that sends information to the remote endpoint.
Expand Down Expand Up @@ -77,6 +79,12 @@ public interface AmqpSendLink extends AmqpLink {
*/
Mono<Integer> getLinkSize();

/**
* Gets the properties of the send link returned from the service.
* @return A Mono that completes and returns the properties of the send link.
*/
Mono<Map<Symbol, Object>> getRemoteProperties();

/**
* Gets the context for this AMQP send link.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public static Exception toException(String errorCondition, String description, A
case ENTITY_ALREADY_EXISTS:
case MESSAGE_NOT_FOUND:
case SESSION_NOT_FOUND:
case PRODUCER_EPOCH_STOLEN:
case OUT_OF_ORDER_SEQUENCE:
isTransient = false;
break;
case NOT_IMPLEMENTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -276,6 +277,15 @@ public Mono<Integer> getLinkSize() {
}
}

@Override
public Mono<Map<Symbol, Object>> getRemoteProperties() {
return RetryUtil.withRetry(
getEndpointStates()
.takeUntil(state -> state == AmqpEndpointState.ACTIVE)
.then(Mono.fromCallable(sender::getRemoteProperties)),
timeout, retry);
}

@Override
public boolean isDisposed() {
return isDisposed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,14 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
* @param linkName Name of the receive link.
* @param entityPath Address in the message broker for the link.
* @param linkProperties The properties needed to be set on the link.
* @param senderDesiredCapabilities Capabilities that the sender link supports.
* @param timeout Operation timeout when creating the link.
* @param retry Retry policy to apply when link creation times out.
*
* @return A new instance of an {@link AmqpLink} with the correct properties set.
*/
protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties, Symbol[] senderDesiredCapabilities) {

if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
Expand Down Expand Up @@ -451,8 +452,8 @@ protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Dura
}

logger.info("Creating a new sender link with linkName {}", linkName);
return getSubscription(linkName, entityPath, linkProperties, timeout, retry,
tokenManager);
return getSubscription(linkName, entityPath, linkProperties, senderDesiredCapabilities,
timeout, retry, tokenManager);
});

sink.success(computed.getLink());
Expand All @@ -463,11 +464,17 @@ protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Dura
}));
}

protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry,
Map<Symbol, Object> linkProperties) {

return this.createProducer(linkName, entityPath, timeout, retry, linkProperties, null);
}
/**
* NOTE: Ensure this is invoked using the reactor dispatcher because proton-j is not thread-safe.
*/
private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String entityPath,
Map<Symbol, Object> linkProperties, Duration timeout, AmqpRetryPolicy retry, TokenManager tokenManager) {
Map<Symbol, Object> linkProperties, Symbol[] senderDesiredCapabilities, Duration timeout, AmqpRetryPolicy retry,
TokenManager tokenManager) {

final Sender sender = session.sender(linkName);
final Target target = new Target();
Expand All @@ -482,6 +489,10 @@ private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String e
sender.setProperties(linkProperties);
}

if (senderDesiredCapabilities != null && senderDesiredCapabilities.length > 0) {
sender.setDesiredCapabilities(senderDesiredCapabilities);
}

final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(
sessionHandler.getConnectionId(), sessionHandler.getHostname(), linkName, entityPath);
BaseHandler.setHandler(sender, sendLinkHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Release History

## 1.4.0-beta.1 (Unreleased)
## 1.4.0-beta.1 (2020-11-12)
### Dependency Updates
- Update `azure-messaging-eventhubs` dependency to `5.4.0-beta.1`.

## 1.3.1 (2020-10-30)
### Dependency Updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.1</version>
<version>1.4.0-beta.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
10 changes: 9 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Release History

## 5.4.0-beta.1 (Unreleased)
## 5.4.0-beta.1 (2020-11-12)
### Breaking changes
- Removed `ObjectBatch` and related `createBatch()` and `send()` operations in favor of
supporting `BinaryData` in `EventData`.

## 5.3.1 (2020-10-30)
### Bug fixes
Expand All @@ -23,6 +26,11 @@ the partition consumer to rebuild the connection later.
- Update `azure-core-amqp` dependency to `1.6.0`.
- Update `azure-identity` dependency to `1.1.3`.

## 5.3.0-beta.1 (2020-09-25)
### New Features
- A producer client can be configured to be an idempotent producer. When an event with the
same producer group id and publishing sequence number is sent twice, only one will be accepted to the event hub.

## 5.2.0 (2020-09-11)
- Default scheme to 'sb://' if no scheme is set in 'Endpoint'.
- Update dependency version of `azure-core-amp` to `1.5.1`
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.1</version>
<version>5.4.0-beta.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
7 changes: 6 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@
<artifactId>azure-core</artifactId>
<version>1.10.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
<version>1.0.0-beta.8</version> <!-- {x-version-update;com.azure:azure-core-experimental;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.6.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
Loading

0 comments on commit a1fa175

Please sign in to comment.