diff --git a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndDeleteMessageTest.java b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndDeleteMessageTest.java index 1c7f220390048..510a598b413bf 100644 --- a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndDeleteMessageTest.java +++ b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ReceiveAndDeleteMessageTest.java @@ -7,7 +7,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Mono; import java.util.ArrayList; @@ -26,7 +26,7 @@ public class ReceiveAndDeleteMessageTest extends ServiceTest { * @param options to set performance test options. */ public SendMessageTest(ServiceBusStressOptions options) { - super(options, ReceiveMode.PEEK_LOCK); + super(options, ServiceBusReceiveMode.PEEK_LOCK); String messageId = UUID.randomUUID().toString(); message = new ServiceBusMessage(CONTENTS); message.setMessageId(messageId); diff --git a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/SendMessagesTest.java b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/SendMessagesTest.java index c79ffa2ec81b9..9e5575fa1f8f1 100644 --- a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/SendMessagesTest.java +++ b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/SendMessagesTest.java @@ -4,7 +4,7 @@ package com.azure.messaging.servicebus.perf; import com.azure.messaging.servicebus.ServiceBusMessage; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Mono; import java.util.ArrayList; @@ -22,7 +22,7 @@ public class SendMessagesTest extends ServiceTest { * @param options to set performance test options. */ public SendMessagesTest(ServiceBusStressOptions options) { - super(options, ReceiveMode.PEEK_LOCK); + super(options, ServiceBusReceiveMode.PEEK_LOCK); messages = new ArrayList<>(); for (int i = 0; i < options.getMessagesToSend(); ++i) { ServiceBusMessage message = new ServiceBusMessage(CONTENTS); diff --git a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ServiceTest.java b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ServiceTest.java index c2c42829a63bb..12b0e7ae70441 100644 --- a/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ServiceTest.java +++ b/sdk/servicebus/azure-messaging-servicebus-track2-perf/src/main/java/com/azure/messaging/servicebus/perf/ServiceTest.java @@ -13,7 +13,7 @@ import com.azure.messaging.servicebus.ServiceBusReceiverClient; import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient; import com.azure.messaging.servicebus.ServiceBusSenderClient; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.perf.test.core.PerfStressOptions; import com.azure.perf.test.core.PerfStressTest; @@ -44,7 +44,7 @@ abstract class ServiceTest extends PerfStres * @param receiveMode to receive messages. * @throws IllegalArgumentException if environment variable not being available. */ - ServiceTest(TOptions options, ReceiveMode receiveMode) { + ServiceTest(TOptions options, ServiceBusReceiveMode receiveMode) { super(options); String connectionString = System.getenv(AZURE_SERVICE_BUS_CONNECTION_STRING); if (CoreUtils.isNullOrEmpty(connectionString)) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java index a3748b714d98d..6b5570923aadf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java @@ -3,7 +3,7 @@ package com.azure.messaging.servicebus; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import java.time.Duration; @@ -11,19 +11,19 @@ * Options set when creating a service bus receiver. */ class ReceiverOptions { - private final ReceiveMode receiveMode; + private final ServiceBusReceiveMode receiveMode; private final int prefetchCount; private final boolean enableAutoComplete; private final String sessionId; private final Integer maxConcurrentSessions; private final Duration maxLockRenewDuration; - ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, boolean enableAutoComplete) { this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null); } - ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, + ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; @@ -46,7 +46,7 @@ Duration getMaxLockRenewDuration() { * * @return the receive mode for the message. */ - ReceiveMode getReceiveMode() { + ServiceBusReceiveMode getReceiveMode() { return receiveMode; } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index af6acb706b88f..3723c3c170ed0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -34,7 +34,7 @@ import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection; import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential; import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import org.apache.qpid.proton.engine.SslDomain; import reactor.core.publisher.Flux; @@ -642,7 +642,7 @@ public final class ServiceBusSessionProcessorClientBuilder { private final ServiceBusProcessorClientOptions processorClientOptions; private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder; private Consumer processMessage; - private Consumer processError; + private Consumer processError; private ServiceBusSessionProcessorClientBuilder() { sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder(); @@ -669,8 +669,8 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc } /** - * Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link - * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application starts the processor. @@ -702,7 +702,7 @@ public ServiceBusSessionProcessorClientBuilder queueName(String queueName) { * * @return The modified {@link ServiceBusSessionProcessorClientBuilder} object. */ - public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) { + public ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) { sessionReceiverClientBuilder.receiveMode(receiveMode); return this; } @@ -750,7 +750,8 @@ public ServiceBusSessionProcessorClientBuilder processMessage( * * @return The updated {@link ServiceBusProcessorClientBuilder} object */ - public ServiceBusSessionProcessorClientBuilder processError(Consumer processError) { + public ServiceBusSessionProcessorClientBuilder processError( + Consumer processError) { this.processError = processError; return this; } @@ -820,7 +821,7 @@ public final class ServiceBusSessionReceiverClientBuilder { private Integer maxConcurrentSessions = null; private int prefetchCount = DEFAULT_PREFETCH_COUNT; private String queueName; - private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; + private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK; private String subscriptionName; private String topicName; private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; @@ -843,8 +844,8 @@ public ServiceBusSessionReceiverClientBuilder disableAutoComplete() { /** * Sets the amount of time to continue auto-renewing the session lock. Setting {@link Duration#ZERO} or - * {@code null} disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode, - * auto-renewal is disabled. + * {@code null} disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} + * mode, auto-renewal is disabled. * * @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the session lock. * {@link Duration#ZERO} or {@code null} indicates that auto-renewal is disabled. @@ -877,8 +878,8 @@ ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSe } /** - * Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link - * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}. @@ -915,7 +916,7 @@ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) { * * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. */ - public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMode) { + public ServiceBusSessionReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) { this.receiveMode = receiveMode; return this; } @@ -992,12 +993,12 @@ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAut logger.warning( "'enableAutoComplete' is not supported in synchronous client except through callback receive."); enableAutoComplete = false; - } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + } else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode."); enableAutoComplete = false; } - if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { maxAutoLockRenewDuration = Duration.ZERO; } @@ -1059,12 +1060,12 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp logger.warning( "'enableAutoComplete' is not supported in synchronous client except through callback receive."); enableAutoComplete = false; - } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + } else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode."); enableAutoComplete = false; } - if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { maxAutoLockRenewDuration = Duration.ZERO; } @@ -1095,7 +1096,7 @@ public final class ServiceBusProcessorClientBuilder { private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder; private final ServiceBusProcessorClientOptions processorClientOptions; private Consumer processMessage; - private Consumer processError; + private Consumer processError; private ServiceBusProcessorClientBuilder() { serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder(); @@ -1103,8 +1104,8 @@ private ServiceBusProcessorClientBuilder() { } /** - * Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link - * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application starts the processor. @@ -1136,7 +1137,7 @@ public ServiceBusProcessorClientBuilder queueName(String queueName) { * * @return The modified {@link ServiceBusProcessorClientBuilder} object. */ - public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) { + public ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) { serviceBusReceiverClientBuilder.receiveMode(receiveMode); return this; } @@ -1184,7 +1185,7 @@ public ServiceBusProcessorClientBuilder processMessage( * * @return The updated {@link ServiceBusProcessorClientBuilder} object */ - public ServiceBusProcessorClientBuilder processError(Consumer processError) { + public ServiceBusProcessorClientBuilder processError(Consumer processError) { this.processError = processError; return this; } @@ -1254,7 +1255,7 @@ public final class ServiceBusReceiverClientBuilder { private int prefetchCount = DEFAULT_PREFETCH_COUNT; private String queueName; private SubQueue subQueue; - private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; + private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK; private String subscriptionName; private String topicName; private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; @@ -1277,8 +1278,8 @@ public ServiceBusReceiverClientBuilder disableAutoComplete() { /** * Sets the amount of time to continue auto-renewing the lock. Setting {@link Duration#ZERO} or {@code null} - * disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode, auto-renewal is - * disabled. + * disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode, + * auto-renewal is disabled. * * @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the lock. {@link Duration#ZERO} * or {@code null} indicates that auto-renewal is disabled. @@ -1293,8 +1294,8 @@ public ServiceBusReceiverClientBuilder maxAutoLockRenewDuration(Duration maxAuto } /** - * Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link - * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}. @@ -1331,7 +1332,7 @@ public ServiceBusReceiverClientBuilder queueName(String queueName) { * * @return The modified {@link ServiceBusReceiverClientBuilder} object. */ - public ServiceBusReceiverClientBuilder receiveMode(ReceiveMode receiveMode) { + public ServiceBusReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) { this.receiveMode = receiveMode; return this; } @@ -1421,12 +1422,12 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { logger.warning( "'enableAutoComplete' is not supported in synchronous client except through callback receive."); enableAutoComplete = false; - } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + } else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode."); enableAutoComplete = false; } - if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { maxAutoLockRenewDuration = Duration.ZERO; } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorContext.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorContext.java new file mode 100644 index 0000000000000..b2db59133bce7 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorContext.java @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +/** + * Context for errors handled by the Service Bus processor. + */ +public final class ServiceBusErrorContext { + private final Throwable exception; + private final ServiceBusErrorSource errorSource; + private final String fullyQualifiedNamespace; + private final String entityPath; + + ServiceBusErrorContext(Throwable throwable, String fullyQualifiedNamespace, String entityPath) { + this.exception = throwable; + this.fullyQualifiedNamespace = fullyQualifiedNamespace; + this.entityPath = entityPath; + + if (throwable instanceof ServiceBusException) { + final ServiceBusException serviceBusException = ((ServiceBusException) throwable); + this.errorSource = serviceBusException.getErrorSource(); + } else { + this.errorSource = ServiceBusErrorSource.RECEIVE; + } + } + + /** + * Gets the exception that triggered the call to the error event handler. + * @return The exception that triggered the call to the error event handler. + */ + public Throwable getException() { + return exception; + } + + /** + * Gets the source associated with the error. + * @return The source associated with the error. + */ + public ServiceBusErrorSource getErrorSource() { + return errorSource; + } + + /** + * Gets the namespace name associated with the error event. + * @return The namespace name associated with the error event. + */ + public String getFullyQualifiedNamespace() { + return fullyQualifiedNamespace; + } + + /** + * Gets the entity path associated with the error event. + * @return The entity path associated with the error event. + */ + public String getEntityPath() { + return entityPath; + } +} + diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java index 43f3be2609af4..1faffcdb06371 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusErrorSource.java @@ -11,7 +11,7 @@ public final class ServiceBusErrorSource extends ExpandableStringEnum { /** Error while abandoning the message.*/ - public static final ServiceBusErrorSource ABANDONED = fromString("ABANDONED", ServiceBusErrorSource.class); + public static final ServiceBusErrorSource ABANDON = fromString("ABANDON", ServiceBusErrorSource.class); /** Error while completing the message.*/ public static final ServiceBusErrorSource COMPLETE = fromString("COMPLETE", ServiceBusErrorSource.class); @@ -36,4 +36,12 @@ public final class ServiceBusErrorSource extends ExpandableStringEnum { + // NOTE: this list is intended to mirror the reasons we have in .net + // https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging + // .ServiceBus/src/Primitives/ServiceBusFailureReason.cs + + /** The exception was the result of a general error within the client library. */ + public static final ServiceBusFailureReason GENERAL_ERROR = fromString("GENERAL_ERROR", + ServiceBusFailureReason.class); + + /** The lock on the message is lost. Callers should call attempt to receive and process the message again. */ + public static final ServiceBusFailureReason MESSAGE_LOCK_LOST = fromString("MESSAGE_LOCK_LOST", + ServiceBusFailureReason.class); + + /** The requested message was not found. */ + public static final ServiceBusFailureReason MESSAGE_NOT_FOUND = fromString("MESSAGE_NOT_FOUND", + ServiceBusFailureReason.class); + + /** A message is larger than the maximum size allowed for its transport. */ + public static final ServiceBusFailureReason MESSAGE_SIZE_EXCEEDED = fromString("MESSAGE_SIZE_EXCEEDED", + ServiceBusFailureReason.class); + + /** An entity with the same name exists under the same namespace. */ + public static final ServiceBusFailureReason MESSAGING_ENTITY_ALREADY_EXISTS = fromString( + "MESSAGING_ENTITY_ALREADY_EXISTS", ServiceBusFailureReason.class); + + /** The Messaging Entity is disabled. Enable the entity again using Portal. */ + public static final ServiceBusFailureReason MESSAGING_ENTITY_DISABLED = fromString("MESSAGING_ENTITY_DISABLED", + ServiceBusFailureReason.class); + + /** A Service Bus resource cannot be found by the Service Bus service. */ + public static final ServiceBusFailureReason MESSAGING_ENTITY_NOT_FOUND = fromString("MESSAGING_ENTITY_NOT_FOUND", + ServiceBusFailureReason.class); + + /** The quota applied to an Service Bus resource has been exceeded while interacting with the Azure Service Bus + * service. */ + public static final ServiceBusFailureReason QUOTA_EXCEEDED = fromString("QUOTA_EXCEEDED", + ServiceBusFailureReason.class); + + /** The Azure Service Bus service reports that it is busy in response to a client request to perform an operation + * . */ + public static final ServiceBusFailureReason SERVICE_BUSY = fromString("SERVICE_BUSY", + ServiceBusFailureReason.class); + + /** An operation or other request timed out while interacting with the Azure Service Bus service. */ + public static final ServiceBusFailureReason SERVICE_TIMEOUT = fromString("SERVICE_TIMEOUT", + ServiceBusFailureReason.class); + + /** There was a general communications error encountered when interacting with the Azure Service Bus service. */ + public static final ServiceBusFailureReason SERVICE_COMMUNICATION_ERROR = fromString( + "SERVICE_COMMUNICATION_ERROR", ServiceBusFailureReason.class); + + /** The requested session cannot be locked. */ + public static final ServiceBusFailureReason SESSION_CANNOT_BE_LOCKED = fromString("SESSION_CANNOT_BE_LOCKED", + ServiceBusFailureReason.class); + + /** The lock on the session has expired. Callers should request the session again. */ + public static final ServiceBusFailureReason SESSION_LOCK_LOST = fromString("SESSION_LOCK_LOST", + ServiceBusFailureReason.class); + + /** The user doesn't have access to the entity. */ + public static final ServiceBusFailureReason UNAUTHORIZED = fromString("UNAUTHORIZED", + ServiceBusFailureReason.class); +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java index 1849bd3defba2..ea0d437b5b674 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java @@ -101,10 +101,12 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) { try { size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty()); } catch (BufferOverflowException exception) { - throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, - String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", - maxMessageSize / 1024), - contextProvider.getErrorContext())); + final RuntimeException ex = new ServiceBusException( + new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, + String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", + maxMessageSize / 1024), contextProvider.getErrorContext()), ServiceBusErrorSource.SEND); + + throw logger.logExceptionAsWarning(ex); } synchronized (lock) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 8f47419d6bf5e..883de4a336c0c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -39,7 +39,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder; private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder; private final Consumer processMessage; - private final Consumer processError; + private final Consumer processError; private final ServiceBusProcessorClientOptions processorOptions; private final AtomicReference receiverSubscription = new AtomicReference<>(); private final AtomicReference asyncClient = new AtomicReference<>(); @@ -55,8 +55,9 @@ public final class ServiceBusProcessorClient implements AutoCloseable { * @param processorOptions Options to configure this instance of the processor. */ ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder, - Consumer processMessage, - Consumer processError, ServiceBusProcessorClientOptions processorOptions) { + Consumer processMessage, + Consumer processError, + ServiceBusProcessorClientOptions processorOptions) { this.sessionReceiverBuilder = Objects.requireNonNull(sessionReceiverBuilder, "'sessionReceiverBuilder' cannot be null"); this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null"); @@ -75,8 +76,8 @@ public final class ServiceBusProcessorClient implements AutoCloseable { * @param processorOptions Options to configure this instance of the processor. */ ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder, - Consumer processMessage, - Consumer processError, ServiceBusProcessorClientOptions processorOptions) { + Consumer processMessage, + Consumer processError, ServiceBusProcessorClientOptions processorOptions) { this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null"); this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null"); this.processError = Objects.requireNonNull(processError, "'processError' cannot be null"); @@ -168,7 +169,7 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); processMessage.accept(serviceBusReceivedMessageContext); } catch (Exception ex) { - handleError(new ServiceBusReceiverException(ex, ServiceBusErrorSource.USER_CALLBACK)); + handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); if (!processorOptions.isDisableAutoComplete()) { logger.warning("Error when processing message. Abandoning message.", ex); abandonMessage(serviceBusMessageContext, receiverClient); @@ -211,7 +212,10 @@ private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, private void handleError(Throwable throwable) { try { - processError.accept(throwable); + ServiceBusReceiverAsyncClient client = asyncClient.get(); + final String fullyQualifiedNamespace = client.getFullyQualifiedNamespace(); + final String entityPath = client.getEntityPath(); + processError.accept(new ServiceBusErrorContext(throwable, fullyQualifiedNamespace, entityPath)); } catch (Exception ex) { logger.verbose("Error from error handler. Ignoring error.", ex); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index 74c6df584c826..384f930dede0d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -9,7 +9,7 @@ import com.azure.core.amqp.models.AmqpDataBody; import com.azure.core.experimental.util.BinaryData; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import java.time.Duration; import java.time.OffsetDateTime; @@ -240,7 +240,7 @@ public String getLabel() { * Gets the lock token for the current message. *

* The lock token is a reference to the lock that is being held by the broker in - * {@link ReceiveMode#PEEK_LOCK} mode. + * {@link ServiceBusReceiveMode#PEEK_LOCK} mode. * Locks are used to explicitly settle messages as explained in the * product * documentation in more detail. The token can also be used to pin the lock permanently @@ -248,7 +248,8 @@ public String getLabel() { * href="https://docs.microsoft.com/azure/service-bus-messaging/message-deferral">Deferral API and, with that, * take the message out of the regular delivery state flow. This property is read-only. * - * @return Lock-token for this message. Could return {@code null} for {@link ReceiveMode#RECEIVE_AND_DELETE} mode. + * @return Lock-token for this message. Could return {@code null} for + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode. * * @see Message * transfers, locks, and settlement @@ -266,7 +267,7 @@ public String getLockToken() { * is read-only. * * @return the datetime at which the lock of this message expires if the message is received using {@link - * ReceiveMode#PEEK_LOCK} mode. Otherwise it returns null. + * ServiceBusReceiveMode#PEEK_LOCK} mode. Otherwise it returns null. * * @see Message * transfers, locks, and settlement diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 3c35ac976a07e..d20e0aed23f05 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -26,7 +26,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -60,7 +60,8 @@ * other terminal scenarios. See {@link #receiveMessages()} for more information.

* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all} * - *

Receive messages in {@link ReceiveMode#RECEIVE_AND_DELETE} mode from Service Bus resource

+ *

Receive messages in {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode from Service Bus + * resource

* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode} * *

Receive messages from a specific session

@@ -193,8 +194,8 @@ public String getEntityPath() { * * @return A {@link Mono} that completes when the Service Bus abandon operation completes. * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public Mono abandon(ServiceBusReceivedMessage message) { return updateDisposition(message, DispositionStatus.ABANDONED, null, null, @@ -217,8 +218,8 @@ public Mono abandon(ServiceBusReceivedMessage message) { * @return A {@link Mono} that completes when the Service Bus operation finishes. * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options) { if (Objects.isNull(options)) { @@ -240,8 +241,8 @@ public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions opti * * @return A {@link Mono} that finishes when the message is completed on Service Bus. * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public Mono complete(ServiceBusReceivedMessage message) { return updateDisposition(message, DispositionStatus.COMPLETED, null, null, @@ -261,8 +262,8 @@ public Mono complete(ServiceBusReceivedMessage message) { * @return A {@link Mono} that finishes when the message is completed on Service Bus. * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options) { if (Objects.isNull(options)) { @@ -284,8 +285,8 @@ public Mono complete(ServiceBusReceivedMessage message, CompleteOptions op * * @return A {@link Mono} that completes when the Service Bus defer operation finishes. * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} mode - * or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Message deferral */ public Mono defer(ServiceBusReceivedMessage message) { @@ -307,8 +308,8 @@ public Mono defer(ServiceBusReceivedMessage message) { * @return A {@link Mono} that completes when the defer operation finishes. * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Message deferral */ public Mono defer(ServiceBusReceivedMessage message, DeferOptions options) { @@ -331,8 +332,8 @@ public Mono defer(ServiceBusReceivedMessage message, DeferOptions options) * * @return A {@link Mono} that completes when the dead letter operation finishes. * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Dead letter * queues */ @@ -353,8 +354,8 @@ public Mono deadLetter(ServiceBusReceivedMessage message) { * @return A {@link Mono} that completes when the dead letter operation finishes. * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Dead letter * queues */ @@ -654,7 +655,7 @@ sessionId, getLinkName(sessionId), Collections.singleton(sequenceNumber)).last() if (CoreUtils.isNullOrEmpty(receivedMessage.getLockToken())) { return receivedMessage; } - if (receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) { + if (receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) { receivedMessage.setLockedUntil(managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil())); @@ -699,7 +700,7 @@ sessionId, getLinkName(sessionId), sequenceNumbers)) if (CoreUtils.isNullOrEmpty(receivedMessage.getLockToken())) { return receivedMessage; } - if (receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) { + if (receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) { receivedMessage.setLockedUntil(managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil())); @@ -711,17 +712,17 @@ sessionId, getLinkName(sessionId), sequenceNumbers)) /** * Asynchronously renews the lock on the message. The lock will be renewed based on the setting specified on the - * entity. When a message is received in {@link ReceiveMode#PEEK_LOCK} mode, the message is locked on the server for - * this receiver instance for a duration as specified during the entity creation (LockDuration). If processing of - * the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is reset - * to the entity's LockDuration value. + * entity. When a message is received in {@link ServiceBusReceiveMode#PEEK_LOCK} mode, the message is locked on + * the server for this receiver instance for a duration as specified during the entity creation (LockDuration). + * If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, + * the lock is reset to the entity's LockDuration value. * * @param message The {@link ServiceBusReceivedMessage} to perform auto-lock renewal. * * @return The new expiration time for the message. * @throws NullPointerException if {@code message} or {@code message.getLockToken()} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @throws IllegalStateException if the receiver is a session receiver. * @throws IllegalArgumentException if {@code message.getLockToken()} is an empty value. */ @@ -982,7 +983,7 @@ private Mono updateDisposition(ServiceBusReceivedMessage message, Disposit final String lockToken = message.getLockToken(); final String sessionId = message.getSessionId(); - if (receiverOptions.getReceiveMode() != ReceiveMode.PEEK_LOCK) { + if (receiverOptions.getReceiveMode() != ServiceBusReceiveMode.PEEK_LOCK) { return Mono.error(logger.logExceptionAsError(new UnsupportedOperationException(String.format( "'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", dispositionStatus)))); } else if (message.isSettled()) { @@ -1055,17 +1056,17 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId return updateDispositionOperation .onErrorMap(throwable -> { - if (throwable instanceof ServiceBusReceiverException) { + if (throwable instanceof ServiceBusException) { return throwable; } switch (dispositionStatus) { case COMPLETED: - return new ServiceBusReceiverException(throwable, ServiceBusErrorSource.COMPLETE); + return new ServiceBusException(throwable, ServiceBusErrorSource.COMPLETE); case ABANDONED: - return new ServiceBusReceiverException(throwable, ServiceBusErrorSource.ABANDONED); + return new ServiceBusException(throwable, ServiceBusErrorSource.ABANDON); default: - return new ServiceBusReceiverException(throwable, ServiceBusErrorSource.UNKNOWN); + return new ServiceBusException(throwable, ServiceBusErrorSource.UNKNOWN); } }); } @@ -1190,7 +1191,8 @@ Mono setSessionState(String sessionId, byte[] sessionState) { return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) - .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)); + .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)) + .onErrorMap((err) -> mapError(err, ServiceBusErrorSource.RECEIVE)); } Mono getSessionState(String sessionId) { @@ -1200,22 +1202,26 @@ Mono getSessionState(String sessionId) { } else if (!receiverOptions.isSessionReceiver()) { return monoError(logger, new IllegalStateException("Cannot get session state on a non-session receiver.")); } + + Mono result; + if (sessionManager != null) { - return sessionManager.getSessionState(sessionId); + result = sessionManager.getSessionState(sessionId); } else { - return connectionProcessor + result = connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId))); } + + return result.onErrorMap((err) -> mapError(err, ServiceBusErrorSource.RECEIVE)); } /** - * Map the error to {@link ServiceBusReceiverException} + * Map the error to {@link ServiceBusException} */ private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) { - // If it is already `ServiceBusReceiverException`, we can just throw it. - if (!(throwable instanceof ServiceBusReceiverException)) { - return new ServiceBusReceiverException(throwable, errorSource); + if (!(throwable instanceof ServiceBusException)) { + return new ServiceBusException(throwable, errorSource); } return throwable; } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 28031e6ea4abd..a584df4c194d3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -11,7 +11,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -79,8 +79,8 @@ public String getEntityPath() { * @param message The {@link ServiceBusReceivedMessage} to perform this operation. * * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public void abandon(ServiceBusReceivedMessage message) { asyncClient.abandon(message).block(operationTimeout); @@ -100,8 +100,8 @@ public void abandon(ServiceBusReceivedMessage message) { * * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) { asyncClient.abandon(message, options).block(operationTimeout); @@ -113,8 +113,8 @@ public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) { * @param message The {@link ServiceBusReceivedMessage} to perform this operation. * * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public void complete(ServiceBusReceivedMessage message) { asyncClient.complete(message).block(operationTimeout); @@ -131,8 +131,8 @@ public void complete(ServiceBusReceivedMessage message) { * * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public void complete(ServiceBusReceivedMessage message, CompleteOptions options) { asyncClient.complete(message, options).block(operationTimeout); @@ -144,8 +144,8 @@ public void complete(ServiceBusReceivedMessage message, CompleteOptions options) * @param message The {@link ServiceBusReceivedMessage} to perform this operation. * * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Message deferral */ public void defer(ServiceBusReceivedMessage message) { @@ -165,8 +165,8 @@ public void defer(ServiceBusReceivedMessage message) { * * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Message deferral */ public void defer(ServiceBusReceivedMessage message, DeferOptions options) { @@ -179,8 +179,8 @@ public void defer(ServiceBusReceivedMessage message, DeferOptions options) { * @param message The {@link ServiceBusReceivedMessage} to perform this operation. * * @throws NullPointerException if {@code message} is null. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. * @see Dead letter * queues */ @@ -202,8 +202,8 @@ public void deadLetter(ServiceBusReceivedMessage message) { * * @throws NullPointerException if {@code message} or {@code options} is null. Also if * {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified. - * @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} - * mode or if the message was received from peekMessage. + * @throws UnsupportedOperationException if the receiver was opened in + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE} mode or if the message was received from peekMessage. */ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) { asyncClient.deadLetter(message, options).block(operationTimeout); @@ -368,8 +368,8 @@ public IterableStream receiveMessages(int maxMessages /** * Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. The - * default receive mode is {@link ReceiveMode#PEEK_LOCK } unless it is changed during creation of {@link - * ServiceBusReceiverClient} using {@link ServiceBusReceiverClientBuilder#receiveMode(ReceiveMode)}. + * default receive mode is {@link ServiceBusReceiveMode#PEEK_LOCK } unless it is changed during creation of {@link + * ServiceBusReceiverClient} using {@link ServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode)}. * * @param maxMessages The maximum number of messages to receive. * @param maxWaitTime The time the client waits for receiving a message before it times out. @@ -457,17 +457,17 @@ IterableStream receiveDeferredMessageBatch(Iterable createMessageBatch(CreateMessageBatchOptions return Mono.just( new ServiceBusMessageBatch(batchSize, link::getErrorContext, tracerProvider, messageSerializer, entityName, getFullyQualifiedNamespace())); - })); + })).onErrorMap(this::mapError); } /** @@ -619,8 +619,7 @@ private Mono sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact if (isTracingEnabled) { tracerProvider.endSpan(parentContext.get(), signal); } - }); - + }).onErrorMap(this::mapError); } private Mono sendInternal(Flux messages, ServiceBusTransactionContext transactionContext) { @@ -634,7 +633,8 @@ private Mono sendInternal(Flux messages, ServiceBusTran link::getErrorContext, tracerProvider, messageSerializer, entityName, link.getHostname())); }) - .flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext))); + .flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext))) + .onErrorMap(this::mapError); } private Mono sendInternalBatch(Flux eventBatches, @@ -658,6 +658,13 @@ private Mono getSendLink() { .doOnNext(next -> linkName.compareAndSet(null, next.getLinkName())); } + private Throwable mapError(Throwable throwable) { + if (!(throwable instanceof ServiceBusException)) { + return new ServiceBusException(throwable, ServiceBusErrorSource.SEND); + } + return throwable; + } + private static class AmqpMessageCollector implements Collector, List> { private final int maxMessageSize; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java index 25076cedd4b74..f716984ef8da3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java @@ -15,10 +15,12 @@ import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.ServiceBusErrorSource; +import com.azure.messaging.servicebus.ServiceBusException; import com.azure.messaging.servicebus.ServiceBusMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusTransactionContext; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedInteger; @@ -193,7 +195,7 @@ public Flux peek(long fromSequenceNumber, String sess * {@inheritDoc} */ @Override - public Flux receiveDeferredMessages(ReceiveMode receiveMode, String sessionId, + public Flux receiveDeferredMessages(ServiceBusReceiveMode receiveMode, String sessionId, String associatedLinkName, Iterable sequenceNumbers) { if (sequenceNumbers == null) { return fluxError(logger, new NullPointerException("'sequenceNumbers' cannot be null")); @@ -217,7 +219,7 @@ public Flux receiveDeferredMessages(ReceiveMode recei requestBodyMap.put(ManagementConstants.SEQUENCE_NUMBERS, numbers.toArray(new Long[0])); requestBodyMap.put(ManagementConstants.RECEIVER_SETTLE_MODE, - UnsignedInteger.valueOf(receiveMode == ReceiveMode.RECEIVE_AND_DELETE ? 0 : 1)); + UnsignedInteger.valueOf(receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE ? 0 : 1)); if (!CoreUtils.isNullOrEmpty(sessionId)) { requestBodyMap.put(ManagementConstants.SESSION_ID, sessionId); @@ -234,6 +236,14 @@ public Flux receiveDeferredMessages(ReceiveMode recei })); } + private Throwable mapError(Throwable throwable) { + if (throwable instanceof AmqpException) { + return new ServiceBusException(throwable, ServiceBusErrorSource.MANAGEMENT); + } + + return throwable; + } + /** * {@inheritDoc} */ @@ -499,17 +509,21 @@ private Mono sendWithVerify(RequestResponseChannel channel, Message mes sink.error(throwable); }) .switchIfEmpty(Mono.error(new AmqpException(true, "No response received from management channel.", - channel.getErrorContext()))); + channel.getErrorContext()))) + .onErrorMap(this::mapError); } private Mono isAuthorized(String operation) { return tokenManager.getAuthorizationResults() + .onErrorMap(this::mapError) .next() .handle((response, sink) -> { if (response != AmqpResponseCode.ACCEPTED && response != AmqpResponseCode.OK) { - sink.error(new AmqpException(false, String.format( - "User does not have authorization to perform operation [%s] on entity [%s]. Response: [%s]", - operation, entityPath, response), getErrorContext())); + final String message = String.format("User does not have authorization to perform operation " + + "[%s] on entity [%s]. Response: [%s]", operation, entityPath, response); + final Throwable exc = new AmqpException(false, AmqpErrorCondition.UNAUTHORIZED_ACCESS, + message, getErrorContext()); + sink.error(new ServiceBusException(exc, ServiceBusErrorSource.MANAGEMENT)); } else { sink.complete(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageManagementOperations.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageManagementOperations.java index 26a66fec64aba..923072fb25d92 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageManagementOperations.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageManagementOperations.java @@ -3,7 +3,7 @@ package com.azure.messaging.servicebus.implementation; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.transport.DeliveryState; import reactor.core.publisher.Mono; @@ -23,7 +23,7 @@ public interface MessageManagementOperations { /** * Asynchronously renews the lock on the message specified by the lock token. The lock will be renewed based on - * the setting specified on the entity. When a message is received in {@link ReceiveMode#PEEK_LOCK} mode, + * the setting specified on the entity. When a message is received in {@link ServiceBusReceiveMode#PEEK_LOCK} mode, * the message is locked on the server for this receiver instance for a duration as specified during the * Queue/Subscription creation (LockDuration). If processing of the message requires longer than this duration, * the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java index 38bc5a34de730..1f1c3ccf31d8a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusAmqpConnection.java @@ -6,7 +6,7 @@ import com.azure.core.amqp.AmqpConnection; import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.implementation.AmqpSendLink; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Mono; public interface ServiceBusAmqpConnection extends AmqpConnection { @@ -41,11 +41,11 @@ Mono createSendLink(String linkName, String entityPath, AmqpRetryO * @param entityPath The remote address to connect to for the message broker. * @param transferEntityPath Path if the message should be transferred to another link after being received * from this link. - * @param receiveMode {@link ReceiveMode} to use when creating the link. + * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. * * @return A new or existing receive link that is connected to the given {@code entityPath}. */ - Mono createReceiveLink(String linkName, String entityPath, ReceiveMode receiveMode, + Mono createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType); /** @@ -56,12 +56,12 @@ Mono createReceiveLink(String linkName, String entityPath * @param entityPath The remote address to connect to for the message broker. * @param transferEntityPath Path if the events should be transferred to another link after being received * from this link. - * @param receiveMode {@link ReceiveMode} to use when creating the link. + * @param receiveMode {@link ServiceBusReceiveMode} to use when creating the link. * @param sessionId to use when creating the link. * @param entityType {@link MessagingEntityType} to use when creating the link. * * @return A new or existing receive link that is connected to the given {@code entityPath}. */ - Mono createReceiveLink(String linkName, String entityPath, ReceiveMode receiveMode, + Mono createReceiveLink(String linkName, String entityPath, ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType, String sessionId); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java index 3221aed34adce..730f4723b833f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java @@ -6,7 +6,7 @@ import com.azure.messaging.servicebus.ServiceBusMessage; import com.azure.messaging.servicebus.ServiceBusReceivedMessage; import com.azure.messaging.servicebus.ServiceBusTransactionContext; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,12 +63,12 @@ Flux peek(long fromSequenceNumber, String sessionId, * * @return The received {@link ServiceBusReceivedMessage} message for given sequence number. */ - Flux receiveDeferredMessages(ReceiveMode receiveMode, String sessionId, + Flux receiveDeferredMessages(ServiceBusReceiveMode receiveMode, String sessionId, String associatedLinkName, Iterable sequenceNumbers); /** * Asynchronously renews the lock on the message specified by the lock token. The lock will be renewed based on - * the setting specified on the entity. When a message is received in {@link ReceiveMode#PEEK_LOCK} mode, + * the setting specified on the entity. When a message is received in {@link ServiceBusReceiveMode#PEEK_LOCK} mode, * the message is locked on the server for this receiver instance for a duration as specified during the * Queue/Subscription creation (LockDuration). If processing of the message requires longer than this duration, * the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java index 644fff57cc5c3..46d17f14e19eb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java @@ -19,7 +19,7 @@ import com.azure.core.amqp.implementation.TokenManagerProvider; import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; @@ -165,8 +165,8 @@ public Mono createSendLink(String linkName, String entityPath, Amq * @return A new or existing receive link that is connected to the given {@code entityPath}. */ @Override - public Mono createReceiveLink(String linkName, String entityPath, ReceiveMode receiveMode, - String transferEntityPath, MessagingEntityType entityType) { + public Mono createReceiveLink(String linkName, String entityPath, + ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType) { return createSession(entityPath).cast(ServiceBusSession.class) .flatMap(session -> { logger.verbose("Get or create consumer for path: '{}'", entityPath); @@ -191,8 +191,9 @@ public Mono createReceiveLink(String linkName, String ent * @return A new or existing receive link that is connected to the given {@code entityPath}. */ @Override - public Mono createReceiveLink(String linkName, String entityPath, ReceiveMode receiveMode, - String transferEntityPath, MessagingEntityType entityType, String sessionId) { + public Mono createReceiveLink(String linkName, String entityPath, + ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType, + String sessionId) { return createSession(entityPath).cast(ServiceBusSession.class) .flatMap(session -> { logger.verbose("Get or create consumer for path: '{}'", entityPath); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index a9475a57f7e6d..a3d36683c8880 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -13,7 +13,7 @@ import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -69,7 +69,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic /** * Indicates whether the message has already been settled from the sender side. This is the case when {@link - * ReceiveMode#RECEIVE_AND_DELETE} is used. + * ServiceBusReceiveMode#RECEIVE_AND_DELETE} is used. */ private final boolean isSettled; private final Duration timeout; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java index f672b3542e616..e98336fc9d6c1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java @@ -18,7 +18,7 @@ import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; @@ -79,7 +79,7 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi @Override public Mono createConsumer(String linkName, String entityPath, - MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ReceiveMode receiveMode) { + MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode) { final Map filter = new HashMap<>(); return createConsumer(linkName, entityPath, entityType, timeout, retry, receiveMode, filter); @@ -87,7 +87,7 @@ public Mono createConsumer(String linkName, String entity @Override public Mono createConsumer(String linkName, String entityPath, - MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ReceiveMode receiveMode, + MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode, String sessionId) { final Map filter = new HashMap<>(); @@ -132,7 +132,7 @@ protected ReactorReceiver createConsumer(String entityPath, Receiver receiver, } private Mono createConsumer(String linkName, String entityPath, - MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ReceiveMode receiveMode, + MessagingEntityType entityType, Duration timeout, AmqpRetryPolicy retry, ServiceBusReceiveMode receiveMode, Map filter) { Objects.requireNonNull(linkName, "'linkName' cannot be null."); Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index 98d2eddd32e95..93f31e5ff8a21 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -7,7 +7,7 @@ import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.implementation.AmqpReceiveLink; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.message.Message; import org.reactivestreams.Subscription; @@ -60,7 +60,7 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor createConsumer(String linkName, String entityPath, MessagingEntityType entityType, - Duration timeout, AmqpRetryPolicy retryPolicy, ReceiveMode receiveMode); + Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode); /** * Creates a new AMQP link that consumes events from the message broker. @@ -42,14 +42,14 @@ Mono createConsumer(String linkName, String entityPath, M * @param entityPath The entity path this link connects to, so that it may read events from the message broker. * @param timeout Timeout required for creating and opening an AMQP link. * @param retryPolicy The retry policy to use when consuming messages. - * @param receiveMode The {@link ReceiveMode} for the messages to be received. + * @param receiveMode The {@link ServiceBusReceiveMode} for the messages to be received. * @param sessionId The sessionId for the messages to be received. If {@code null}, then the next, unnamed session * is retrieved. * * @return A newly created AMQP link. */ Mono createConsumer(String linkName, String entityPath, MessagingEntityType entityType, - Duration timeout, AmqpRetryPolicy retryPolicy, ReceiveMode receiveMode, String sessionId); + Duration timeout, AmqpRetryPolicy retryPolicy, ServiceBusReceiveMode receiveMode, String sessionId); /** * Creates a new {@link AmqpLink} that can send events to the message broker. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveMode.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java similarity index 98% rename from sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveMode.java rename to sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java index 5b78675099c92..964f4eff96cd9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveMode.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java @@ -6,7 +6,7 @@ /** * Defines the Receive modes. */ -public enum ReceiveMode { +public enum ServiceBusReceiveMode { /** * In this mode, received message is not deleted from the queue or subscription, instead it is temporarily locked * to the receiver, making it invisible to other receivers. Then the service waits for one of the three events diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index b78af2c69fe5f..aa619eee2fc95 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -6,7 +6,7 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.IterableStream; import com.azure.identity.DefaultAzureCredentialBuilder; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -139,7 +139,7 @@ public void completeMessage() { .receiver() .topicName("<< TOPIC NAME >>") .subscriptionName("<< SUBSCRIPTION NAME >>") - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildClient(); // This fetches a batch of 10 messages or until the default operation timeout has elapsed. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java index 810099b5e4071..405469aa583cb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -3,7 +3,7 @@ package com.azure.messaging.servicebus; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -36,7 +36,7 @@ public static void main(String[] args) throws InterruptedException { ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .queueName("<>") .buildAsyncClient(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java index 6ca51776064b4..296c44cdfc287 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -3,7 +3,7 @@ package com.azure.messaging.servicebus; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -35,7 +35,7 @@ public static void main(String[] args) throws InterruptedException { ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .topicName("<>") .subscriptionName("<>") .buildAsyncClient(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java index 7d86cc7ec12cc..489f622073abb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java @@ -4,7 +4,7 @@ package com.azure.messaging.servicebus; import com.azure.core.experimental.util.BinaryData; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import reactor.core.publisher.Mono; import java.util.Arrays; @@ -52,7 +52,7 @@ public static void main(String[] args) throws InterruptedException { // Instantiate a client that will be used to receive messages from the session. ServiceBusSessionReceiverAsyncClient sessionReceiver = builder.sessionReceiver() - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .queueName(queueName) .buildAsyncClient(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java index 694cae8a56e0f..a3b3ad48ba066 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorClientJavaDocCodeSamples.java @@ -20,11 +20,12 @@ public void createServiceBusProcessorClient() { System.out.println("Received message " + message.getBody().toString()); }; - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; @@ -49,11 +50,12 @@ public void createSessionEnabledServiceBusProcessorClient() { + " session: " + message.getSessionId()); }; - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; @@ -79,11 +81,12 @@ public void createAndStartServiceBusProcessorClient() throws InterruptedExceptio System.out.println("Received message " + message.getBody().toString()); }; - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; @@ -111,11 +114,12 @@ public void createAndStartSessionEnabledServiceBusProcessorClient() { + " session: " + message.getSessionId()); }; - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java index 1588f02228cdb..7a2018e0ac724 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java @@ -25,11 +25,12 @@ public static void main(String[] args) throws InterruptedException { }; // Consumer that handles any errors that occur when receiving messages - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java index d76eafd62442f..f5837c3928a85 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java @@ -6,7 +6,7 @@ import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.messaging.servicebus.models.AbandonOptions; import com.azure.messaging.servicebus.models.CompleteOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.publisher.BaseSubscriber; @@ -55,7 +55,7 @@ public void receiveWithReceiveAndDeleteMode() { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("fake-string") .receiver() - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .queueName("<< QUEUE NAME >>") .buildAsyncClient(); // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSessionProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSessionProcessorSample.java index 12bbfd9656ba9..9a474712f7a37 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSessionProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusSessionProcessorSample.java @@ -26,11 +26,12 @@ public static void main(String[] args) throws InterruptedException { }; // Consumer that handles any errors that occur when receiving messages - Consumer errorHandler = throwable -> { - System.out.println("Error when receiving messages " + throwable.getMessage()); - if (throwable instanceof ServiceBusReceiverException) { - ServiceBusReceiverException serviceBusReceiverException = (ServiceBusReceiverException) throwable; - System.out.println("Error source " + serviceBusReceiverException.getErrorSource()); + Consumer errorHandler = errorContext -> { + System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); + if (errorContext.getException() instanceof ServiceBusException) { + ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), + serviceBusException.getReason()); } }; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java index 51fca9029d0be..92f3076170815 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java @@ -7,7 +7,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.jproxy.ProxyServer; import com.azure.messaging.servicebus.jproxy.SimpleProxy; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -93,7 +93,7 @@ public void receiveMessage() { .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) .connectionString(getConnectionString()) .receiver() - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .queueName(queueName) .buildAsyncClient(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java index 98bc1ec053727..f4734f58e87c6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java @@ -11,7 +11,7 @@ import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterAll; @@ -87,7 +87,7 @@ void setup(TestInfo testInfo) { when(link.getEndpointStates()).thenReturn(endpointStateFlux); when(link.receive()).thenReturn(messageFlux); linkProcessor = linkFlux.subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, - ReceiveMode.RECEIVE_AND_DELETE)); + ServiceBusReceiveMode.RECEIVE_AND_DELETE)); when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE))); when(link.updateDisposition(anyString(), any(DeliveryState.class))).thenReturn(Mono.empty()); @@ -115,7 +115,7 @@ void receiveNoAutoComplete() { final int prefetch = 10; final Duration maxAutoLockRenewDuration = Duration.ofSeconds(0); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, @@ -162,7 +162,7 @@ void canDispose() { final Duration maxAutoLockRenewDuration = Duration.ofSeconds(40); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, @@ -202,7 +202,7 @@ void onError() { final Duration maxAutoLockRenewDuration = Duration.ofSeconds(40); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, prefetch, maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java index fae12997fb9c0..d73562ff7648a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java @@ -9,7 +9,7 @@ import com.azure.core.util.Configuration; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSenderClientBuilder; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.*; class ServiceBusClientBuilderTest { private static final String NAMESPACE_NAME = "dummyNamespaceName"; @@ -200,7 +199,7 @@ void invalidPrefetch() { .connectionString(NAMESPACE_CONNECTION_STRING) .receiver() .topicName("baz").subscriptionName("bar") - .receiveMode(ReceiveMode.PEEK_LOCK); + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK); // Act & Assert assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(0)); @@ -218,7 +217,7 @@ public void testProxyOptionsConfiguration(String proxyConfiguration, boolean exp .configuration(configuration) .receiver() .topicName("baz").subscriptionName("bar") - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildClient(); clientCreated = true; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusExceptionTestHelper.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusExceptionTestHelper.java new file mode 100644 index 0000000000000..8cbd78d07a081 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusExceptionTestHelper.java @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +public class ServiceBusExceptionTestHelper { + /** + * Get the error source out of the ServiceBusException (used in tests where we're not in the + * same package, since errorSource is an internal field) + */ + public static ServiceBusErrorSource getInternalErrorSource(ServiceBusException exc) { + return exc.getErrorSource(); + } +} + diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageBatchTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageBatchTest.java index ca3983ae2e133..929d4f247972b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageBatchTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageBatchTest.java @@ -3,9 +3,7 @@ package com.azure.messaging.servicebus; -import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; -import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.TracerProvider; @@ -52,11 +50,12 @@ public void payloadExceededException() { final ServiceBusMessage tooBig = new ServiceBusMessage(BinaryData.fromBytes(new byte[1024 * 1024 * 2])); // Act - AmqpException amqpException = assertThrows(AmqpException.class, () -> batch.tryAddMessage(tooBig)); + ServiceBusException thrownException = assertThrows(ServiceBusException.class, () -> batch.tryAddMessage(tooBig)); // Assert - Assertions.assertFalse(amqpException.isTransient()); - Assertions.assertEquals(AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, amqpException.getErrorCondition()); + Assertions.assertFalse(thrownException.isTransient()); + Assertions.assertEquals(ServiceBusErrorSource.SEND, thrownException.getErrorSource()); + Assertions.assertEquals(ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, thrownException.getReason()); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index 57a8b8f1076ba..334b2929bb64d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -256,9 +256,9 @@ public void testUserMessageHandlerError() throws InterruptedException { assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); throw new IllegalStateException(); // throw error from user handler }, - error -> { - assertTrue(error instanceof ServiceBusReceiverException); - ServiceBusReceiverException exception = (ServiceBusReceiverException) error; + serviceBusProcessErrorContext -> { + assertTrue(serviceBusProcessErrorContext instanceof ServiceBusErrorContext); + ServiceBusException exception = (ServiceBusException) serviceBusProcessErrorContext.getException(); assertTrue(exception.getErrorSource() == ServiceBusErrorSource.USER_CALLBACK); countDownLatch.countDown(); }, @@ -303,9 +303,9 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); throw new IllegalStateException(); // throw error from user handler }, - error -> { - assertTrue(error instanceof ServiceBusReceiverException); - ServiceBusReceiverException exception = (ServiceBusReceiverException) error; + serviceBusProcessErrorContext -> { + assertTrue(serviceBusProcessErrorContext instanceof ServiceBusErrorContext); + ServiceBusException exception = (ServiceBusException) serviceBusProcessErrorContext.getException(); assertTrue(exception.getErrorSource() == ServiceBusErrorSource.USER_CALLBACK); countDownLatch.countDown(); }, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index e3d0876fe2d38..af78e7ff45c4d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -7,6 +7,7 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ConnectionOptions; @@ -28,7 +29,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.DeliveryState; @@ -169,9 +170,9 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE)) .thenReturn(Mono.just(managementNode)); - when(connection.createReceiveLink(anyString(), anyString(), any(ReceiveMode.class), any(), + when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), any(MessagingEntityType.class))).thenReturn(Mono.just(amqpReceiveLink)); - when(connection.createReceiveLink(anyString(), anyString(), any(ReceiveMode.class), any(), + when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), any(MessagingEntityType.class), anyString())).thenReturn(Mono.just(sessionReceiveLink)); connectionProcessor = @@ -180,11 +181,11 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe connectionOptions.getRetry())); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, false), + new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false), connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, false, "Some-Session", + new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false, "Some-Session", null), connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); } @@ -344,7 +345,7 @@ void completeNullMessage() { */ @Test void completeInReceiveAndDeleteMode() { - final ReceiverOptions options = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false); + final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -364,7 +365,7 @@ void completeInReceiveAndDeleteMode() { @Test void throwsExceptionAboutSettlingPeekedMessagesWithNullLockToken() { - final ReceiverOptions options = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, false); + final ReceiverOptions options = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, false); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -482,7 +483,7 @@ void errorSourceOnRenewMessageLock() { when(managementNode.renewMessageLock(lockToken, null)) .thenReturn(Mono.error(new AzureException("some error occurred."))); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -490,8 +491,8 @@ void errorSourceOnRenewMessageLock() { // Act & Assert StepVerifier.create(receiver2.renewMessageLock(receivedMessage, maxDuration)) .verifyErrorSatisfies(throwable -> { - Assertions.assertTrue(throwable instanceof ServiceBusReceiverException); - final ServiceBusErrorSource actual = ((ServiceBusReceiverException) throwable).getErrorSource(); + Assertions.assertTrue(throwable instanceof ServiceBusException); + final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource(); Assertions.assertEquals(ServiceBusErrorSource.RENEW_LOCK, actual); }); @@ -506,7 +507,7 @@ void errorSourceOnSessionLock() { // Arrange when(managementNode.renewSessionLock(SESSION_ID, null)).thenReturn(Mono.error(new AzureException("some error occurred."))); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true, "Some-Session", + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, "Some-Session", null); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -514,8 +515,8 @@ void errorSourceOnSessionLock() { // Act & Assert StepVerifier.create(sessionReceiver2.renewSessionLock(SESSION_ID)) .verifyErrorSatisfies(throwable -> { - Assertions.assertTrue(throwable instanceof ServiceBusReceiverException); - final ServiceBusErrorSource actual = ((ServiceBusReceiverException) throwable).getErrorSource(); + Assertions.assertTrue(throwable instanceof ServiceBusException); + final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource(); Assertions.assertEquals(ServiceBusErrorSource.RENEW_LOCK, actual); }); } @@ -560,8 +561,8 @@ void errorSourceNoneOnSettlement(DispositionStatus dispositionStatus, DeliverySt .then(() -> messageSink.next(message)) .expectNext() .verifyErrorSatisfies(throwable -> { - Assertions.assertTrue(throwable instanceof ServiceBusReceiverException); - final ServiceBusErrorSource actual = ((ServiceBusReceiverException) throwable).getErrorSource(); + Assertions.assertTrue(throwable instanceof ServiceBusException); + final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource(); Assertions.assertEquals(errorSource, actual); }); @@ -579,7 +580,7 @@ void errorSourceAutoCompleteMessage() { final int messagesToReceive = 1; final List messages = getMessages(); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -588,7 +589,7 @@ void errorSourceAutoCompleteMessage() { when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class))) .thenReturn(receivedMessage); - when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.error(new AmqpException(false, "some error occurred.", null))); + when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.error(new AmqpException(false, AmqpErrorCondition.MESSAGE_LOCK_LOST, "some error occurred.", null))); try { // Act & Assert @@ -596,9 +597,13 @@ void errorSourceAutoCompleteMessage() { .then(() -> messages.forEach(m -> messageSink.next(m))) .expectNextCount(messagesToReceive) .verifyErrorSatisfies(throwable -> { - Assertions.assertTrue(throwable instanceof ServiceBusReceiverException); - final ServiceBusErrorSource actual = ((ServiceBusReceiverException) throwable).getErrorSource(); + Assertions.assertTrue(throwable instanceof ServiceBusException); + + ServiceBusException serviceBusException = (ServiceBusException) throwable; + final ServiceBusErrorSource actual = serviceBusException.getErrorSource(); + Assertions.assertEquals(ServiceBusErrorSource.COMPLETE, actual); + Assertions.assertEquals(ServiceBusFailureReason.MESSAGE_LOCK_LOST, serviceBusException.getReason()); }); } finally { receiver2.close(); @@ -623,19 +628,19 @@ void errorSourceOnReceiveMessage() { when(receivedMessage.getLockToken()).thenReturn(lockToken); when(receivedMessage.getLockedUntil()).thenReturn(expiration); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); - when(connection.createReceiveLink(anyString(), anyString(), any(ReceiveMode.class), any(), + when(connection.createReceiveLink(anyString(), anyString(), any(ServiceBusReceiveMode.class), any(), any(MessagingEntityType.class))).thenReturn(Mono.error(new AzureException("some receive link Error."))); // Act & Assert StepVerifier.create(receiver2.receiveMessages().take(1)) .verifyErrorSatisfies(throwable -> { - Assertions.assertTrue(throwable instanceof ServiceBusReceiverException); - final ServiceBusErrorSource actual = ((ServiceBusReceiverException) throwable).getErrorSource(); + Assertions.assertTrue(throwable instanceof ServiceBusException); + final ServiceBusErrorSource actual = ((ServiceBusException) throwable).getErrorSource(); Assertions.assertEquals(ServiceBusErrorSource.RECEIVE, actual); }); @@ -669,7 +674,7 @@ void settleMessageOnManagement(DispositionStatus dispositionStatus) { when(connection.getManagementNode(eq(ENTITY_PATH), eq(ENTITY_TYPE))) .thenReturn(Mono.just(managementNode)); - when(managementNode.receiveDeferredMessages(eq(ReceiveMode.PEEK_LOCK), isNull(), isNull(), argThat(arg -> { + when(managementNode.receiveDeferredMessages(eq(ServiceBusReceiveMode.PEEK_LOCK), isNull(), isNull(), argThat(arg -> { boolean foundFirst = false; boolean foundSecond = false; for (Long seq : arg) { @@ -792,7 +797,7 @@ void receiveIllegalOptions() { .connectionString(NAMESPACE_CONNECTION_STRING) .receiver() .topicName("baz").subscriptionName("bar") - .receiveMode(ReceiveMode.PEEK_LOCK); + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK); // Act & Assert @@ -810,7 +815,7 @@ void topicCorrectEntityPath() { .receiver() .topicName(topicName) .subscriptionName(subscriptionName) - .receiveMode(ReceiveMode.PEEK_LOCK) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); // Act @@ -1107,7 +1112,7 @@ void autoCompleteMessage() { final int numberOfEvents = 3; final List messages = getMessages(); final String lockToken = UUID.randomUUID().toString(); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, true); + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true); final ServiceBusReceiverAsyncClient receiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -1140,7 +1145,7 @@ void autoCompleteMessageSessionReceiver() { final String lockToken2 = "token2"; final String lockToken3 = "token3"; - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, null, true, "Some-Session", null); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, @@ -1184,6 +1189,6 @@ private List getMessages() { private static Stream errorSourceNoneOnSettlement() { return Stream.of( Arguments.of(DispositionStatus.COMPLETED, DeliveryStateType.Accepted, ServiceBusErrorSource.COMPLETE), - Arguments.of(DispositionStatus.ABANDONED, DeliveryStateType.Modified, ServiceBusErrorSource.ABANDONED)); + Arguments.of(DispositionStatus.ABANDONED, DeliveryStateType.Modified, ServiceBusErrorSource.ABANDON)); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index d4a28fa63a0cd..e43e5d3dd9ae3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -11,7 +11,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -821,14 +821,14 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde .buildClient().acceptSession(sessionId); this.receiveAndDeleteReceiverMono = Mono.fromCallable(() -> getSessionReceiverBuilder(false, entityType, entityIndex, sharedConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .buildClient().acceptSession(sessionId)); } else { this.receiver = getReceiverBuilder(false, entityType, entityIndex, sharedConnection) .buildClient(); this.receiveAndDeleteReceiver = getReceiverBuilder(false, entityType, entityIndex, sharedConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .buildClient(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 5f9c900459b91..4f2623993ec6a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -9,7 +9,7 @@ import com.azure.messaging.servicebus.models.CompleteOptions; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.DeferOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,7 +80,7 @@ void setup() { MockitoAnnotations.initMocks(this); when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); - when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, false)); + when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, null, false)); when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); client = new ServiceBusReceiverClient(asyncClient, OPERATION_TIMEOUT); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java index bdaad2e54e7eb..553ccc11a8456 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java @@ -6,7 +6,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.CreateMessageBatchOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; @@ -172,7 +172,7 @@ void viaQueueMessageSendTest() { .buildAsyncClient(); final ServiceBusReceiverAsyncClient destination1Receiver = getReceiverBuilder(useCredentials, entityType, destinationEntity, shareConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .disableAutoComplete() .buildAsyncClient(); @@ -257,7 +257,7 @@ void viaTopicMessageSendTest() { final ServiceBusReceiverAsyncClient destination1Receiver = getReceiverBuilder(useCredentials, entityType, destinationEntity, shareConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .disableAutoComplete() .buildAsyncClient(); @@ -515,7 +515,7 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde this.sender = getSenderBuilder(useCredentials, entityType, entityIndex, isSessionAware, sharedConnection) .buildAsyncClient(); this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, sharedConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .disableAutoComplete() .buildAsyncClient(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index f918ff65952d9..4ebb319329ec5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -9,8 +9,6 @@ import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; -import com.azure.core.amqp.exception.AmqpErrorCondition; -import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ConnectionOptions; @@ -243,7 +241,7 @@ void createBatchWhenSizeTooBig() { // Act & Assert StepVerifier.create(sender.createMessageBatch(options)) - .expectError(IllegalArgumentException.class) + .expectError(ServiceBusException.class) .verify(); } @@ -523,8 +521,25 @@ void sendMessagesListExceedSize() { // Act & Assert StepVerifier.create(sender.sendMessages(messages)) - .verifyErrorMatches(error -> error instanceof AmqpException - && ((AmqpException) error).getErrorCondition() == AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED); + .verifyErrorMatches(error -> error instanceof ServiceBusException + && ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED); + + verify(sendLink, never()).send(anyList()); + } + + @Test + void sendSingleMessageThatExceedsSize() { + // arrange + ServiceBusMessage message = TestUtils.getServiceBusMessages(1, UUID.randomUUID().toString()).get(0); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + .thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(1)); + + // Act & Assert + StepVerifier.create(sender.sendMessage(message)) + .verifyErrorMatches(error -> error instanceof ServiceBusException + && ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED); verify(sendLink, never()).send(anyList()); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java index ffe1f8190a4a7..7bf763cdce80e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java @@ -6,7 +6,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.CreateMessageBatchOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; @@ -183,7 +183,7 @@ void setSenderAndReceiver(MessagingEntityType entityType, int entityIndex) { .buildClient(); receiver = getBuilder().receiver() .queueName(queueName) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .buildAsyncClient(); break; case SUBSCRIPTION: @@ -199,7 +199,7 @@ void setSenderAndReceiver(MessagingEntityType entityType, int entityIndex) { receiver = getBuilder().receiver() .topicName(topicName) .subscriptionName(subscriptionName) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .buildAsyncClient(); break; default: diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index 032facf49d257..dcf99abbdb04d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -19,7 +19,7 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; @@ -155,7 +155,7 @@ void afterEach(TestInfo testInfo) { @Test void receiveNull() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); + ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -171,7 +171,7 @@ void receiveNull() { @Test void singleUnnamedSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, + ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -196,7 +196,7 @@ void singleUnnamedSession() { .thenAnswer(invocation -> Mono.just(sessionLockedUntil)); when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty()); - when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink)); when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn( @@ -224,7 +224,7 @@ void singleUnnamedSession() { @Test void multipleSessions() { // Arrange - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -270,7 +270,7 @@ void multipleSessions() { when(amqpReceiveLink2.updateDisposition(lockToken2, Accepted.getInstance())).thenReturn(Mono.empty()); final AtomicInteger count = new AtomicInteger(); - when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { @@ -348,7 +348,7 @@ void multipleReceiveUnnamedSession() { // Arrange final int expectedLinksCreated = 2; final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); - final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, + final ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, 1); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, @@ -377,7 +377,7 @@ void multipleReceiveUnnamedSession() { when(amqpReceiveLink2.getSessionLockedUntil()).thenReturn(Mono.fromCallable(onRenewal)); final AtomicInteger count = new AtomicInteger(); - when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { @@ -401,7 +401,8 @@ void multipleReceiveUnnamedSession() { .thenCancel() .verify(); - verify(connection, times(2)).createReceiveLink(linkNameCaptor.capture(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + verify(connection, times(2)).createReceiveLink(linkNameCaptor.capture(), eq(ENTITY_PATH), any( + ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), isNull()); final List actualLinksCreated = linkNameCaptor.getAllValues(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java index fd1b45221e5c2..50e48096f98ff 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java @@ -19,7 +19,7 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; @@ -146,7 +146,7 @@ void afterEach(TestInfo testInfo) { @Test void acceptSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); + ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); final String lockToken = "a-lock-token"; final String linkName = "my-link-name"; final String sessionId = linkName; @@ -167,7 +167,7 @@ void acceptSession() { .thenAnswer(invocation -> Mono.just(sessionLockedUntil)); when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty()); - when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink)); ServiceBusSessionReceiverAsyncClient client = new ServiceBusSessionReceiverAsyncClient( @@ -196,7 +196,7 @@ void acceptSession() { @Test void acceptNextSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); + ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -241,7 +241,7 @@ void acceptNextSession() { when(amqpReceiveLink2.updateDisposition(lockToken2, Accepted.getInstance())).thenReturn(Mono.empty()); final AtomicInteger count = new AtomicInteger(); - when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(), any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { final int number = count.getAndIncrement(); switch (number) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityHelperTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityServiceBusExceptionTestHelperTest.java similarity index 99% rename from sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityHelperTest.java rename to sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityServiceBusExceptionTestHelperTest.java index 6cb2eda76868f..5026af3d10bd7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityHelperTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/EntityServiceBusExceptionTestHelperTest.java @@ -22,7 +22,7 @@ /** * {@link EntityHelper} tests. */ -class EntityHelperTest { +class EntityServiceBusExceptionTestHelperTest { @Test void createTopic() { // Arrange diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ManagementChannelTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ManagementChannelTests.java index 9a9360bdd994f..c6afe1309387b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ManagementChannelTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ManagementChannelTests.java @@ -3,16 +3,15 @@ package com.azure.messaging.servicebus.implementation; -import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseChannel; import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.ServiceBusTransactionContext; +import com.azure.messaging.servicebus.*; import com.azure.messaging.servicebus.models.DeadLetterOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -42,6 +41,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; +import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Date; import java.util.HashMap; @@ -88,6 +88,7 @@ class ManagementChannelTests { // Mocked response values from the RequestResponseChannel. private final Message responseMessage = Proton.message(); private final Map applicationProperties = new HashMap<>(); + private AmqpResponseCode authorizationResponseCode; private ManagementChannel managementChannel; @@ -118,9 +119,11 @@ void setup(TestInfo testInfo) { MockitoAnnotations.initMocks(this); + authorizationResponseCode = AmqpResponseCode.OK; + Flux results = Flux.create(sink -> sink.onRequest(requested -> { logger.info("Requested {} authorization results.", requested); - sink.next(AmqpResponseCode.OK); + sink.next(authorizationResponseCode); })); applicationProperties.put(STATUS_CODE_KEY, AmqpResponseCode.OK.getValue()); @@ -431,28 +434,86 @@ void updateDispositionWithTransaction() { void unauthorized() { // Arrange final String sessionId = "A session-id"; - applicationProperties.put(STATUS_CODE_KEY, AmqpResponseCode.NOT_FOUND.getValue()); + authorizationResponseCode = AmqpResponseCode.UNAUTHORIZED; // Act & Assert StepVerifier.create(managementChannel.getSessionState(sessionId, LINK_NAME)) .expectErrorSatisfies(error -> { - assertTrue(error instanceof AmqpException); - assertFalse(((AmqpException) error).isTransient()); + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); }) .verify(); + + StepVerifier.create(managementChannel.renewMessageLock(sessionId, LINK_NAME)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); + + StepVerifier.create(managementChannel.renewMessageLock(sessionId, LINK_NAME)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); + + StepVerifier.create(managementChannel.renewSessionLock(sessionId, LINK_NAME)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); + + StepVerifier.create(managementChannel.setSessionState(sessionId, new byte[0], LINK_NAME)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); + + StepVerifier.create(managementChannel.schedule(new ArrayList<>(), OffsetDateTime.now(), 1, LINK_NAME, null)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); + + StepVerifier.create(managementChannel.updateDisposition(UUID.randomUUID().toString(), + DispositionStatus.ABANDONED, "", "", + null, sessionId, LINK_NAME, null)) + .expectErrorSatisfies(error -> { + assertTrue(error instanceof ServiceBusException); + assertEquals(ServiceBusErrorSource.MANAGEMENT, ServiceBusExceptionTestHelper.getInternalErrorSource((ServiceBusException) error)); + assertEquals(ServiceBusFailureReason.UNAUTHORIZED, ((ServiceBusException) error).getReason()); + assertFalse(((ServiceBusException) error).isTransient()); + }) + .verify(); } @Test void getDeferredMessagesWithEmptyArrayReturnsAnEmptyFlux() { // Arrange, act, assert - StepVerifier.create(managementChannel.receiveDeferredMessages(ReceiveMode.PEEK_LOCK, null, null, new ArrayList<>())) + StepVerifier.create(managementChannel.receiveDeferredMessages(ServiceBusReceiveMode.PEEK_LOCK, null, null, new ArrayList<>())) .verifyComplete(); } @Test void getDeferredMessagesWithNullThrows() { // Arrange, act, assert - StepVerifier.create(managementChannel.receiveDeferredMessages(ReceiveMode.PEEK_LOCK, null, null, null)) + StepVerifier.create(managementChannel.receiveDeferredMessages(ServiceBusReceiveMode.PEEK_LOCK, null, null, null)) .verifyError(NullPointerException.class); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 237638dd18d0c..255fdb67d90d2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -7,7 +7,7 @@ import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -92,7 +92,7 @@ static void afterAll() { void setup() { MockitoAnnotations.initMocks(this); - linkProcessor = new ServiceBusReceiveLinkProcessor(PREFETCH, retryPolicy, ReceiveMode.PEEK_LOCK); + linkProcessor = new ServiceBusReceiveLinkProcessor(PREFETCH, retryPolicy, ServiceBusReceiveMode.PEEK_LOCK); when(link1.getEndpointStates()).thenReturn(endpointProcessor); when(link1.receive()).thenReturn(messageProcessor); @@ -106,9 +106,9 @@ void teardown() { @Test void constructor() { assertThrows(NullPointerException.class, () -> new ServiceBusReceiveLinkProcessor(PREFETCH, null, - ReceiveMode.PEEK_LOCK)); + ServiceBusReceiveMode.PEEK_LOCK)); assertThrows(IllegalArgumentException.class, () -> new ServiceBusReceiveLinkProcessor(-1, retryPolicy, - ReceiveMode.PEEK_LOCK)); + ServiceBusReceiveMode.PEEK_LOCK)); assertThrows(NullPointerException.class, () -> new ServiceBusReceiveLinkProcessor(PREFETCH, retryPolicy, null)); } diff --git a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusConfiguration.java b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusConfiguration.java index e5324e5c39de0..d9a9b50ffa88d 100644 --- a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusConfiguration.java +++ b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusConfiguration.java @@ -6,12 +6,12 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient; import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import static com.azure.messaging.servicebus.models.ReceiveMode.PEEK_LOCK; +import static com.azure.messaging.servicebus.models.ServiceBusReceiveMode.PEEK_LOCK; @Configuration @EnableConfigurationProperties(ServiceBusProperties.class) @@ -53,7 +53,7 @@ public ServiceBusSenderAsyncClient topicSender() { @Bean public ServiceBusReceiverAsyncClient topicSubscriber() { - final ReceiveMode subscriptionReceiveMode = properties.getSubscriptionReceiveMode(); + final ServiceBusReceiveMode subscriptionReceiveMode = properties.getSubscriptionReceiveMode(); return new ServiceBusClientBuilder() .connectionString(properties.getConnectionString()) diff --git a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusProperties.java b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusProperties.java index 3b11e6bc11c29..dcb19e8ea58a1 100644 --- a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusProperties.java +++ b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/src/main/java/com/azure/spring/sample/servicebus/ServiceBusProperties.java @@ -3,7 +3,7 @@ package com.azure.spring.sample.servicebus; -import com.azure.messaging.servicebus.models.ReceiveMode; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; @@ -26,7 +26,7 @@ public class ServiceBusProperties { /** * Queue receive mode. */ - private ReceiveMode queueReceiveMode; + private ServiceBusReceiveMode queueReceiveMode; /** * Topic name. Entity path of the topic. @@ -41,7 +41,7 @@ public class ServiceBusProperties { /** * Subscription receive mode. */ - private ReceiveMode subscriptionReceiveMode; + private ServiceBusReceiveMode subscriptionReceiveMode; public String getConnectionString() { return connectionString; @@ -59,11 +59,11 @@ public void setQueueName(String queueName) { this.queueName = queueName; } - public ReceiveMode getQueueReceiveMode() { + public ServiceBusReceiveMode getQueueReceiveMode() { return queueReceiveMode; } - public void setQueueReceiveMode(ReceiveMode queueReceiveMode) { + public void setQueueReceiveMode(ServiceBusReceiveMode queueReceiveMode) { this.queueReceiveMode = queueReceiveMode; } @@ -83,11 +83,11 @@ public void setSubscriptionName(String subscriptionName) { this.subscriptionName = subscriptionName; } - public ReceiveMode getSubscriptionReceiveMode() { + public ServiceBusReceiveMode getSubscriptionReceiveMode() { return subscriptionReceiveMode; } - public void setSubscriptionReceiveMode(ReceiveMode subscriptionReceiveMode) { + public void setSubscriptionReceiveMode(ServiceBusReceiveMode subscriptionReceiveMode) { this.subscriptionReceiveMode = subscriptionReceiveMode; } }