Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service bus] Use ServiceBusException rather than AmqpException #17601

Merged
merged 19 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5ebd1de
Changing over to using ServiceBusException
richardpark-msft Nov 13, 2020
1a2b7c1
Removing import collapse.
richardpark-msft Nov 17, 2020
dcfd301
Fixing verify errors.
richardpark-msft Nov 17, 2020
8e965d9
Fixing verify errors.
richardpark-msft Nov 17, 2020
810a107
Pass in an entire context object, rather than just the throwable.
richardpark-msft Nov 17, 2020
1edca1c
Fixing more verify issues
richardpark-msft Nov 17, 2020
185cd59
Fixing more verify issues
richardpark-msft Nov 17, 2020
9459733
Make errorSource a package private field and add in a helper to extra…
richardpark-msft Nov 17, 2020
32c48aa
Make the ServiceBusErrorContext final (and rename from ServiceBusProc…
richardpark-msft Nov 17, 2020
44bcc36
Moving the location of the onErrorMap so I don't accidentally create …
richardpark-msft Nov 17, 2020
654909c
Remove lame comment.
richardpark-msft Nov 17, 2020
649179c
Rename ReceiveMode to ServiceBusReceiveMode
richardpark-msft Nov 17, 2020
b51d5fa
* Doing some renames for consistency (SENDER -> SEND, ABANDONED -> AB…
richardpark-msft Nov 17, 2020
962cc32
* Be idiomatic with the naming of SERVICE_COMMUNICATION_PROBLEM and c…
richardpark-msft Nov 17, 2020
5be5cab
Fixing formatting errors caught by verify
richardpark-msft Nov 17, 2020
9a84432
Fixing compile errors with the perf tests from changing the receive m…
richardpark-msft Nov 17, 2020
a6926d5
Missed one.
richardpark-msft Nov 17, 2020
de7ce77
Updating to use the new ServiceBusReceiveMode name rather than just R…
richardpark-msft Nov 17, 2020
4113891
Missed one
richardpark-msft Nov 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public final class ServiceBusSessionProcessorClientBuilder {
private final ServiceBusProcessorClientOptions processorClientOptions;
private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusProcessErrorContext> processError;

private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
Expand Down Expand Up @@ -750,7 +750,8 @@ public ServiceBusSessionProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusSessionProcessorClientBuilder processError(
Consumer<ServiceBusProcessErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down Expand Up @@ -1095,7 +1096,7 @@ public final class ServiceBusProcessorClientBuilder {
private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder;
private final ServiceBusProcessorClientOptions processorClientOptions;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusProcessErrorContext> processError;

private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
Expand Down Expand Up @@ -1184,7 +1185,7 @@ public ServiceBusProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusProcessErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,12 @@ public final class ServiceBusErrorSource extends ExpandableStringEnum<ServiceBus
/** Error while session is closed.*/
public static final ServiceBusErrorSource CLOSE_SESSION = fromString("CLOSE_SESSION",
ServiceBusErrorSource.class);

/** Error while sending a message.*/
public static final ServiceBusErrorSource SENDING = fromString("SENDING",
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
ServiceBusErrorSource.class);

/** Error while trying to do an operation on the management link. */
public static final ServiceBusErrorSource MANAGEMENT = fromString("MANAGEMENT",
ServiceBusErrorSource.class);
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.exception.AzureException;

/**
* Defines {@link ServiceBusException} which has additional information about the operation that caused the
* error.
* @see ServiceBusErrorSource
*/
public final class ServiceBusException extends AzureException {
private final transient ServiceBusErrorSource errorSource;
private final transient ServiceBusFailureReason reason;
private final boolean isTransient;

/**
* @param throwable for the error happened.
* @param errorSource indicating which api caused the error.
*/
public ServiceBusException(Throwable throwable, ServiceBusErrorSource errorSource) {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
super(throwable.getMessage(), throwable);
this.errorSource = errorSource;

if (throwable instanceof AmqpException) {
AmqpException amqpException = (AmqpException) throwable;
reason = getServiceBusFailureReasonFromException(amqpException);
isTransient = amqpException.isTransient();
} else {
reason = ServiceBusFailureReason.GENERAL_ERROR;
isTransient = false;
}
}

/**
* Gets the {@link ServiceBusErrorSource} in case of any errors.
* @return the {@link ServiceBusErrorSource}
*/
public ServiceBusErrorSource getErrorSource() {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
return errorSource;
}

/**
* Gets the {@link ServiceBusFailureReason} in case of any errors.
* @return the {@link ServiceBusFailureReason}
*/
public ServiceBusFailureReason getReason() {
return reason;
}

/**
* A boolean indicating if the exception is a transient error or not.
*
* @return returns true when user can retry the operation that generated the exception without additional
* intervention.
*/
public boolean isTransient() {
return isTransient;
}

private ServiceBusFailureReason getServiceBusFailureReasonFromException(AmqpException throwable) {
final AmqpErrorCondition errorCondition = throwable.getErrorCondition();

if (errorCondition == null) {
return ServiceBusFailureReason.GENERAL_ERROR;
}

switch (errorCondition) {
case NOT_FOUND:
return ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND;
case MESSAGE_LOCK_LOST:
return ServiceBusFailureReason.MESSAGE_LOCK_LOST;
case MESSAGE_NOT_FOUND:
return ServiceBusFailureReason.MESSAGE_NOT_FOUND;
case LINK_PAYLOAD_SIZE_EXCEEDED:
return ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED;
case ENTITY_ALREADY_EXISTS:
return ServiceBusFailureReason.MESSAGING_ENTITY_ALREADY_EXISTS;
case ENTITY_DISABLED_ERROR:
return ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED;
case RESOURCE_LIMIT_EXCEEDED:
return ServiceBusFailureReason.QUOTA_EXCEEDED;
case SERVER_BUSY_ERROR:
return ServiceBusFailureReason.SERVICE_BUSY;
case TIMEOUT_ERROR:
return ServiceBusFailureReason.SERVICE_TIMEOUT;
case SESSION_CANNOT_BE_LOCKED:
return ServiceBusFailureReason.SESSION_CANNOT_BE_LOCKED;
case SESSION_LOCK_LOST:
return ServiceBusFailureReason.SESSION_LOCK_LOST;
case UNAUTHORIZED_ACCESS:
return ServiceBusFailureReason.UNAUTHORIZED;
case PROTON_IO: // does this mapping make sense?
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
default:
return ServiceBusFailureReason.GENERAL_ERROR;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.ExpandableStringEnum;

/**
* The set of well-known reasons for an Service Bus operation failure that was the cause of an exception.
*/
public final class ServiceBusFailureReason extends ExpandableStringEnum<ServiceBusFailureReason> {
// NOTE: this list is intended to mirror the reasons we have in .net
// https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging
// .ServiceBus/src/Primitives/ServiceBusFailureReason.cs

/** The exception was the result of a general error within the client library. */
public static final ServiceBusFailureReason GENERAL_ERROR = fromString("GENERAL_ERROR",
ServiceBusFailureReason.class);

/** The lock on the message is lost. Callers should call attempt to receive and process the message again. */
public static final ServiceBusFailureReason MESSAGE_LOCK_LOST = fromString("MESSAGE_LOCK_LOST",
ServiceBusFailureReason.class);

/** The requested message was not found. */
public static final ServiceBusFailureReason MESSAGE_NOT_FOUND = fromString("MESSAGE_NOT_FOUND",
ServiceBusFailureReason.class);

/** A message is larger than the maximum size allowed for its transport. */
public static final ServiceBusFailureReason MESSAGE_SIZE_EXCEEDED = fromString("MESSAGE_SIZE_EXCEEDED",
ServiceBusFailureReason.class);

/** An entity with the same name exists under the same namespace. */
public static final ServiceBusFailureReason MESSAGING_ENTITY_ALREADY_EXISTS = fromString(
"MESSAGING_ENTITY_ALREADY_EXISTS", ServiceBusFailureReason.class);

/** The Messaging Entity is disabled. Enable the entity again using Portal. */
public static final ServiceBusFailureReason MESSAGING_ENTITY_DISABLED = fromString("MESSAGING_ENTITY_DISABLED",
ServiceBusFailureReason.class);

/** A Service Bus resource cannot be found by the Service Bus service. */
public static final ServiceBusFailureReason MESSAGING_ENTITY_NOT_FOUND = fromString("MESSAGING_ENTITY_NOT_FOUND",
ServiceBusFailureReason.class);

/** The quota applied to an Service Bus resource has been exceeded while interacting with the Azure Service Bus
* service. */
public static final ServiceBusFailureReason QUOTA_EXCEEDED = fromString("QUOTA_EXCEEDED",
ServiceBusFailureReason.class);

/** The Azure Service Bus service reports that it is busy in response to a client request to perform an operation
* . */
public static final ServiceBusFailureReason SERVICE_BUSY = fromString("SERVICE_BUSY",
ServiceBusFailureReason.class);

/** An operation or other request timed out while interacting with the Azure Service Bus service. */
public static final ServiceBusFailureReason SERVICE_TIMEOUT = fromString("SERVICE_TIMEOUT",
ServiceBusFailureReason.class);

/** There was a general communications error encountered when interacting with the Azure Service Bus service. */
public static final ServiceBusFailureReason SERVICE_COMMUNICATION_PROBLEM = fromString(
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
"SERVICE_COMMUNICATION_PROBLEM", ServiceBusFailureReason.class);

/** The requested session cannot be locked. */
public static final ServiceBusFailureReason SESSION_CANNOT_BE_LOCKED = fromString("SESSION_CANNOT_BE_LOCKED",
ServiceBusFailureReason.class);

/** The lock on the session has expired. Callers should request the session again. */
public static final ServiceBusFailureReason SESSION_LOCK_LOST = fromString("SESSION_LOCK_LOST",
ServiceBusFailureReason.class);

/** The user doesn't have access to the entity. */
public static final ServiceBusFailureReason UNAUTHORIZED = fromString("UNAUTHORIZED",
ServiceBusFailureReason.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
try {
size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty());
} catch (BufferOverflowException exception) {
throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb",
maxMessageSize / 1024),
contextProvider.getErrorContext()));
final RuntimeException ex = new ServiceBusException(
new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb",
maxMessageSize / 1024), contextProvider.getErrorContext()), ServiceBusErrorSource.SENDING);

throw logger.logExceptionAsWarning(ex);
}

synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

/**
* Context for errors handled by the Service Bus processor.
*/
public class ServiceBusProcessErrorContext {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
private final Throwable exception;
private final ServiceBusErrorSource errorSource;
private final String fullyQualifiedNamespace;
private final String entityPath;

ServiceBusProcessErrorContext(Throwable throwable, String fullyQualifiedNamespace, String entityPath) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need fullyQualifiedNamespace and entityPath? If this is needed for the error context, we should consider adding it to the ServiceBusReceivedMessageContext too? I know Event Hubs had these properties on both event and error context. So, we should consider adding it to both places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just mimicking what's already in .net, so this is more a question for @JoshLove-msft .

this.exception = throwable;
this.fullyQualifiedNamespace = fullyQualifiedNamespace;
this.entityPath = entityPath;

if (throwable instanceof ServiceBusException) {
final ServiceBusException serviceBusException = ((ServiceBusException) throwable);
this.errorSource = serviceBusException.getErrorSource();
} else {
this.errorSource = ServiceBusErrorSource.RECEIVE;
}
}

/**
* Gets the exception that triggered the call to the error event handler.
* @return The exception that triggered the call to the error event handler.
*/
public Throwable getException() {
return exception;
}

/**
* Gets the source associated with the error.
* @return The source associated with the error.
*/
public ServiceBusErrorSource getErrorSource() {
return errorSource;
}

/**
* Gets the namespace name associated with the error event.
* @return The namespace name associated with the error event.
*/
public String getFullyQualifiedNamespace() {
return fullyQualifiedNamespace;
}

/**
* Gets the entity path associated with the error event.
* @return The entity path associated with the error event.
*/
public String getEntityPath() {
return entityPath;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
private final Consumer<ServiceBusReceivedMessageContext> processMessage;
private final Consumer<Throwable> processError;
private final Consumer<ServiceBusProcessErrorContext> processError;
private final ServiceBusProcessorClientOptions processorOptions;
private final AtomicReference<Subscription> receiverSubscription = new AtomicReference<>();
private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
Expand All @@ -55,8 +55,9 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
* @param processorOptions Options to configure this instance of the processor.
*/
ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder,
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<Throwable> processError, ServiceBusProcessorClientOptions processorOptions) {
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<ServiceBusProcessErrorContext> processError,
ServiceBusProcessorClientOptions processorOptions) {
this.sessionReceiverBuilder = Objects.requireNonNull(sessionReceiverBuilder,
"'sessionReceiverBuilder' cannot be null");
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
Expand All @@ -75,8 +76,8 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
* @param processorOptions Options to configure this instance of the processor.
*/
ServiceBusProcessorClient(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder,
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<Throwable> processError, ServiceBusProcessorClientOptions processorOptions) {
Consumer<ServiceBusReceivedMessageContext> processMessage,
Consumer<ServiceBusProcessErrorContext> processError, ServiceBusProcessorClientOptions processorOptions) {
this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
Expand Down Expand Up @@ -168,7 +169,7 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
handleError(new ServiceBusReceiverException(ex, ServiceBusErrorSource.USER_CALLBACK));
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));
if (!processorOptions.isDisableAutoComplete()) {
logger.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
Expand Down Expand Up @@ -211,7 +212,10 @@ private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,

private void handleError(Throwable throwable) {
try {
processError.accept(throwable);
ServiceBusReceiverAsyncClient client = asyncClient.get();
final String fullyQualifiedNamespace = client.getFullyQualifiedNamespace();
final String entityPath = client.getEntityPath();
processError.accept(new ServiceBusProcessErrorContext(throwable, fullyQualifiedNamespace, entityPath));
} catch (Exception ex) {
logger.verbose("Error from error handler. Ignoring error.", ex);
}
Expand Down
Loading