Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparing Event Hubs 5.4.0-beta.1 release #17502

Merged
merged 12 commits into from
Nov 13, 2020
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ com.azure:azure-communication-common;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-administration;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-communication-sms;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.10.0;1.11.0-beta.1
com.azure:azure-core-amqp;1.6.0;1.7.0-beta.2
com.azure:azure-core-amqp;1.7.0-beta.2;1.7.0-beta.3
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.8;1.0.0-beta.9
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down Expand Up @@ -180,4 +180,4 @@ unreleased_com.azure:azure-messaging-eventhubs;5.3.0
# note: Released beta versions will not be manipulated with the automatic PR creation code.
beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.1
beta_com.azure:azure-storage-common;12.9.0-beta.1
beta_com.azure:azure-core-amqp;1.7.0-beta.1
unreleased_com.azure:azure-core-amqp;1.7.0-beta.2
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
7 changes: 6 additions & 1 deletion sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History
## 1.7.0-beta.2 (Unreleased)
## 1.7.0-beta.3 (Unreleased)

## 1.7.0-beta.2 (2020-11-10)
### New Features
- Optionally enable idempotency of a send link to send AMQP messages with producer group id, producer owner level and
producer sequence number in the message annotations.

## 1.7.0-beta.1 (2020-11-03)
### Dependency Updates
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.7.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>1.7.0-beta.3</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public enum AmqpMessageConstant {
* The name of the entity that published a message.
*/
PUBLISHER_ANNOTATION_NAME("x-opt-publisher"),

/**
* The name representing scheduled enqueue time.
*/
Expand Down Expand Up @@ -122,7 +123,22 @@ public enum AmqpMessageConstant {
/**
* The identifier for deadletter reason.
*/
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason");
DEAD_LETTER_REASON_ANNOTATION_NAME("DeadLetterReason"),

/**
* The published sequence number when a message was sent from an idempotent producer.
*/
PRODUCER_SEQUENCE_NUMBER_ANNOTATION_NAME("com.microsoft:producer-sequence-number"),

/**
* The published epoch when a message was sent from an idempotent producer.
*/
PRODUCER_EPOCH_ANNOTATION_NAME("com.microsoft:producer-epoch"),

/**
* The published producer id when a message was sent from an idempotent producer.
*/
PRODUCER_ID_ANNOTATION_NAME("com.microsoft:producer-id");

private static final Map<String, AmqpMessageConstant> RESERVED_CONSTANTS_MAP = new HashMap<>();
private final String constant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,17 @@ public enum AmqpErrorCondition {
/**
* Error condition when a subscription client tries to create a rule with the name of an already existing rule.
*/
ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists");
ENTITY_ALREADY_EXISTS("com.microsoft:entity-already-exists"),

/**
* A producer is disconnected because another higher epoc producer connects to the service.
*/
PRODUCER_EPOCH_STOLEN("com.microsoft:producer-epoch-stolen"),

/**
* An idempotent producer is sending an event without a consecutive producer sequence number.
*/
OUT_OF_ORDER_SEQUENCE("com.microsoft:out-of-order-sequence");

private static final Map<String, AmqpErrorCondition> ERROR_CONSTANT_MAP = new HashMap<>();
private final String errorCondition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.engine.Delivery;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;

/**
* An AMQP link that sends information to the remote endpoint.
Expand Down Expand Up @@ -77,6 +79,12 @@ public interface AmqpSendLink extends AmqpLink {
*/
Mono<Integer> getLinkSize();

/**
* Gets the properties of the send link returned from the service.
* @return A Mono that completes and returns the properties of the send link.
*/
Mono<Map<Symbol, Object>> getRemoteProperties();

/**
* Gets the context for this AMQP send link.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public static Exception toException(String errorCondition, String description, A
case ENTITY_ALREADY_EXISTS:
case MESSAGE_NOT_FOUND:
case SESSION_NOT_FOUND:
case PRODUCER_EPOCH_STOLEN:
case OUT_OF_ORDER_SEQUENCE:
isTransient = false;
break;
case NOT_IMPLEMENTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -276,6 +277,15 @@ public Mono<Integer> getLinkSize() {
}
}

@Override
public Mono<Map<Symbol, Object>> getRemoteProperties() {
return RetryUtil.withRetry(
getEndpointStates()
.takeUntil(state -> state == AmqpEndpointState.ACTIVE)
.then(Mono.fromCallable(sender::getRemoteProperties)),
timeout, retry);
}

@Override
public boolean isDisposed() {
return isDisposed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,14 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
* @param linkName Name of the receive link.
* @param entityPath Address in the message broker for the link.
* @param linkProperties The properties needed to be set on the link.
* @param senderDesiredCapabilities Capabilities that the sender link supports.
* @param timeout Operation timeout when creating the link.
* @param retry Retry policy to apply when link creation times out.
*
* @return A new instance of an {@link AmqpLink} with the correct properties set.
*/
protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout,
AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties, Symbol[] senderDesiredCapabilities) {

if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
Expand Down Expand Up @@ -451,8 +452,8 @@ protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Dura
}

logger.info("Creating a new sender link with linkName {}", linkName);
return getSubscription(linkName, entityPath, linkProperties, timeout, retry,
tokenManager);
return getSubscription(linkName, entityPath, linkProperties, senderDesiredCapabilities,
timeout, retry, tokenManager);
});

sink.success(computed.getLink());
Expand All @@ -463,11 +464,17 @@ protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Dura
}));
}

protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry,
Map<Symbol, Object> linkProperties) {

return this.createProducer(linkName, entityPath, timeout, retry, linkProperties, null);
}
/**
* NOTE: Ensure this is invoked using the reactor dispatcher because proton-j is not thread-safe.
*/
private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String entityPath,
Map<Symbol, Object> linkProperties, Duration timeout, AmqpRetryPolicy retry, TokenManager tokenManager) {
Map<Symbol, Object> linkProperties, Symbol[] senderDesiredCapabilities, Duration timeout, AmqpRetryPolicy retry,
TokenManager tokenManager) {

final Sender sender = session.sender(linkName);
final Target target = new Target();
Expand All @@ -482,6 +489,10 @@ private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String e
sender.setProperties(linkProperties);
}

if (senderDesiredCapabilities != null && senderDesiredCapabilities.length > 0) {
sender.setDesiredCapabilities(senderDesiredCapabilities);
}

final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(
sessionHandler.getConnectionId(), sessionHandler.getHostname(), linkName, entityPath);
BaseHandler.setHandler(sender, sendLinkHandler);
Expand Down
7 changes: 6 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@
<artifactId>azure-core</artifactId>
<version>1.10.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
<version>1.0.0-beta.8</version> <!-- {x-version-update;com.azure:azure-core-experimental;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>1.6.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>1.7.0-beta.2</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
Loading