From ebb2985d450cccd382f34f78a4eedc58727ad203 Mon Sep 17 00:00:00 2001 From: Anu Thomas Date: Fri, 17 Feb 2023 12:57:31 -0800 Subject: [PATCH 1/4] Adding ReceiverUnsettledDeliveries type dedicated for disposition handlings and tests --- .../resources/spotbugs/spotbugs-exclude.xml | 16 + .../implementation/handler/LinkHandler.java | 6 +- .../handler/ReceiverUnsettledDeliveries.java | 730 ++++++++++++++++++ ...ceiverUnsettledDeliveriesIsolatedTest.java | 121 +++ .../ReceiverUnsettledDeliveriesTest.java | 435 +++++++++++ 5 files changed, 1307 insertions(+), 1 deletion(-) create mode 100644 sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index b8345a62bfbe6..125535e037f6f 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -2328,6 +2328,22 @@ + + + + + + + + + + + + + + diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java index 3acb046c9dc0d..df89a68447d9e 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java @@ -85,6 +85,10 @@ public void onLinkFinal(Event event) { } public AmqpErrorContext getErrorContext(Link link) { + return getErrorContext(getHostname(), entityPath, link); + } + + static AmqpErrorContext getErrorContext(String hostName, String entityPath, Link link) { final String referenceId; if (link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(TRACKING_ID_PROPERTY)) { referenceId = link.getRemoteProperties().get(TRACKING_ID_PROPERTY).toString(); @@ -92,7 +96,7 @@ public AmqpErrorContext getErrorContext(Link link) { referenceId = link.getName(); } - return new LinkErrorContext(getHostname(), entityPath, referenceId, link.getCredit()); + return new LinkErrorContext(hostName, entityPath, referenceId, link.getCredit()); } private void handleRemoteLinkClosed(final String eventName, final Event event) { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java new file mode 100644 index 0000000000000..9d6e9d07bbe82 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java @@ -0,0 +1,730 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation.handler; + +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.ExceptionUtil; +import com.azure.core.amqp.implementation.ReactorDispatcher; +import com.azure.core.amqp.implementation.RetryUtil; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import reactor.core.Disposable; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.azure.core.amqp.implementation.ClientConstants.DELIVERY_STATE_KEY; +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Manages the received deliveries which are not settled on the broker. The client can later request settlement + * of each delivery by sending a disposition frame with a state representing the desired-outcome, + * which the application wishes to occur at the broker. The broker acknowledges this with a disposition frame + * with a state (a.k.a. remote-state) representing the actual outcome (a.k.a. remote-outcome) of any work + * the broker performed upon processing the settlement request and a flag (a.k.a. remotely-settled) indicating + * whether the broker settled the delivery. + */ +public final class ReceiverUnsettledDeliveries implements AutoCloseable { + // Ideally value of this const should be 'deliveryTag' but given the only use case today is as Service Bus + // LockToken, while logging, use the value 'lockToken' to ease log parsing. + // (TODO: anuchan; consider parametrizing the value of deliveryTag?). + private static final String DELIVERY_TAG_KEY = "lockToken"; + private final AtomicBoolean isTerminated = new AtomicBoolean(); + private final String hostName; + private final String entityPath; + private final String receiveLinkName; + private final ReactorDispatcher dispatcher; + private final AmqpRetryPolicy retryPolicy; + private final Duration timeout; + private final UUID deliveryEmptyTag; + private final ClientLogger logger; + private final Disposable timoutTimer; + + // The deliveries received, for those the application haven't sent disposition frame to the broker requesting + // settlement or disposition frame is sent, but yet to receive acknowledgment disposition frame from + // the broker indicating the outcome (a.k.a. remote-outcome). + private final ConcurrentHashMap deliveries = new ConcurrentHashMap<>(); + // A collection of work, where each work representing the disposition frame that the application sent, + // waiting to receive an acknowledgment disposition frame from the broker indicating the outcome + // (a.k.a. remote-outcome). + private final ConcurrentHashMap pendingDispositions = new ConcurrentHashMap<>(); + + /** + * Creates ReceiverUnsettledDeliveries. + * + * @param hostName the name of the host hosting the messaging entity identified by {@code entityPath}. + * @param entityPath the relative path identifying the messaging entity from which the deliveries are + * received from, the application can later disposition these deliveries by sending + * disposition frames to the broker. + * @param receiveLinkName the name of the amqp receive-link 'Attach'-ed to the messaging entity from + * which the deliveries are received from. + * @param dispatcher the dispatcher to invoke the ProtonJ library API to send disposition frame. + * @param retryOptions the retry configuration to use when resending a disposition frame that the broker 'Rejected'. + * @param deliveryEmptyTag reference to static UUID indicating absence of delivery tag in deliveries. + * @param logger the logger. + */ + public ReceiverUnsettledDeliveries(String hostName, String entityPath, String receiveLinkName, ReactorDispatcher dispatcher, + AmqpRetryOptions retryOptions, UUID deliveryEmptyTag, ClientLogger logger) { + this.hostName = hostName; + this.entityPath = entityPath; + this.receiveLinkName = receiveLinkName; + this.dispatcher = dispatcher; + this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions); + this.timeout = retryOptions.getTryTimeout(); + this.deliveryEmptyTag = deliveryEmptyTag; + this.logger = logger; + this.timoutTimer = Flux.interval(timeout).subscribe(__ -> completeUncompletedDispositionWorksOnTimeout("timer")); + } + + /** + * Function to notify a received delivery that is unsettled on the broker side; the application can later use + * {@link ReceiverUnsettledDeliveries#sendDisposition(String, DeliveryState)} to send a disposition frame requesting + * settlement of this delivery at the broker. + * + * @param deliveryTag the unique delivery tag associated with the {@code delivery}. + * @param delivery the delivery. + * @return {@code false} if the instance was closed upon notifying the delivery, {@code true} otherwise. + */ + public boolean onDelivery(UUID deliveryTag, Delivery delivery) { + if (isTerminated.get()) { + return false; + } else { + // Continue using putIfAbsent as legacy T1 library. + deliveries.putIfAbsent(deliveryTag.toString(), delivery); + return true; + } + } + + /** + * Check if a delivery with the given delivery tag was received. + * + * @param deliveryTag the delivery tag. + * @return {@code true} if delivery with the given delivery tag exists {@code false} otherwise. + */ + public boolean containsDelivery(UUID deliveryTag) { + return deliveryTag != deliveryEmptyTag && deliveries.containsKey(deliveryTag.toString()); + } + + /** + * Request settlement of delivery (with the unique {@code deliveryTag}) by sending a disposition frame + * with a state representing the desired-outcome, which the application wishes to occur at the broker. + * Disposition frame is sent via the same amqp receive-link that delivered the delivery, which was + * notified through {@link ReceiverUnsettledDeliveries#onDelivery(UUID, Delivery)}. + * + * @param deliveryTag the unique delivery tag identifying the delivery. + * @param desiredState The state to include in the disposition frame indicating the desired-outcome + * that the application wish to occur at the broker. + * @return the {@link Mono} upon subscription starts the work by requesting ProtonJ library to send + * disposition frame to settle the delivery on the broker, and this Mono terminates once the broker + * acknowledges with disposition frame indicating outcome (a.ka. remote-outcome). + * The Mono can terminate if the configured timeout elapses or cannot initiate the request to ProtonJ + * library. + */ + public Mono sendDisposition(String deliveryTag, DeliveryState desiredState) { + if (isTerminated.get()) { + return monoError(logger, new IllegalStateException("Cannot perform sendDisposition on a disposed receiver.")); + } else { + return sendDispositionImpl(deliveryTag, desiredState); + } + } + + /** + * The function to notify the broker's acknowledgment in response to a disposition frame sent to the broker + * via {@link ReceiverUnsettledDeliveries#sendDisposition(String, DeliveryState)}. + * The broker acknowledgment is also a disposition frame; the ProtonJ library will map this disposition + * frame to the same Delivery in-memory object for which the application requested disposition. + * As part of mapping, the remote-state (representing remote-outcome) and is-remotely-settled (boolean) + * property of the Delivery object is updated from the disposition frame ack. + * + * @param deliveryTag the unique delivery tag of the delivery that application requested disposition. + * @param delivery the delivery object updated from the broker's transfer frame ack. + */ + public void onDispositionAck(UUID deliveryTag, Delivery delivery) { + final DeliveryState remoteState = delivery.getRemoteState(); + + logger.atVerbose() + .addKeyValue(DELIVERY_TAG_KEY, deliveryTag) + .addKeyValue(DELIVERY_STATE_KEY, remoteState) + .log("Received update disposition delivery."); + + final Outcome remoteOutcome; + if (remoteState instanceof Outcome) { + remoteOutcome = (Outcome) remoteState; + } else if (remoteState instanceof TransactionalState) { + remoteOutcome = ((TransactionalState) remoteState).getOutcome(); + } else { + remoteOutcome = null; + } + + if (remoteOutcome == null) { + logger.atWarning() + .addKeyValue(DELIVERY_TAG_KEY, deliveryTag) + .addKeyValue("delivery", delivery) + .log("No outcome associated with delivery."); + + return; + } + + final DispositionWork work = pendingDispositions.get(deliveryTag.toString()); + if (work == null) { + logger.atWarning() + .addKeyValue(DELIVERY_TAG_KEY, deliveryTag) + .addKeyValue("delivery", delivery) + .log("No pending update for delivery."); + return; + } + + // the outcome that application desired. + final DeliveryStateType desiredOutcomeType = work.getDesiredState().getType(); + // the outcome that broker actually attained. + final DeliveryStateType remoteOutcomeType = remoteState.getType(); + + if (desiredOutcomeType == remoteOutcomeType) { + completeDispositionWorkWithSettle(work, delivery, null); + } else { + logger.atInfo() + .addKeyValue(DELIVERY_TAG_KEY, deliveryTag) + .addKeyValue("receivedDeliveryState", remoteState) + .addKeyValue(DELIVERY_STATE_KEY, work.getDesiredState()) + .log("Received delivery state doesn't match expected state."); + + if (remoteOutcomeType == DeliveryStateType.Rejected) { + handleRetriableRejectedRemoteOutcome(work, delivery, (Rejected) remoteOutcome); + } else { + handleReleasedOrUnknownRemoteOutcome(work, delivery, remoteOutcome); + } + } + } + + /** + * Attempt any optional graceful (if possible) cleanup and terminate this {@link ReceiverUnsettledDeliveries}. + * The function eagerly times out any expired disposition work and waits for the completion or timeout of + * the remaining disposition. Finally, disposes of the timeout timer. Future attempts to notify deliveries or + * send delivery dispositions will be rejected. + *

+ * From the point of view of this function's call site, it is still possible that the receive-link and dispatcher + * may healthy, but not guaranteed. If healthy, send-receive of disposition frames are possible, enabling + * 'graceful' completion of works. + *

+ * e.g., if the user proactively initiates the closing of client, it is likely that the receive-link may be + * healthy. On the other hand, if the broker initiates the closing of the link, further frame transfer may not be + * possible. + * + * @return a {@link Mono} that completes upon the termination. + */ + public Mono attemptGracefulClose() { + isTerminated.getAndSet(true); + + // Complete timed out works if any + completeUncompletedDispositionWorksOnTimeout("preClose"); + + // then wait for the completion of all remaining (not timed out) works. + final List> workMonoList = new ArrayList<>(); + final StringJoiner deliveryTags = new StringJoiner(", "); + for (DispositionWork work : pendingDispositions.values()) { + if (work.hasTimedout()) { + continue; + } + if (work.getDesiredState() instanceof TransactionalState) { + final Mono workMono = sendDispositionImpl(work.getDeliveryTag(), Released.getInstance()); + workMonoList.add(workMono); + } else { + workMonoList.add(work.getMono()); + } + deliveryTags.add(work.getDeliveryTag()); + } + + final Mono workMonoListMerged; + if (!workMonoList.isEmpty()) { + logger.info("Waiting for pending updates to complete. Locks: {}", deliveryTags.toString()); + workMonoListMerged = Mono.whenDelayError(workMonoList) + .onErrorResume(error -> { + logger.info("There was exception(s) while disposing of all disposition work.", error); + return Mono.empty(); + }); + } else { + workMonoListMerged = Mono.empty(); + } + return workMonoListMerged.doFinally(__ -> timoutTimer.dispose()); + } + + /** + * Closes this {@link ReceiverUnsettledDeliveries} and force complete any uncompleted work. Future attempts + * to notify deliveries or send delivery dispositions will be rejected. + */ + @Override + public void close() { + isTerminated.getAndSet(true); + + // Disposes of subscription to the global interval timer. + timoutTimer.dispose(); + + // Note: Once disposition API support is enabled in ReceiveLinkHandler - this close() method should have + // logic to free the tracked QPID deliveries. The ReceiveLinkHandler will no longer track "ALL" QPID + // deliveries (using 'queuedDeliveries' set), because, the plan is, upon arrival of any delivery in + // reactor-thread, we will be draining the delivery buffer and settle those deliveries already settled + // by the broker, so we need settle only the deliveries in ReceiverUnsettledDeliveries.deliveries in this close. + + // Force complete all uncompleted works. + completeUncompletedDispositionWorksOnClose(); + } + + /** + * See the doc for {@link ReceiverUnsettledDeliveries#sendDisposition(String, DeliveryState)}. + * + * @param deliveryTag the unique delivery tag identifying the delivery. + * @param desiredState The state to include in the disposition frame indicating the desired-outcome + * that the application wish to occur at the broker. + * @return the {@link Mono} representing disposition work. + */ + private Mono sendDispositionImpl(String deliveryTag, DeliveryState desiredState) { + final Delivery delivery = deliveries.get(deliveryTag); + if (delivery == null) { + logger.atWarning() + .addKeyValue(DELIVERY_TAG_KEY, deliveryTag) + .log("Delivery not found to update disposition."); + + return monoError(logger, Exceptions.propagate(new IllegalArgumentException( + "Delivery not on receive link."))); + } + + final DispositionWork work = new DispositionWork(deliveryTag, desiredState, timeout); + + final Mono mono = Mono.create(sink -> { + work.onStart(sink); + try { + dispatcher.invoke(() -> { + delivery.disposition(desiredState); + if (pendingDispositions.putIfAbsent(deliveryTag, work) != null) { + work.onComplete(new AmqpException(false, + "A disposition requested earlier is waiting for the broker's ack; " + + "a new disposition request is not allowed.", + null)); + } + }); + } catch (IOException | RejectedExecutionException dispatchError) { + work.onComplete(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", + dispatchError, getErrorContext(delivery))); + } + }); + + work.setMono(mono); + + return work.getMono(); + } + + /** + * Handles the 'Rejected' outcome (in a disposition ack) from the broker in-response to a disposition frame + * application sent. + * + * @param work the work that sent the disposition frame with a desired-outcome which broker 'Rejected'. + * @param delivery the Delivery in-memory object for which the application had sent the disposition frame; + * the ProtonJ library updates the remote-state (representing remote-outcome) and + * is-remotely-settled (boolean) property of the Delivery object from the disposition frame ack. + * @param remoteOutcome the 'Rejected' remote-outcome describing the rejection reason, this is derived from + * the remote-state. + */ + private void handleRetriableRejectedRemoteOutcome(DispositionWork work, Delivery delivery, Rejected remoteOutcome) { + final AmqpErrorContext amqpErrorContext = getErrorContext(delivery); + final ErrorCondition errorCondition = remoteOutcome.getError(); + final Throwable error = ExceptionUtil.toException(errorCondition.getCondition().toString(), + errorCondition.getDescription(), amqpErrorContext); + + final Duration retry = retryPolicy.calculateRetryDelay(error, work.getTryCount()); + if (retry != null) { + work.onRetriableRejectedOutcome(error); + try { + dispatcher.invoke(() -> delivery.disposition(work.getDesiredState())); + } catch (IOException | RejectedExecutionException dispatchError) { + final Throwable amqpException = logger.atError() + .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag()) + .log(new AmqpException(false, + String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.", + receiveLinkName, work.getDeliveryTag()), + dispatchError, getErrorContext(delivery))); + + completeDispositionWorkWithSettle(work, delivery, amqpException); + } + } else { + logger.atInfo() + .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag()) + .addKeyValue(DELIVERY_STATE_KEY, delivery.getRemoteState()) + .log("Retry attempts exhausted.", error); + + completeDispositionWorkWithSettle(work, delivery, error); + } + } + + /** + * Handles the 'Released' or unknown outcome (in a disposition ack) from the broker in-response to a disposition + * frame application sent. + * + * @param work the work that sent the disposition frame with a desired-outcome. + * @param delivery the Delivery in-memory object for which the application had sent the disposition frame; + * the ProtonJ library updates the remote-state (representing remote-outcome) and + * is-remotely-settled (boolean) property of the Delivery object from the disposition frame ack. + * @param remoteOutcome the remote-outcome from the broker describing the reason for broker choosing an outcome + * different from requested desired-outcome, this is derived from the remote-state. + */ + private void handleReleasedOrUnknownRemoteOutcome(DispositionWork work, Delivery delivery, Outcome remoteOutcome) { + final AmqpErrorContext amqpErrorContext = getErrorContext(delivery); + final AmqpException completionError; + + final DeliveryStateType remoteOutcomeType = delivery.getRemoteState().getType(); + if (remoteOutcomeType == DeliveryStateType.Released) { + completionError = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED, + "AMQP layer unexpectedly aborted or disconnected.", amqpErrorContext); + } else { + completionError = new AmqpException(false, remoteOutcome.toString(), amqpErrorContext); + } + + logger.atInfo() + .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag()) + .addKeyValue(DELIVERY_STATE_KEY, delivery.getRemoteState()) + .log("Completing pending updateState operation with exception.", completionError); + + completeDispositionWorkWithSettle(work, delivery, completionError); + } + + /** + * Iterate through all the current {@link DispositionWork} and complete the work those are timed out. + */ + private void completeUncompletedDispositionWorksOnTimeout(String callSite) { + if (pendingDispositions.isEmpty()) { + return; + } + + final int[] completionCount = new int[1]; + final StringJoiner deliveryTags = new StringJoiner(", "); + + pendingDispositions.forEach((deliveryTag, work) -> { + if (work == null || !work.hasTimedout()) { + return; + } + + if (completionCount[0] == 0) { + logger.info("Starting completion of timed out disposition works (call site:{}).", callSite); + } + + final Throwable completionError; + if (work.getRejectedOutcomeError() != null) { + completionError = work.getRejectedOutcomeError(); + } else { + completionError = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, + "Update disposition request timed out.", getErrorContext(deliveries.get(work.getDeliveryTag()))); + } + deliveryTags.add(work.getDeliveryTag()); + completeDispositionWork(work, completionError); + completionCount[0]++; + }); + + if (completionCount[0] > 0) { + // The log help debug if the user code chained to the work-mono (DispositionWork::getMono()) never returns. + logger.info("Completed {} timed-out disposition works (call site:{}). Locks {}", + callSite, completionCount[0], deliveryTags.toString()); + } + } + + /** + * Iterate through all the {@link DispositionWork}, and 'force' to complete the uncompleted works because + * this {@link ReceiverUnsettledDeliveries} is closed. + */ + private void completeUncompletedDispositionWorksOnClose() { + // Note: Possible to have one function for cleaning both timeout and incomplete works, but readability + // seems to be affected, hence separate functions. + + if (pendingDispositions.isEmpty()) { + return; + } + + final int[] completionCount = new int[1]; + final StringJoiner deliveryTags = new StringJoiner(", "); + + final AmqpException completionError = new AmqpException(false, + "The receiver didn't receive the disposition acknowledgment due to receive link closure.", null); + + pendingDispositions.forEach((deliveryTag, work) -> { + if (work == null || work.isCompleted()) { + return; + } + + if (completionCount[0] == 0) { + logger.info("Starting completion of disposition works as part of receive link closure."); + } + + deliveryTags.add(work.getDeliveryTag()); + completeDispositionWork(work, completionError); + completionCount[0]++; + }); + + if (completionCount[0] > 0) { + // The log help debug if the user code chained to the work-mono (DispositionWork::getMono()) never returns. + logger.info("Completed {} disposition works as part of receive link closure. Locks {}", + completionCount[0], deliveryTags.toString()); + } + } + + /** + * Completes the given {@link DispositionWork}, which results in termination of the {@link Mono} returned + * from the {@link DispositionWork#getMono()} API. If the broker settled the {@link Delivery} associated + * with the work, it would also be locally settled. + *

+ * Invocations of this function are guaranteed to be serial, as all call sites originate from + * {@link ReceiverUnsettledDeliveries#onDispositionAck(UUID, Delivery)} running on the ProtonJ Reactor event-loop thread. + * + * @param work the work to complete. + * @param delivery the delivery that the work attempted the disposition, to be locally settled if the broker + * settled it on the remote end. + * @param completionError a null value indicates that the work has to complete successfully, otherwise complete + * the work with the error value. + */ + private void completeDispositionWorkWithSettle(DispositionWork work, Delivery delivery, Throwable completionError) { + // The operation ordering same as the T1 Lib: "delivery-settling -> work-completion -> work-delivery-removal". + + final boolean isRemotelySettled = delivery.remotelySettled(); + if (isRemotelySettled) { + delivery.settle(); + } + + if (completionError != null) { + final Throwable loggedError = completionError instanceof RuntimeException + ? logger.logExceptionAsError((RuntimeException) completionError) + : completionError; + work.onComplete(loggedError); + } else { + work.onComplete(); + } + + if (isRemotelySettled) { + final String deliveryTag = work.getDeliveryTag(); + pendingDispositions.remove(deliveryTag); + deliveries.remove(deliveryTag); + } + } + + /** + * Completes the given {@link DispositionWork} with error, which results in termination of the {@link Mono} + * returned from the {@link DispositionWork#getMono()} API. + * + * @param work the work to complete with error. + * @param completionError the non-null error value. + */ + private void completeDispositionWork(DispositionWork work, Throwable completionError) { + // The operation ordering same as the T1 Lib: "work-removal -> work-completion". + + pendingDispositions.remove(work.getDeliveryTag()); + + final Throwable loggedError = completionError instanceof RuntimeException + ? logger.logExceptionAsError((RuntimeException) completionError) + : completionError; + work.onComplete(loggedError); + } + + /** + * Gets the error context from the receive-link associated with the delivery. + * + * @param delivery the delivery. + * @return the error context from delivery's receive-link, {@code null} if the delivery or + * receive-link is {@code null}. + */ + private AmqpErrorContext getErrorContext(Delivery delivery) { + if (delivery == null || delivery.getLink() == null) { + return null; + } + return LinkHandler.getErrorContext(hostName, entityPath, delivery.getLink()); + } + + /** + * Represents a work that, upon starting, requests ProtonJ library to send a disposition frame to settle + * a delivery on the broker and the work completes when the broker acknowledges with a disposition frame + * indicating the outcome. The work can complete with an error if it cannot initiate the request + * to the ProtonJ library or the configured timeout elapses. + *

+ * The work is started once the application is subscribed to the {@link Mono} returned by + * {@link DispositionWork#getMono()}; the Mono is terminated upon the work completion. + */ + private static final class DispositionWork extends AtomicBoolean { + private final AtomicInteger tryCount = new AtomicInteger(1); + private final String deliveryTag; + private final DeliveryState desiredState; + private final Duration timeout; + private Mono mono; + private MonoSink monoSink; + private Instant expirationTime; + private Throwable rejectedOutcomeError; + + /** + * Create a DispositionWork. + * + * @param deliveryTag The delivery tag of the Delivery for which to send the disposition frame requesting + * delivery settlement on the broker. + * @param desiredState The state to include in the disposition frame indicating the desired-outcome + * the application wish to occur at the broker. + * @param timeout after requesting the ProtonJ library to send the disposition frame, how long to wait for + * an acknowledgment disposition frame to arrive from the broker. + */ + DispositionWork(String deliveryTag, DeliveryState desiredState, Duration timeout) { + this.deliveryTag = deliveryTag; + this.desiredState = desiredState; + this.timeout = timeout; + this.monoSink = null; + } + + /** + * Gets the delivery tag. + * + * @return the delivery tag. + */ + String getDeliveryTag() { + return deliveryTag; + } + + /** + * Gets the state indicating the desired-outcome which the application wishes to occur at the broker. + * The disposition frame send to the broker includes this desired state. + * + * @return the desired state. + */ + DeliveryState getDesiredState() { + return desiredState; + } + + /** + * Gets the number of times the work was tried. + * + * @return the try count. + */ + int getTryCount() { + return tryCount.get(); + } + + /** + * Gets the error received from the broker when the outcome of the last disposition attempt + * (by sending a disposition frame) happened to be 'Rejected'. + * + * @return the error in the disposition ack frame from the broker with 'Rejected' outcome, + * null if no such disposition ack frame received. + */ + Throwable getRejectedOutcomeError() { + return rejectedOutcomeError; + } + + /** + * Check if the work has timed out. + * + * @return {@code true} if the work has timed out, {@code false} otherwise. + */ + boolean hasTimedout() { + return expirationTime != null && expirationTime.isBefore(Instant.now()); + } + + /** + * Gets the {@link Mono} upon subscription starts the work by requesting ProtonJ library to send + * disposition frame to settle a delivery on the broker, and this Mono terminates once the broker + * acknowledges with disposition frame indicating settlement outcome (a.k.a. remote-outcome) + * The Mono can terminate if the configured timeout elapses or cannot initiate the request to + * ProtonJ library. + * + * @return the mono + */ + Mono getMono() { + return mono; + } + + /** + * Sets the {@link Mono}, where the application can obtain cached version of it + * from {@link DispositionWork#getMono()} and subscribe to start the work, the mono terminates + * upon the successful or unsuccessful completion of the work. + * + * @param mono the mono + */ + void setMono(Mono mono) { + // cache() the mono to replay the result when subscribed more than once, avoid multiple + // disposition placement (and enables a possible second subscription to be safe when closing + // the UnsettledDeliveries type). + this.mono = mono.cache(); + } + + /** + * Check if this work is already completed. + * + * @return {@code true} if the work is completed, {@code true} otherwise. + */ + boolean isCompleted() { + return this.get(); + } + + /** + * The function invoked once the application start the work by subscribing to the {@link Mono} + * obtained from {@link DispositionWork#getMono()}. + * + * @param monoSink the {@link MonoSink} to notify the completion of the work, which triggers + * termination of the same {@link Mono} that started the work. + */ + void onStart(MonoSink monoSink) { + this.monoSink = monoSink; + expirationTime = Instant.now().plus(timeout); + } + + /** + * The function invoked when the work is about to be restarted/retried. The broker may return an + * outcome named 'Rejected' if it is unable to attain the desired-outcome that the application + * specified in the disposition frame; in this case, the work is retried based on the configured + * retry settings. + * + * @param error the error that the broker returned upon Reject-ing the last work execution attempting + * the disposition. + */ + void onRetriableRejectedOutcome(Throwable error) { + this.rejectedOutcomeError = error; + expirationTime = Instant.now().plus(timeout); + tryCount.incrementAndGet(); + } + + /** + * the function invoked upon the successful completion of the work. + */ + void onComplete() { + this.set(true); + Objects.requireNonNull(monoSink); + monoSink.success(); + } + + /** + * the function invoked when the work is completed with an error. + * + * @param error the error reason. + */ + void onComplete(Throwable error) { + this.set(true); + Objects.requireNonNull(monoSink); + monoSink.error(error); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java new file mode 100644 index 0000000000000..ebf87fa531d83 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation.handler; + +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.ReactorDispatcher; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.engine.Delivery; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Answer; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; + +import java.io.IOException; +import java.time.Duration; +import java.util.UUID; +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; + +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class ReceiverUnsettledDeliveriesIsolatedTest { + private static final UUID DELIVERY_EMPTY_TAG = new UUID(0L, 0L); + private static final String HOSTNAME = "hostname"; + private static final String ENTITY_PATH = "/orders"; + private static final String RECEIVER_LINK_NAME = "orders-link"; + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(20); + private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(3); + private static final Duration VIRTUAL_TIME_SHIFT = OPERATION_TIMEOUT.plusSeconds(30); + private final ClientLogger logger = new ClientLogger(ReceiverUnsettledDeliveriesTest.class); + private final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); + private AutoCloseable mocksCloseable; + @Mock + private ReactorDispatcher reactorDispatcher; + @Mock + private Delivery delivery; + + @BeforeEach + public void setup() throws IOException { + mocksCloseable = MockitoAnnotations.openMocks(this); + retryOptions.setTryTimeout(OPERATION_TIMEOUT); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + @Execution(ExecutionMode.SAME_THREAD) + public void sendDispositionTimeoutOnExpiration() throws Exception { + final UUID deliveryTag = UUID.randomUUID(); + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance()); + try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { + verifier.create(() -> dispositionMono, VIRTUAL_TIME_SHIFT) + .expectErrorSatisfies(error -> { + Assertions.assertTrue(error instanceof AmqpException); + final AmqpException amqpError = (AmqpException) error; + Assertions.assertEquals(AmqpErrorCondition.TIMEOUT_ERROR, amqpError.getErrorCondition()); + }) + .verify(VERIFY_TIMEOUT); + } + } + } + + private ReceiverUnsettledDeliveries createUnsettledDeliveries() { + return new ReceiverUnsettledDeliveries(HOSTNAME, ENTITY_PATH, RECEIVER_LINK_NAME, + reactorDispatcher, retryOptions, DELIVERY_EMPTY_TAG, logger); + } + + private static Answer byRunningRunnable() { + return invocation -> { + final Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }; + } + + private static final class VirtualTimeStepVerifier implements AutoCloseable { + private final VirtualTimeScheduler scheduler; + + VirtualTimeStepVerifier() { + scheduler = VirtualTimeScheduler.create(); + } + + StepVerifier.Step create(Supplier> scenarioSupplier, Duration timeShift) { + return StepVerifier.withVirtualTime(scenarioSupplier, () -> scheduler, 1) + .thenAwait(timeShift); + } + + @Override + public void close() { + scheduler.dispose(); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java new file mode 100644 index 0000000000000..49cb376e15128 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java @@ -0,0 +1,435 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation.handler; + +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.AmqpErrorCode; +import com.azure.core.amqp.implementation.ReactorDispatcher; +import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.transaction.Declared; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.stubbing.Answer; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.RejectedExecutionException; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ReceiverUnsettledDeliveriesTest { + private static final UUID DELIVERY_EMPTY_TAG = new UUID(0L, 0L); + private static final String HOSTNAME = "hostname"; + private static final String ENTITY_PATH = "/orders"; + private static final String RECEIVER_LINK_NAME = "orders-link"; + private static final String DISPOSITION_ERROR_ON_CLOSE = "The receiver didn't receive the disposition " + + "acknowledgment due to receive link closure."; + private final ClientLogger logger = new ClientLogger(ReceiverUnsettledDeliveriesTest.class); + private final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); + private AutoCloseable mocksCloseable; + @Mock + private ReactorDispatcher reactorDispatcher; + @Mock + private Delivery delivery; + + @BeforeEach + public void setup() throws IOException { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + public void tracksOnDelivery() throws IOException { + doNothing().when(reactorDispatcher).invoke(any(Runnable.class)); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + final UUID deliveryTag = UUID.randomUUID(); + deliveries.onDelivery(deliveryTag, delivery); + assertTrue(deliveries.containsDelivery(deliveryTag)); + } + } + + @Test + public void sendDispositionErrorsForUntrackedDelivery() { + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + final UUID deliveryTag = UUID.randomUUID(); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance()); + StepVerifier.create(dispositionMono) + .verifyError(IllegalArgumentException.class); + } + } + + @Test + public void sendDispositionErrorsOnDispatcherIOException() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + + doThrow(new IOException()).when(reactorDispatcher).invoke(any(Runnable.class)); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance()); + StepVerifier.create(dispositionMono) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + Assertions.assertNotNull(error.getCause()); + Assertions.assertInstanceOf(IOException.class, error.getCause()); + }); + } + } + + @Test + public void sendDispositionErrorsOnDispatcherRejectedException() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + + doThrow(new RejectedExecutionException()).when(reactorDispatcher).invoke(any(Runnable.class)); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance()); + StepVerifier.create(dispositionMono) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + Assertions.assertNotNull(error.getCause()); + Assertions.assertInstanceOf(RejectedExecutionException.class, error.getCause()); + }); + } + } + + @Test + public void sendDispositionErrorsIfSameDeliveryDispositionInProgress() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono1 = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + dispositionMono1.subscribe(); + final Mono dispositionMono2 = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono2) + .verifyError(AmqpException.class); + } + } + + @Test + public void sendDispositionCompletesOnSuccessfulOutcome() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = desiredState; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyComplete(); + verify(delivery).disposition(desiredState); + Assertions.assertFalse(deliveries.containsDelivery(deliveryTag)); + } + } + + @Test + public void sendDispositionErrorsOnReleaseOutcome() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = Released.getInstance(); + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + final AmqpException amqpError = (AmqpException) error; + Assertions.assertNotNull(amqpError.getErrorCondition()); + Assertions.assertEquals(AmqpErrorCondition.OPERATION_CANCELLED, amqpError.getErrorCondition()); + }); + verify(delivery).disposition(desiredState); + Assertions.assertFalse(deliveries.containsDelivery(deliveryTag)); + } + } + + @Test + public void sendDispositionErrorsOnUnknownOutcome() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = new Declared(); + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + Assertions.assertEquals(remoteState.toString(), error.getMessage()); + }); + verify(delivery).disposition(desiredState); + Assertions.assertFalse(deliveries.containsDelivery(deliveryTag)); + } + } + + @Test + public void sendDispositionRetriesOnRejectedOutcome() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final Rejected remoteState = new Rejected(); + final ErrorCondition remoteError = new ErrorCondition(AmqpErrorCode.SERVER_BUSY_ERROR, null); + remoteState.setError(remoteError); + final int[] dispositionCallCount = new int[1]; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + doAnswer(__ -> { + if (dispositionCallCount[0] != 0) { + deliveries.onDispositionAck(deliveryTag, delivery); + } + dispositionCallCount[0]++; + return null; + }).when(delivery).disposition(any()); + + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + final AmqpException amqpError = (AmqpException) error; + Assertions.assertEquals(AmqpErrorCondition.SERVER_BUSY_ERROR, amqpError.getErrorCondition()); + }); + Assertions.assertEquals(retryOptions.getMaxRetries() + 1, dispositionCallCount[0]); + } + } + + @Test + public void sendDispositionMonoCacheCompletion() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = desiredState; + final int[] dispositionCallCount = new int[1]; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + doAnswer(invocation -> { + dispositionCallCount[0]++; + return null; + }).when(delivery).disposition(any()); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyComplete(); + for (int i = 0; i < 3; i++) { + StepVerifier.create(dispositionMono).verifyComplete(); + } + Assertions.assertEquals(1, dispositionCallCount[0]); + } + } + + @Test + public void sendDispositionMonoCacheError() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = new Declared(); + final int[] dispositionCallCount = new int[1]; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + doAnswer(invocation -> { + dispositionCallCount[0]++; + return null; + }).when(delivery).disposition(any()); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + final Throwable[] lastError = new Throwable[1]; + StepVerifier.create(dispositionMono) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyErrorSatisfies(error -> lastError[0] = error); + for (int i = 0; i < 3; i++) { + StepVerifier.create(dispositionMono) + .verifyErrorSatisfies(error -> { + Assertions.assertEquals(lastError[0], error, + "Expected replay of the last error object, but received a new error object."); + }); + } + Assertions.assertEquals(1, dispositionCallCount[0]); + } + } + + @Test + public void pendingSendDispositionErrorsOnClose() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = new Declared(); + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries(); + try { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + StepVerifier.create(dispositionMono) + .then(() -> deliveries.close()) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + Assertions.assertEquals(DISPOSITION_ERROR_ON_CLOSE, error.getMessage()); + }); + } finally { + deliveries.close(); + } + } + + @Test + public void preCloseAwaitForSendDispositionCompletion() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = desiredState; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + dispositionMono.subscribe(); + + StepVerifier.create(deliveries.attemptGracefulClose()) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyComplete(); + + StepVerifier.create(dispositionMono).verifyComplete(); + } + } + + @Test + public void closeDoNotWaitForSendDispositionCompletion() throws IOException { + final UUID deliveryTag = UUID.randomUUID(); + final DeliveryState desiredState = Accepted.getInstance(); + final DeliveryState remoteState = desiredState; + + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + when(delivery.getRemoteState()).thenReturn(remoteState); + when(delivery.remotelySettled()).thenReturn(true); + + final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries(); + try { + deliveries.onDelivery(deliveryTag, delivery); + final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); + dispositionMono.subscribe(); + + StepVerifier.create(Mono.fromRunnable(() -> deliveries.close())) + .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) + .verifyComplete(); + + StepVerifier.create(dispositionMono) + .verifyErrorSatisfies(error -> { + Assertions.assertInstanceOf(AmqpException.class, error); + Assertions.assertEquals(DISPOSITION_ERROR_ON_CLOSE, error.getMessage()); + }); + } finally { + deliveries.close(); + } + } + + @Test + public void nopOnDeliveryOnceClosed() { + final UUID deliveryTag = UUID.randomUUID(); + + final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries(); + try { + deliveries.close(); + Assertions.assertFalse(deliveries.onDelivery(deliveryTag, delivery)); + Assertions.assertFalse(deliveries.containsDelivery(deliveryTag)); + } finally { + deliveries.close(); + } + } + + @Test + @Disabled("Enable in once disposition API exposed in ReceiveLinkHandler") + public void settlesUnsettledDeliveriesOnClose() throws IOException { + // See the notes in ReceiverUnsettledDeliveries.close() + // + doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class)); + + final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries(); + try { + deliveries.onDelivery(UUID.randomUUID(), delivery); + deliveries.close(); + verify(delivery).disposition(any()); + verify(delivery).settle(); + } finally { + deliveries.close(); + } + } + + private ReceiverUnsettledDeliveries createUnsettledDeliveries() { + return new ReceiverUnsettledDeliveries(HOSTNAME, ENTITY_PATH, RECEIVER_LINK_NAME, + reactorDispatcher, retryOptions, DELIVERY_EMPTY_TAG, logger); + } + + private static Answer byRunningRunnable() { + return invocation -> { + final Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }; + } +} From 68291f3df4063000d51c856556e9fe4c437ca017 Mon Sep 17 00:00:00 2001 From: Anu Thomas Date: Fri, 17 Feb 2023 12:58:10 -0800 Subject: [PATCH 2/4] Integrating ReceiverUnsettledDeliveries with ServiceBusReactorReceiver --- eng/versioning/version_client.txt | 1 + .../amqp/implementation/ReactorReceiver.java | 13 + .../azure-messaging-eventhubs/pom.xml | 2 +- .../azure-messaging-servicebus/pom.xml | 2 +- .../ServiceBusReactorReceiver.java | 343 +----------------- .../ServiceBusReactorReceiverTest.java | 5 +- 6 files changed, 39 insertions(+), 327 deletions(-) diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 43f63addc6006..e5c8d41fb7138 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -405,6 +405,7 @@ com.azure.tools:azure-sdk-build-tool;1.0.0-beta.1;1.0.0-beta.2 # In the pom, the version update tag after the version should name the unreleased package and the dependency version: # +unreleased_com.azure:azure-core-amqp;2.9.0-beta.1 # Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current # version and set the version to the released beta. Released beta dependencies are only valid diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 229662d4ab0fd..cae487b58bc48 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -352,6 +352,18 @@ protected Mono getIsClosedMono() { return isClosedMono.asMono().publishOn(Schedulers.boundedElastic()); } + protected void onHandlerClose() { + // Note: Given the disposition is a generic AMQP feature of brokers that support receive-link with UNSETTLED + // settlement mode, in near future we will enable delivery disposition API in amqp-core 'ReceiverLinkHandler'. + // Such a future API in 'ReceiverLinkHandler' means the handler will own the 'ReceiverUnsettledDeliveries' + // object, and the closing of the handler (i.e., handler.close()) will close 'ReceiverUnsettledDeliveries'. + // TODO: anuchan: Remove onHandlerClose + // This 'onHandlerClose' method is a temporary internal method for the 'ServiceBusReactorReceiver' to close + // the 'ReceiverUnsettledDeliveries' for the interim while we rollout the full disposition API support in + // amqp-core. The 'onHandlerClose' method will be removed once ownership of the 'ReceiverUnsettledDeliveries' + // is abstracted within 'ReceiverLinkHandler', so 'ServiceBusReactorReceiver' no longer have to own it. + } + /** * Beings the client side close by initiating local-close on underlying receiver. * @@ -459,6 +471,7 @@ private void completeClose() { } handler.close(); + onHandlerClose(); receiver.free(); try { trackPrefetchSeqNoSubscription.close(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index c55416e126439..481f8ebcc6119 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -42,7 +42,7 @@ com.azure azure-core-amqp - 2.8.2 + 2.9.0-beta.1 diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 70783a384ed96..0f5f6e65a80c5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -57,7 +57,7 @@ com.azure azure-core-amqp - 2.8.2 + 2.9.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index 72fd318efa930..6f8b257113c3f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -6,59 +6,40 @@ import com.azure.core.amqp.AmqpConnection; import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryPolicy; -import com.azure.core.amqp.exception.AmqpErrorCondition; -import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.implementation.ExceptionUtil; import com.azure.core.amqp.implementation.ReactorProvider; import com.azure.core.amqp.implementation.ReactorReceiver; import com.azure.core.amqp.implementation.TokenManager; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; +import com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.Outcome; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; -import reactor.core.Disposable; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; import reactor.core.scheduler.Schedulers; -import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.StringJoiner; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY; import static com.azure.core.util.FluxUtil.monoError; import static com.azure.messaging.servicebus.implementation.MessageUtils.LOCK_TOKEN_SIZE; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.DELIVERY_STATE_KEY; -import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.LOCK_TOKEN_KEY; import static com.azure.messaging.servicebus.implementation.ServiceBusReactorSession.LOCKED_UNTIL_UTC; import static com.azure.messaging.servicebus.implementation.ServiceBusReactorSession.SESSION_FILTER; @@ -69,10 +50,8 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic private static final Message EMPTY_MESSAGE = Proton.message(); private final ClientLogger logger; - private final ConcurrentHashMap unsettledDeliveries = new ConcurrentHashMap<>(); - private final ConcurrentHashMap pendingUpdates = new ConcurrentHashMap<>(); + private final ReceiverUnsettledDeliveries receiverUnsettledDeliveries; private final AtomicBoolean isDisposed = new AtomicBoolean(); - private final Disposable subscription; private final Receiver receiver; /** @@ -80,10 +59,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic * ServiceBusReceiveMode#RECEIVE_AND_DELETE} is used. */ private final boolean isSettled; - private final Duration timeout; - private final AmqpRetryPolicy retryPolicy; private final ReceiveLinkHandler handler; - private final ReactorProvider provider; private final Mono sessionIdMono; private final Mono sessionLockedUntil; @@ -94,17 +70,16 @@ public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, R retryPolicy.getRetryOptions()); this.receiver = receiver; this.handler = handler; - this.provider = provider; this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED; - this.timeout = timeout; - this.retryPolicy = retryPolicy; - this.subscription = Flux.interval(timeout).subscribe(i -> cleanupWorkItems()); Map loggingContext = new HashMap<>(2); loggingContext.put(LINK_NAME_KEY, this.handler.getLinkName()); loggingContext.put(ENTITY_PATH_KEY, entityPath); this.logger = new ClientLogger(ServiceBusReactorReceiver.class, loggingContext); + this.receiverUnsettledDeliveries = new ReceiverUnsettledDeliveries(handler.getHostname(), entityPath, handler.getLinkName(), + provider.getReactorDispatcher(), retryPolicy.getRetryOptions(), MessageUtils.ZERO_LOCK_TOKEN, logger); + this.sessionIdMono = getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE) .next() .flatMap(state -> { @@ -142,7 +117,7 @@ public Mono updateDisposition(String lockToken, DeliveryState deliveryStat if (isDisposed.get()) { return monoError(logger, new IllegalStateException("Cannot perform operations on a disposed receiver.")); } - return updateDispositionInternal(lockToken, deliveryState); + return this.receiverUnsettledDeliveries.sendDisposition(lockToken, deliveryState); } @Override @@ -173,37 +148,7 @@ protected Mono closeAsync(String message, ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return super.getIsClosedMono(); } - - cleanupWorkItems(); - - final Mono disposeMono; - if (!pendingUpdates.isEmpty()) { - final List> pending = new ArrayList<>(); - final StringJoiner builder = new StringJoiner(", "); - for (UpdateDispositionWorkItem workItem : pendingUpdates.values()) { - - if (workItem.hasTimedout()) { - continue; - } - - if (workItem.getDeliveryState() instanceof TransactionalState) { - pending.add(updateDispositionInternal(workItem.getLockToken(), Released.getInstance())); - } else { - pending.add(workItem.getMono()); - } - builder.add(workItem.getLockToken()); - } - - logger.info("Waiting for pending updates to complete. Locks: {}", builder.toString()); - disposeMono = Mono.when(pending); - } else { - disposeMono = Mono.empty(); - } - - return disposeMono.onErrorResume(error -> { - logger.info("There was an exception while disposing of all links.", error); - return Mono.empty(); - }).doFinally(signal -> subscription.dispose()).then(super.closeAsync(message, errorCondition)); + return receiverUnsettledDeliveries.attemptGracefulClose().then(super.closeAsync(message, errorCondition)); } @Override @@ -216,282 +161,34 @@ protected Message decodeDelivery(Delivery delivery) { lockToken = MessageUtils.ZERO_LOCK_TOKEN; } - final String lockTokenString = lockToken.toString(); - - // There is no lock token associated with this delivery, or the lock token is not in the unsettledDeliveries. - if (lockToken == MessageUtils.ZERO_LOCK_TOKEN || !unsettledDeliveries.containsKey(lockTokenString)) { + if (receiverUnsettledDeliveries.containsDelivery(lockToken)) { + receiverUnsettledDeliveries.onDispositionAck(lockToken, delivery); + // Return empty update disposition messages. The deliveries themselves are ACKs. There is no actual message + // to propagate. + return EMPTY_MESSAGE; + } else { + // There is no lock token associated with this delivery, or the lock token is not in the receiverUnsettledDeliveries. final int messageSize = delivery.pending(); final byte[] buffer = new byte[messageSize]; final int read = receiver.recv(buffer, 0, messageSize); final Message message = Proton.message(); message.decode(buffer, 0, read); - // The delivery was already settled from the message broker. - // This occurs in the case of receive and delete. if (isSettled) { + // The delivery was already settled from the message broker. This occurs in the case of receive and delete. delivery.disposition(Accepted.getInstance()); delivery.settle(); } else { - unsettledDeliveries.putIfAbsent(lockToken.toString(), delivery); + receiverUnsettledDeliveries.onDelivery(lockToken, delivery); receiver.advance(); } return new MessageWithLockToken(message, lockToken); - } else { - updateOutcome(lockTokenString, delivery); - - // Return empty update disposition messages. The deliveries themselves are ACKs. There is no actual message - // to propagate. - return EMPTY_MESSAGE; - } - } - - private Mono updateDispositionInternal(String lockToken, DeliveryState deliveryState) { - final Delivery unsettled = unsettledDeliveries.get(lockToken); - if (unsettled == null) { - - logger.atWarning() - // TODO: it used to be deliveryTag, is it ok to change? - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .log("Delivery not found to update disposition."); - - return monoError(logger, Exceptions.propagate(new IllegalArgumentException( - "Delivery not on receive link."))); - } - - final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, timeout); - final Mono result = Mono.create(sink -> { - workItem.start(sink); - try { - provider.getReactorDispatcher().invoke(() -> { - unsettled.disposition(deliveryState); - pendingUpdates.put(lockToken, workItem); - }); - } catch (IOException | RejectedExecutionException error) { - sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", - error, handler.getErrorContext(receiver))); - } - }).cache(); // cache because closeAsync use `when` to subscribe this Mono again. - - workItem.setMono(result); - - return result; - } - - /** - * Updates the outcome of a delivery. This occurs when a message is being settled from the receiver side. - * @param delivery Delivery to update. - */ - private void updateOutcome(String lockToken, Delivery delivery) { - final DeliveryState remoteState = delivery.getRemoteState(); - - logger.atVerbose() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue(DELIVERY_STATE_KEY, remoteState) - .log("Received update disposition delivery."); - - final Outcome remoteOutcome; - if (remoteState instanceof Outcome) { - remoteOutcome = (Outcome) remoteState; - } else if (remoteState instanceof TransactionalState) { - remoteOutcome = ((TransactionalState) remoteState).getOutcome(); - } else { - remoteOutcome = null; - } - - if (remoteOutcome == null) { - logger.atWarning() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue("delivery", delivery) - .log("No outcome associated with delivery."); - - return; - } - - final UpdateDispositionWorkItem workItem = pendingUpdates.get(lockToken); - if (workItem == null) { - logger.atWarning() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue("delivery", delivery) - .log("No pending update for delivery."); - - return; - } - - // If the statuses match, then we settle the delivery and move on. - if (remoteState.getType() == workItem.getDeliveryState().getType()) { - completeWorkItem(lockToken, delivery, workItem.getSink(), null); - return; - } - - logger.atInfo() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue("receivedDeliveryState", remoteState) - .addKeyValue(DELIVERY_STATE_KEY, workItem.getDeliveryState()) - .log("Received delivery state doesn't match expected state."); - - switch (remoteState.getType()) { - case Rejected: - final Rejected rejected = (Rejected) remoteOutcome; - final ErrorCondition errorCondition = rejected.getError(); - final Throwable exception = ExceptionUtil.toException(errorCondition.getCondition().toString(), - errorCondition.getDescription(), handler.getErrorContext(receiver)); - - final Duration retry = retryPolicy.calculateRetryDelay(exception, workItem.incrementRetry()); - if (retry == null) { - logger.atInfo() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue(DELIVERY_STATE_KEY, remoteState) - .log("Retry attempts exhausted.", exception); - - completeWorkItem(lockToken, delivery, workItem.getSink(), exception); - } else { - workItem.setLastException(exception); - workItem.resetStartTime(); - try { - provider.getReactorDispatcher().invoke(() -> delivery.disposition(workItem.getDeliveryState())); - } catch (IOException | RejectedExecutionException error) { - final Throwable amqpException = logger.atError() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .log(new AmqpException(false, - String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.", getLinkName(), lockToken), - error, handler.getErrorContext(receiver))); - - completeWorkItem(lockToken, delivery, workItem.getSink(), amqpException); - } - } - - break; - case Released: - final Throwable cancelled = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED, - "AMQP layer unexpectedly aborted or disconnected.", handler.getErrorContext(receiver)); - - logger.atInfo() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue(DELIVERY_STATE_KEY, remoteState) - .log("Completing pending updateState operation with exception.", cancelled); - - completeWorkItem(lockToken, delivery, workItem.getSink(), cancelled); - break; - default: - final AmqpException error = new AmqpException(false, remoteOutcome.toString(), - handler.getErrorContext(receiver)); - - logger.atInfo() - .addKeyValue(LOCK_TOKEN_KEY, lockToken) - .addKeyValue(DELIVERY_STATE_KEY, remoteState) - .log("Completing pending updateState operation with exception.", error); - - completeWorkItem(lockToken, delivery, workItem.getSink(), error); - break; - } - } - - private void cleanupWorkItems() { - if (pendingUpdates.isEmpty()) { - return; } - - logger.verbose("Cleaning timed out update work tasks."); - pendingUpdates.forEach((key, value) -> { - if (value == null || !value.hasTimedout()) { - return; - } - - pendingUpdates.remove(key); - final Throwable error = value.getLastException() != null - ? value.getLastException() - : new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Update disposition request timed out.", - handler.getErrorContext(receiver)); - - completeWorkItem(key, null, value.getSink(), error); - }); } - private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) { - final boolean isSettled = delivery != null && delivery.remotelySettled(); - if (isSettled) { - delivery.settle(); - } - - if (error != null) { - final Throwable loggedError = error instanceof RuntimeException - ? logger.logExceptionAsError((RuntimeException) error) - : error; - sink.error(loggedError); - } else { - sink.success(); - } - - if (isSettled) { - pendingUpdates.remove(lockToken); - unsettledDeliveries.remove(lockToken); - } - } - - private static final class UpdateDispositionWorkItem { - private final String lockToken; - private final DeliveryState state; - private final Duration timeout; - private final AtomicInteger retryAttempts = new AtomicInteger(); - private final AtomicBoolean isDisposed = new AtomicBoolean(); - - private Mono mono; - private Instant expirationTime; - private MonoSink sink; - private Throwable throwable; - - private UpdateDispositionWorkItem(String lockToken, DeliveryState state, Duration timeout) { - this.lockToken = lockToken; - this.state = state; - this.timeout = timeout; - } - - private boolean hasTimedout() { - return expirationTime.isBefore(Instant.now()); - } - - private void resetStartTime() { - this.expirationTime = Instant.now().plus(timeout); - } - - private int incrementRetry() { - return retryAttempts.incrementAndGet(); - } - - private Throwable getLastException() { - return throwable; - } - - private void setLastException(Throwable throwable) { - this.throwable = throwable; - } - - private void setMono(Mono mono) { - this.mono = mono; - } - - private Mono getMono() { - return mono; - } - - private MonoSink getSink() { - return sink; - } - - private void start(MonoSink sink) { - Objects.requireNonNull(sink, "'sink' cannot be null."); - this.sink = sink; - this.sink.onDispose(() -> isDisposed.set(true)); - this.sink.onCancel(() -> isDisposed.set(true)); - resetStartTime(); - } - - private DeliveryState getDeliveryState() { - return state; - } - - public String getLockToken() { - return lockToken; - } + @Override + protected void onHandlerClose() { + // See the code comment in ReactorReceiver.onHandlerClose() [temporary method, tobe removed]. + receiverUnsettledDeliveries.close(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java index bca1c4c635bb2..6e4b8693ae9a5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java @@ -4,7 +4,9 @@ package com.azure.messaging.servicebus.implementation; import com.azure.core.amqp.AmqpConnection; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.FixedAmqpRetryPolicy; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.ReactorDispatcher; import com.azure.core.amqp.implementation.ReactorProvider; @@ -68,8 +70,7 @@ class ServiceBusReactorReceiverTest { private ReactorProvider reactorProvider; @Mock private ReactorDispatcher reactorDispatcher; - @Mock - private AmqpRetryPolicy retryPolicy; + private final AmqpRetryPolicy retryPolicy = new FixedAmqpRetryPolicy(new AmqpRetryOptions()); @Mock private ReceiveLinkHandler receiveLinkHandler; @Mock From 3b44507cebc8880848b93c57d8c6b6959a6ef792 Mon Sep 17 00:00:00 2001 From: Anu Thomas Date: Tue, 21 Feb 2023 14:12:12 -0800 Subject: [PATCH 3/4] better name for api to partial terminate UnsettledDeliveries, address code review feedback --- .../handler/ReceiverUnsettledDeliveries.java | 39 ++++++++++++------- .../ReceiverUnsettledDeliveriesTest.java | 4 +- .../ServiceBusReactorReceiver.java | 5 ++- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java index 9d6e9d07bbe82..b25e401afbb0e 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java @@ -99,7 +99,7 @@ public ReceiverUnsettledDeliveries(String hostName, String entityPath, String re this.timeout = retryOptions.getTryTimeout(); this.deliveryEmptyTag = deliveryEmptyTag; this.logger = logger; - this.timoutTimer = Flux.interval(timeout).subscribe(__ -> completeUncompletedDispositionWorksOnTimeout("timer")); + this.timoutTimer = Flux.interval(timeout).subscribe(__ -> completeDispositionWorksOnTimeout("timer")); } /** @@ -223,10 +223,11 @@ public void onDispositionAck(UUID deliveryTag, Delivery delivery) { } /** - * Attempt any optional graceful (if possible) cleanup and terminate this {@link ReceiverUnsettledDeliveries}. - * The function eagerly times out any expired disposition work and waits for the completion or timeout of - * the remaining disposition. Finally, disposes of the timeout timer. Future attempts to notify deliveries or - * send delivery dispositions will be rejected. + * Terminate this {@link ReceiverUnsettledDeliveries} including expired disposition works, and await to complete + * disposition work in progress, with AmqpRetryOptions_tryTimeout as the upper bound for the wait time. + *

+ * Given this is a terminal API in which the disposition timeout timer will be used last time, termination disposes + * the timer as well. Future attempts to notify unsettled deliveries or send delivery dispositions will be rejected. *

* From the point of view of this function's call site, it is still possible that the receive-link and dispatcher * may healthy, but not guaranteed. If healthy, send-receive of disposition frames are possible, enabling @@ -236,19 +237,24 @@ public void onDispositionAck(UUID deliveryTag, Delivery delivery) { * healthy. On the other hand, if the broker initiates the closing of the link, further frame transfer may not be * possible. * - * @return a {@link Mono} that completes upon the termination. + * @return a {@link Mono} that await to complete disposition work in progress, the wait has an upper bound + * of AmqpRetryOptions_tryTimeout. */ - public Mono attemptGracefulClose() { + public Mono terminateAndAwaitForDispositionsInProgressToComplete() { + // 1. Mark this ReceiverUnsettledDeliveries as terminated, so it no longer accept unsettled deliveries + // or disposition requests isTerminated.getAndSet(true); - // Complete timed out works if any - completeUncompletedDispositionWorksOnTimeout("preClose"); + // 2. then complete timed out works if any + completeDispositionWorksOnTimeout("terminateAndAwaitForDispositionsInProgressToComplete"); - // then wait for the completion of all remaining (not timed out) works. + // 3. then obtain a Mono that wait, with AmqpRetryOptions_tryTimeout as the upper bound for the maximum + // wait, for the completion of any disposition work in progress, which includes committing open transaction + // work. The upper bound for the wait time is imposed through timeoutTimer. final List> workMonoList = new ArrayList<>(); final StringJoiner deliveryTags = new StringJoiner(", "); for (DispositionWork work : pendingDispositions.values()) { - if (work.hasTimedout()) { + if (work == null || work.hasTimedout()) { continue; } if (work.getDesiredState() instanceof TransactionalState) { @@ -271,18 +277,21 @@ public Mono attemptGracefulClose() { } else { workMonoListMerged = Mono.empty(); } - return workMonoListMerged.doFinally(__ -> timoutTimer.dispose()); + final Mono dispositionsWithTimeout = workMonoListMerged; + + // 4. finally, disposes the timeoutTimer after its final use (to timeout disposition works in-progress). + return dispositionsWithTimeout.doFinally(__ -> timoutTimer.dispose()); } /** * Closes this {@link ReceiverUnsettledDeliveries} and force complete any uncompleted work. Future attempts - * to notify deliveries or send delivery dispositions will be rejected. + * to notify unsettled deliveries or send delivery dispositions will be rejected. */ @Override public void close() { isTerminated.getAndSet(true); - // Disposes of subscription to the global interval timer. + // Disposes of timeoutTimer's internal subscription to the global interval timer. timoutTimer.dispose(); // Note: Once disposition API support is enabled in ReceiveLinkHandler - this close() method should have @@ -415,7 +424,7 @@ private void handleReleasedOrUnknownRemoteOutcome(DispositionWork work, Delivery /** * Iterate through all the current {@link DispositionWork} and complete the work those are timed out. */ - private void completeUncompletedDispositionWorksOnTimeout(String callSite) { + private void completeDispositionWorksOnTimeout(String callSite) { if (pendingDispositions.isEmpty()) { return; } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java index 49cb376e15128..f182ffba40822 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java @@ -336,7 +336,7 @@ public void pendingSendDispositionErrorsOnClose() throws IOException { } @Test - public void preCloseAwaitForSendDispositionCompletion() throws IOException { + public void shouldTerminateAndAwaitForDispositionInProgressToComplete() throws IOException { final UUID deliveryTag = UUID.randomUUID(); final DeliveryState desiredState = Accepted.getInstance(); final DeliveryState remoteState = desiredState; @@ -350,7 +350,7 @@ public void preCloseAwaitForSendDispositionCompletion() throws IOException { final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState); dispositionMono.subscribe(); - StepVerifier.create(deliveries.attemptGracefulClose()) + StepVerifier.create(deliveries.terminateAndAwaitForDispositionsInProgressToComplete()) .then(() -> deliveries.onDispositionAck(deliveryTag, delivery)) .verifyComplete(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index 6f8b257113c3f..ef7a7dba81d29 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -148,7 +148,8 @@ protected Mono closeAsync(String message, ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return super.getIsClosedMono(); } - return receiverUnsettledDeliveries.attemptGracefulClose().then(super.closeAsync(message, errorCondition)); + return receiverUnsettledDeliveries.terminateAndAwaitForDispositionsInProgressToComplete() + .then(super.closeAsync(message, errorCondition)); } @Override @@ -188,7 +189,7 @@ protected Message decodeDelivery(Delivery delivery) { @Override protected void onHandlerClose() { - // See the code comment in ReactorReceiver.onHandlerClose() [temporary method, tobe removed]. + // See the code comment in ReactorReceiver.onHandlerClose(), [temporary method, tobe removed.] receiverUnsettledDeliveries.close(); } } From 285a8500d53b70ba68a0a67bcfda112db59adbe7 Mon Sep 17 00:00:00 2001 From: Anu Thomas Date: Tue, 21 Feb 2023 20:00:09 -0800 Subject: [PATCH 4/4] renamed 'completeUncompletedDispositionWorksOnClose' to 'completeDispositionWorksOnClose' --- .../implementation/handler/ReceiverUnsettledDeliveries.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java index b25e401afbb0e..1620afe45de68 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveries.java @@ -301,7 +301,7 @@ public void close() { // by the broker, so we need settle only the deliveries in ReceiverUnsettledDeliveries.deliveries in this close. // Force complete all uncompleted works. - completeUncompletedDispositionWorksOnClose(); + completeDispositionWorksOnClose(); } /** @@ -464,7 +464,7 @@ private void completeDispositionWorksOnTimeout(String callSite) { * Iterate through all the {@link DispositionWork}, and 'force' to complete the uncompleted works because * this {@link ReceiverUnsettledDeliveries} is closed. */ - private void completeUncompletedDispositionWorksOnClose() { + private void completeDispositionWorksOnClose() { // Note: Possible to have one function for cleaning both timeout and incomplete works, but readability // seems to be affected, hence separate functions.