sendDisposition(String deliveryTag, DeliveryState desiredState) {
+ if (isTerminated.get()) {
+ return monoError(logger, new IllegalStateException("Cannot perform sendDisposition on a disposed receiver."));
+ } else {
+ return sendDispositionImpl(deliveryTag, desiredState);
+ }
+ }
+
+ /**
+ * The function to notify the broker's acknowledgment in response to a disposition frame sent to the broker
+ * via {@link ReceiverUnsettledDeliveries#sendDisposition(String, DeliveryState)}.
+ * The broker acknowledgment is also a disposition frame; the ProtonJ library will map this disposition
+ * frame to the same Delivery in-memory object for which the application requested disposition.
+ * As part of mapping, the remote-state (representing remote-outcome) and is-remotely-settled (boolean)
+ * property of the Delivery object is updated from the disposition frame ack.
+ *
+ * @param deliveryTag the unique delivery tag of the delivery that application requested disposition.
+ * @param delivery the delivery object updated from the broker's transfer frame ack.
+ */
+ public void onDispositionAck(UUID deliveryTag, Delivery delivery) {
+ final DeliveryState remoteState = delivery.getRemoteState();
+
+ logger.atVerbose()
+ .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
+ .addKeyValue(DELIVERY_STATE_KEY, remoteState)
+ .log("Received update disposition delivery.");
+
+ final Outcome remoteOutcome;
+ if (remoteState instanceof Outcome) {
+ remoteOutcome = (Outcome) remoteState;
+ } else if (remoteState instanceof TransactionalState) {
+ remoteOutcome = ((TransactionalState) remoteState).getOutcome();
+ } else {
+ remoteOutcome = null;
+ }
+
+ if (remoteOutcome == null) {
+ logger.atWarning()
+ .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
+ .addKeyValue("delivery", delivery)
+ .log("No outcome associated with delivery.");
+
+ return;
+ }
+
+ final DispositionWork work = pendingDispositions.get(deliveryTag.toString());
+ if (work == null) {
+ logger.atWarning()
+ .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
+ .addKeyValue("delivery", delivery)
+ .log("No pending update for delivery.");
+ return;
+ }
+
+ // the outcome that application desired.
+ final DeliveryStateType desiredOutcomeType = work.getDesiredState().getType();
+ // the outcome that broker actually attained.
+ final DeliveryStateType remoteOutcomeType = remoteState.getType();
+
+ if (desiredOutcomeType == remoteOutcomeType) {
+ completeDispositionWorkWithSettle(work, delivery, null);
+ } else {
+ logger.atInfo()
+ .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
+ .addKeyValue("receivedDeliveryState", remoteState)
+ .addKeyValue(DELIVERY_STATE_KEY, work.getDesiredState())
+ .log("Received delivery state doesn't match expected state.");
+
+ if (remoteOutcomeType == DeliveryStateType.Rejected) {
+ handleRetriableRejectedRemoteOutcome(work, delivery, (Rejected) remoteOutcome);
+ } else {
+ handleReleasedOrUnknownRemoteOutcome(work, delivery, remoteOutcome);
+ }
+ }
+ }
+
+ /**
+ * Terminate this {@link ReceiverUnsettledDeliveries} including expired disposition works, and await to complete
+ * disposition work in progress, with AmqpRetryOptions_tryTimeout as the upper bound for the wait time.
+ *
+ * Given this is a terminal API in which the disposition timeout timer will be used last time, termination disposes
+ * the timer as well. Future attempts to notify unsettled deliveries or send delivery dispositions will be rejected.
+ *
+ * From the point of view of this function's call site, it is still possible that the receive-link and dispatcher
+ * may healthy, but not guaranteed. If healthy, send-receive of disposition frames are possible, enabling
+ * 'graceful' completion of works.
+ *
+ * e.g., if the user proactively initiates the closing of client, it is likely that the receive-link may be
+ * healthy. On the other hand, if the broker initiates the closing of the link, further frame transfer may not be
+ * possible.
+ *
+ * @return a {@link Mono} that await to complete disposition work in progress, the wait has an upper bound
+ * of AmqpRetryOptions_tryTimeout.
+ */
+ public Mono terminateAndAwaitForDispositionsInProgressToComplete() {
+ // 1. Mark this ReceiverUnsettledDeliveries as terminated, so it no longer accept unsettled deliveries
+ // or disposition requests
+ isTerminated.getAndSet(true);
+
+ // 2. then complete timed out works if any
+ completeDispositionWorksOnTimeout("terminateAndAwaitForDispositionsInProgressToComplete");
+
+ // 3. then obtain a Mono that wait, with AmqpRetryOptions_tryTimeout as the upper bound for the maximum
+ // wait, for the completion of any disposition work in progress, which includes committing open transaction
+ // work. The upper bound for the wait time is imposed through timeoutTimer.
+ final List> workMonoList = new ArrayList<>();
+ final StringJoiner deliveryTags = new StringJoiner(", ");
+ for (DispositionWork work : pendingDispositions.values()) {
+ if (work == null || work.hasTimedout()) {
+ continue;
+ }
+ if (work.getDesiredState() instanceof TransactionalState) {
+ final Mono workMono = sendDispositionImpl(work.getDeliveryTag(), Released.getInstance());
+ workMonoList.add(workMono);
+ } else {
+ workMonoList.add(work.getMono());
+ }
+ deliveryTags.add(work.getDeliveryTag());
+ }
+
+ final Mono workMonoListMerged;
+ if (!workMonoList.isEmpty()) {
+ logger.info("Waiting for pending updates to complete. Locks: {}", deliveryTags.toString());
+ workMonoListMerged = Mono.whenDelayError(workMonoList)
+ .onErrorResume(error -> {
+ logger.info("There was exception(s) while disposing of all disposition work.", error);
+ return Mono.empty();
+ });
+ } else {
+ workMonoListMerged = Mono.empty();
+ }
+ final Mono dispositionsWithTimeout = workMonoListMerged;
+
+ // 4. finally, disposes the timeoutTimer after its final use (to timeout disposition works in-progress).
+ return dispositionsWithTimeout.doFinally(__ -> timoutTimer.dispose());
+ }
+
+ /**
+ * Closes this {@link ReceiverUnsettledDeliveries} and force complete any uncompleted work. Future attempts
+ * to notify unsettled deliveries or send delivery dispositions will be rejected.
+ */
+ @Override
+ public void close() {
+ isTerminated.getAndSet(true);
+
+ // Disposes of timeoutTimer's internal subscription to the global interval timer.
+ timoutTimer.dispose();
+
+ // Note: Once disposition API support is enabled in ReceiveLinkHandler - this close() method should have
+ // logic to free the tracked QPID deliveries. The ReceiveLinkHandler will no longer track "ALL" QPID
+ // deliveries (using 'queuedDeliveries' set), because, the plan is, upon arrival of any delivery in
+ // reactor-thread, we will be draining the delivery buffer and settle those deliveries already settled
+ // by the broker, so we need settle only the deliveries in ReceiverUnsettledDeliveries.deliveries in this close.
+
+ // Force complete all uncompleted works.
+ completeDispositionWorksOnClose();
+ }
+
+ /**
+ * See the doc for {@link ReceiverUnsettledDeliveries#sendDisposition(String, DeliveryState)}.
+ *
+ * @param deliveryTag the unique delivery tag identifying the delivery.
+ * @param desiredState The state to include in the disposition frame indicating the desired-outcome
+ * that the application wish to occur at the broker.
+ * @return the {@link Mono} representing disposition work.
+ */
+ private Mono sendDispositionImpl(String deliveryTag, DeliveryState desiredState) {
+ final Delivery delivery = deliveries.get(deliveryTag);
+ if (delivery == null) {
+ logger.atWarning()
+ .addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
+ .log("Delivery not found to update disposition.");
+
+ return monoError(logger, Exceptions.propagate(new IllegalArgumentException(
+ "Delivery not on receive link.")));
+ }
+
+ final DispositionWork work = new DispositionWork(deliveryTag, desiredState, timeout);
+
+ final Mono mono = Mono.create(sink -> {
+ work.onStart(sink);
+ try {
+ dispatcher.invoke(() -> {
+ delivery.disposition(desiredState);
+ if (pendingDispositions.putIfAbsent(deliveryTag, work) != null) {
+ work.onComplete(new AmqpException(false,
+ "A disposition requested earlier is waiting for the broker's ack; "
+ + "a new disposition request is not allowed.",
+ null));
+ }
+ });
+ } catch (IOException | RejectedExecutionException dispatchError) {
+ work.onComplete(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
+ dispatchError, getErrorContext(delivery)));
+ }
+ });
+
+ work.setMono(mono);
+
+ return work.getMono();
+ }
+
+ /**
+ * Handles the 'Rejected' outcome (in a disposition ack) from the broker in-response to a disposition frame
+ * application sent.
+ *
+ * @param work the work that sent the disposition frame with a desired-outcome which broker 'Rejected'.
+ * @param delivery the Delivery in-memory object for which the application had sent the disposition frame;
+ * the ProtonJ library updates the remote-state (representing remote-outcome) and
+ * is-remotely-settled (boolean) property of the Delivery object from the disposition frame ack.
+ * @param remoteOutcome the 'Rejected' remote-outcome describing the rejection reason, this is derived from
+ * the remote-state.
+ */
+ private void handleRetriableRejectedRemoteOutcome(DispositionWork work, Delivery delivery, Rejected remoteOutcome) {
+ final AmqpErrorContext amqpErrorContext = getErrorContext(delivery);
+ final ErrorCondition errorCondition = remoteOutcome.getError();
+ final Throwable error = ExceptionUtil.toException(errorCondition.getCondition().toString(),
+ errorCondition.getDescription(), amqpErrorContext);
+
+ final Duration retry = retryPolicy.calculateRetryDelay(error, work.getTryCount());
+ if (retry != null) {
+ work.onRetriableRejectedOutcome(error);
+ try {
+ dispatcher.invoke(() -> delivery.disposition(work.getDesiredState()));
+ } catch (IOException | RejectedExecutionException dispatchError) {
+ final Throwable amqpException = logger.atError()
+ .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
+ .log(new AmqpException(false,
+ String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.",
+ receiveLinkName, work.getDeliveryTag()),
+ dispatchError, getErrorContext(delivery)));
+
+ completeDispositionWorkWithSettle(work, delivery, amqpException);
+ }
+ } else {
+ logger.atInfo()
+ .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
+ .addKeyValue(DELIVERY_STATE_KEY, delivery.getRemoteState())
+ .log("Retry attempts exhausted.", error);
+
+ completeDispositionWorkWithSettle(work, delivery, error);
+ }
+ }
+
+ /**
+ * Handles the 'Released' or unknown outcome (in a disposition ack) from the broker in-response to a disposition
+ * frame application sent.
+ *
+ * @param work the work that sent the disposition frame with a desired-outcome.
+ * @param delivery the Delivery in-memory object for which the application had sent the disposition frame;
+ * the ProtonJ library updates the remote-state (representing remote-outcome) and
+ * is-remotely-settled (boolean) property of the Delivery object from the disposition frame ack.
+ * @param remoteOutcome the remote-outcome from the broker describing the reason for broker choosing an outcome
+ * different from requested desired-outcome, this is derived from the remote-state.
+ */
+ private void handleReleasedOrUnknownRemoteOutcome(DispositionWork work, Delivery delivery, Outcome remoteOutcome) {
+ final AmqpErrorContext amqpErrorContext = getErrorContext(delivery);
+ final AmqpException completionError;
+
+ final DeliveryStateType remoteOutcomeType = delivery.getRemoteState().getType();
+ if (remoteOutcomeType == DeliveryStateType.Released) {
+ completionError = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED,
+ "AMQP layer unexpectedly aborted or disconnected.", amqpErrorContext);
+ } else {
+ completionError = new AmqpException(false, remoteOutcome.toString(), amqpErrorContext);
+ }
+
+ logger.atInfo()
+ .addKeyValue(DELIVERY_TAG_KEY, work.getDeliveryTag())
+ .addKeyValue(DELIVERY_STATE_KEY, delivery.getRemoteState())
+ .log("Completing pending updateState operation with exception.", completionError);
+
+ completeDispositionWorkWithSettle(work, delivery, completionError);
+ }
+
+ /**
+ * Iterate through all the current {@link DispositionWork} and complete the work those are timed out.
+ */
+ private void completeDispositionWorksOnTimeout(String callSite) {
+ if (pendingDispositions.isEmpty()) {
+ return;
+ }
+
+ final int[] completionCount = new int[1];
+ final StringJoiner deliveryTags = new StringJoiner(", ");
+
+ pendingDispositions.forEach((deliveryTag, work) -> {
+ if (work == null || !work.hasTimedout()) {
+ return;
+ }
+
+ if (completionCount[0] == 0) {
+ logger.info("Starting completion of timed out disposition works (call site:{}).", callSite);
+ }
+
+ final Throwable completionError;
+ if (work.getRejectedOutcomeError() != null) {
+ completionError = work.getRejectedOutcomeError();
+ } else {
+ completionError = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR,
+ "Update disposition request timed out.", getErrorContext(deliveries.get(work.getDeliveryTag())));
+ }
+ deliveryTags.add(work.getDeliveryTag());
+ completeDispositionWork(work, completionError);
+ completionCount[0]++;
+ });
+
+ if (completionCount[0] > 0) {
+ // The log help debug if the user code chained to the work-mono (DispositionWork::getMono()) never returns.
+ logger.info("Completed {} timed-out disposition works (call site:{}). Locks {}",
+ callSite, completionCount[0], deliveryTags.toString());
+ }
+ }
+
+ /**
+ * Iterate through all the {@link DispositionWork}, and 'force' to complete the uncompleted works because
+ * this {@link ReceiverUnsettledDeliveries} is closed.
+ */
+ private void completeDispositionWorksOnClose() {
+ // Note: Possible to have one function for cleaning both timeout and incomplete works, but readability
+ // seems to be affected, hence separate functions.
+
+ if (pendingDispositions.isEmpty()) {
+ return;
+ }
+
+ final int[] completionCount = new int[1];
+ final StringJoiner deliveryTags = new StringJoiner(", ");
+
+ final AmqpException completionError = new AmqpException(false,
+ "The receiver didn't receive the disposition acknowledgment due to receive link closure.", null);
+
+ pendingDispositions.forEach((deliveryTag, work) -> {
+ if (work == null || work.isCompleted()) {
+ return;
+ }
+
+ if (completionCount[0] == 0) {
+ logger.info("Starting completion of disposition works as part of receive link closure.");
+ }
+
+ deliveryTags.add(work.getDeliveryTag());
+ completeDispositionWork(work, completionError);
+ completionCount[0]++;
+ });
+
+ if (completionCount[0] > 0) {
+ // The log help debug if the user code chained to the work-mono (DispositionWork::getMono()) never returns.
+ logger.info("Completed {} disposition works as part of receive link closure. Locks {}",
+ completionCount[0], deliveryTags.toString());
+ }
+ }
+
+ /**
+ * Completes the given {@link DispositionWork}, which results in termination of the {@link Mono} returned
+ * from the {@link DispositionWork#getMono()} API. If the broker settled the {@link Delivery} associated
+ * with the work, it would also be locally settled.
+ *
+ * Invocations of this function are guaranteed to be serial, as all call sites originate from
+ * {@link ReceiverUnsettledDeliveries#onDispositionAck(UUID, Delivery)} running on the ProtonJ Reactor event-loop thread.
+ *
+ * @param work the work to complete.
+ * @param delivery the delivery that the work attempted the disposition, to be locally settled if the broker
+ * settled it on the remote end.
+ * @param completionError a null value indicates that the work has to complete successfully, otherwise complete
+ * the work with the error value.
+ */
+ private void completeDispositionWorkWithSettle(DispositionWork work, Delivery delivery, Throwable completionError) {
+ // The operation ordering same as the T1 Lib: "delivery-settling -> work-completion -> work-delivery-removal".
+
+ final boolean isRemotelySettled = delivery.remotelySettled();
+ if (isRemotelySettled) {
+ delivery.settle();
+ }
+
+ if (completionError != null) {
+ final Throwable loggedError = completionError instanceof RuntimeException
+ ? logger.logExceptionAsError((RuntimeException) completionError)
+ : completionError;
+ work.onComplete(loggedError);
+ } else {
+ work.onComplete();
+ }
+
+ if (isRemotelySettled) {
+ final String deliveryTag = work.getDeliveryTag();
+ pendingDispositions.remove(deliveryTag);
+ deliveries.remove(deliveryTag);
+ }
+ }
+
+ /**
+ * Completes the given {@link DispositionWork} with error, which results in termination of the {@link Mono}
+ * returned from the {@link DispositionWork#getMono()} API.
+ *
+ * @param work the work to complete with error.
+ * @param completionError the non-null error value.
+ */
+ private void completeDispositionWork(DispositionWork work, Throwable completionError) {
+ // The operation ordering same as the T1 Lib: "work-removal -> work-completion".
+
+ pendingDispositions.remove(work.getDeliveryTag());
+
+ final Throwable loggedError = completionError instanceof RuntimeException
+ ? logger.logExceptionAsError((RuntimeException) completionError)
+ : completionError;
+ work.onComplete(loggedError);
+ }
+
+ /**
+ * Gets the error context from the receive-link associated with the delivery.
+ *
+ * @param delivery the delivery.
+ * @return the error context from delivery's receive-link, {@code null} if the delivery or
+ * receive-link is {@code null}.
+ */
+ private AmqpErrorContext getErrorContext(Delivery delivery) {
+ if (delivery == null || delivery.getLink() == null) {
+ return null;
+ }
+ return LinkHandler.getErrorContext(hostName, entityPath, delivery.getLink());
+ }
+
+ /**
+ * Represents a work that, upon starting, requests ProtonJ library to send a disposition frame to settle
+ * a delivery on the broker and the work completes when the broker acknowledges with a disposition frame
+ * indicating the outcome. The work can complete with an error if it cannot initiate the request
+ * to the ProtonJ library or the configured timeout elapses.
+ *
+ * The work is started once the application is subscribed to the {@link Mono} returned by
+ * {@link DispositionWork#getMono()}; the Mono is terminated upon the work completion.
+ */
+ private static final class DispositionWork extends AtomicBoolean {
+ private final AtomicInteger tryCount = new AtomicInteger(1);
+ private final String deliveryTag;
+ private final DeliveryState desiredState;
+ private final Duration timeout;
+ private Mono mono;
+ private MonoSink monoSink;
+ private Instant expirationTime;
+ private Throwable rejectedOutcomeError;
+
+ /**
+ * Create a DispositionWork.
+ *
+ * @param deliveryTag The delivery tag of the Delivery for which to send the disposition frame requesting
+ * delivery settlement on the broker.
+ * @param desiredState The state to include in the disposition frame indicating the desired-outcome
+ * the application wish to occur at the broker.
+ * @param timeout after requesting the ProtonJ library to send the disposition frame, how long to wait for
+ * an acknowledgment disposition frame to arrive from the broker.
+ */
+ DispositionWork(String deliveryTag, DeliveryState desiredState, Duration timeout) {
+ this.deliveryTag = deliveryTag;
+ this.desiredState = desiredState;
+ this.timeout = timeout;
+ this.monoSink = null;
+ }
+
+ /**
+ * Gets the delivery tag.
+ *
+ * @return the delivery tag.
+ */
+ String getDeliveryTag() {
+ return deliveryTag;
+ }
+
+ /**
+ * Gets the state indicating the desired-outcome which the application wishes to occur at the broker.
+ * The disposition frame send to the broker includes this desired state.
+ *
+ * @return the desired state.
+ */
+ DeliveryState getDesiredState() {
+ return desiredState;
+ }
+
+ /**
+ * Gets the number of times the work was tried.
+ *
+ * @return the try count.
+ */
+ int getTryCount() {
+ return tryCount.get();
+ }
+
+ /**
+ * Gets the error received from the broker when the outcome of the last disposition attempt
+ * (by sending a disposition frame) happened to be 'Rejected'.
+ *
+ * @return the error in the disposition ack frame from the broker with 'Rejected' outcome,
+ * null if no such disposition ack frame received.
+ */
+ Throwable getRejectedOutcomeError() {
+ return rejectedOutcomeError;
+ }
+
+ /**
+ * Check if the work has timed out.
+ *
+ * @return {@code true} if the work has timed out, {@code false} otherwise.
+ */
+ boolean hasTimedout() {
+ return expirationTime != null && expirationTime.isBefore(Instant.now());
+ }
+
+ /**
+ * Gets the {@link Mono} upon subscription starts the work by requesting ProtonJ library to send
+ * disposition frame to settle a delivery on the broker, and this Mono terminates once the broker
+ * acknowledges with disposition frame indicating settlement outcome (a.k.a. remote-outcome)
+ * The Mono can terminate if the configured timeout elapses or cannot initiate the request to
+ * ProtonJ library.
+ *
+ * @return the mono
+ */
+ Mono getMono() {
+ return mono;
+ }
+
+ /**
+ * Sets the {@link Mono}, where the application can obtain cached version of it
+ * from {@link DispositionWork#getMono()} and subscribe to start the work, the mono terminates
+ * upon the successful or unsuccessful completion of the work.
+ *
+ * @param mono the mono
+ */
+ void setMono(Mono mono) {
+ // cache() the mono to replay the result when subscribed more than once, avoid multiple
+ // disposition placement (and enables a possible second subscription to be safe when closing
+ // the UnsettledDeliveries type).
+ this.mono = mono.cache();
+ }
+
+ /**
+ * Check if this work is already completed.
+ *
+ * @return {@code true} if the work is completed, {@code true} otherwise.
+ */
+ boolean isCompleted() {
+ return this.get();
+ }
+
+ /**
+ * The function invoked once the application start the work by subscribing to the {@link Mono}
+ * obtained from {@link DispositionWork#getMono()}.
+ *
+ * @param monoSink the {@link MonoSink} to notify the completion of the work, which triggers
+ * termination of the same {@link Mono} that started the work.
+ */
+ void onStart(MonoSink monoSink) {
+ this.monoSink = monoSink;
+ expirationTime = Instant.now().plus(timeout);
+ }
+
+ /**
+ * The function invoked when the work is about to be restarted/retried. The broker may return an
+ * outcome named 'Rejected' if it is unable to attain the desired-outcome that the application
+ * specified in the disposition frame; in this case, the work is retried based on the configured
+ * retry settings.
+ *
+ * @param error the error that the broker returned upon Reject-ing the last work execution attempting
+ * the disposition.
+ */
+ void onRetriableRejectedOutcome(Throwable error) {
+ this.rejectedOutcomeError = error;
+ expirationTime = Instant.now().plus(timeout);
+ tryCount.incrementAndGet();
+ }
+
+ /**
+ * the function invoked upon the successful completion of the work.
+ */
+ void onComplete() {
+ this.set(true);
+ Objects.requireNonNull(monoSink);
+ monoSink.success();
+ }
+
+ /**
+ * the function invoked when the work is completed with an error.
+ *
+ * @param error the error reason.
+ */
+ void onComplete(Throwable error) {
+ this.set(true);
+ Objects.requireNonNull(monoSink);
+ monoSink.error(error);
+ }
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java
new file mode 100644
index 000000000000..ebf87fa531d8
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesIsolatedTest.java
@@ -0,0 +1,121 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.implementation.handler;
+
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.amqp.implementation.ReactorDispatcher;
+import com.azure.core.util.logging.ClientLogger;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.engine.Delivery;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.api.parallel.Isolated;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import reactor.test.scheduler.VirtualTimeScheduler;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+@Execution(ExecutionMode.SAME_THREAD)
+@Isolated
+public class ReceiverUnsettledDeliveriesIsolatedTest {
+ private static final UUID DELIVERY_EMPTY_TAG = new UUID(0L, 0L);
+ private static final String HOSTNAME = "hostname";
+ private static final String ENTITY_PATH = "/orders";
+ private static final String RECEIVER_LINK_NAME = "orders-link";
+ private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(20);
+ private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(3);
+ private static final Duration VIRTUAL_TIME_SHIFT = OPERATION_TIMEOUT.plusSeconds(30);
+ private final ClientLogger logger = new ClientLogger(ReceiverUnsettledDeliveriesTest.class);
+ private final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
+ private AutoCloseable mocksCloseable;
+ @Mock
+ private ReactorDispatcher reactorDispatcher;
+ @Mock
+ private Delivery delivery;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ mocksCloseable = MockitoAnnotations.openMocks(this);
+ retryOptions.setTryTimeout(OPERATION_TIMEOUT);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ Mockito.framework().clearInlineMock(this);
+
+ if (mocksCloseable != null) {
+ mocksCloseable.close();
+ }
+ }
+
+ @Test
+ @Execution(ExecutionMode.SAME_THREAD)
+ public void sendDispositionTimeoutOnExpiration() throws Exception {
+ final UUID deliveryTag = UUID.randomUUID();
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance());
+ try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
+ verifier.create(() -> dispositionMono, VIRTUAL_TIME_SHIFT)
+ .expectErrorSatisfies(error -> {
+ Assertions.assertTrue(error instanceof AmqpException);
+ final AmqpException amqpError = (AmqpException) error;
+ Assertions.assertEquals(AmqpErrorCondition.TIMEOUT_ERROR, amqpError.getErrorCondition());
+ })
+ .verify(VERIFY_TIMEOUT);
+ }
+ }
+ }
+
+ private ReceiverUnsettledDeliveries createUnsettledDeliveries() {
+ return new ReceiverUnsettledDeliveries(HOSTNAME, ENTITY_PATH, RECEIVER_LINK_NAME,
+ reactorDispatcher, retryOptions, DELIVERY_EMPTY_TAG, logger);
+ }
+
+ private static Answer byRunningRunnable() {
+ return invocation -> {
+ final Runnable runnable = invocation.getArgument(0);
+ runnable.run();
+ return null;
+ };
+ }
+
+ private static final class VirtualTimeStepVerifier implements AutoCloseable {
+ private final VirtualTimeScheduler scheduler;
+
+ VirtualTimeStepVerifier() {
+ scheduler = VirtualTimeScheduler.create();
+ }
+
+ StepVerifier.Step create(Supplier> scenarioSupplier, Duration timeShift) {
+ return StepVerifier.withVirtualTime(scenarioSupplier, () -> scheduler, 1)
+ .thenAwait(timeShift);
+ }
+
+ @Override
+ public void close() {
+ scheduler.dispose();
+ }
+ }
+}
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java
new file mode 100644
index 000000000000..f182ffba4082
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiverUnsettledDeliveriesTest.java
@@ -0,0 +1,435 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.implementation.handler;
+
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.amqp.implementation.AmqpErrorCode;
+import com.azure.core.amqp.implementation.ReactorDispatcher;
+import com.azure.core.util.logging.ClientLogger;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.RejectedExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReceiverUnsettledDeliveriesTest {
+ private static final UUID DELIVERY_EMPTY_TAG = new UUID(0L, 0L);
+ private static final String HOSTNAME = "hostname";
+ private static final String ENTITY_PATH = "/orders";
+ private static final String RECEIVER_LINK_NAME = "orders-link";
+ private static final String DISPOSITION_ERROR_ON_CLOSE = "The receiver didn't receive the disposition "
+ + "acknowledgment due to receive link closure.";
+ private final ClientLogger logger = new ClientLogger(ReceiverUnsettledDeliveriesTest.class);
+ private final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
+ private AutoCloseable mocksCloseable;
+ @Mock
+ private ReactorDispatcher reactorDispatcher;
+ @Mock
+ private Delivery delivery;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ mocksCloseable = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ Mockito.framework().clearInlineMock(this);
+
+ if (mocksCloseable != null) {
+ mocksCloseable.close();
+ }
+ }
+
+ @Test
+ public void tracksOnDelivery() throws IOException {
+ doNothing().when(reactorDispatcher).invoke(any(Runnable.class));
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ final UUID deliveryTag = UUID.randomUUID();
+ deliveries.onDelivery(deliveryTag, delivery);
+ assertTrue(deliveries.containsDelivery(deliveryTag));
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsForUntrackedDelivery() {
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ final UUID deliveryTag = UUID.randomUUID();
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance());
+ StepVerifier.create(dispositionMono)
+ .verifyError(IllegalArgumentException.class);
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsOnDispatcherIOException() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+
+ doThrow(new IOException()).when(reactorDispatcher).invoke(any(Runnable.class));
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance());
+ StepVerifier.create(dispositionMono)
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ Assertions.assertNotNull(error.getCause());
+ Assertions.assertInstanceOf(IOException.class, error.getCause());
+ });
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsOnDispatcherRejectedException() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+
+ doThrow(new RejectedExecutionException()).when(reactorDispatcher).invoke(any(Runnable.class));
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance());
+ StepVerifier.create(dispositionMono)
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ Assertions.assertNotNull(error.getCause());
+ Assertions.assertInstanceOf(RejectedExecutionException.class, error.getCause());
+ });
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsIfSameDeliveryDispositionInProgress() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono1 = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ dispositionMono1.subscribe();
+ final Mono dispositionMono2 = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono2)
+ .verifyError(AmqpException.class);
+ }
+ }
+
+ @Test
+ public void sendDispositionCompletesOnSuccessfulOutcome() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = desiredState;
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyComplete();
+ verify(delivery).disposition(desiredState);
+ Assertions.assertFalse(deliveries.containsDelivery(deliveryTag));
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsOnReleaseOutcome() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = Released.getInstance();
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ final AmqpException amqpError = (AmqpException) error;
+ Assertions.assertNotNull(amqpError.getErrorCondition());
+ Assertions.assertEquals(AmqpErrorCondition.OPERATION_CANCELLED, amqpError.getErrorCondition());
+ });
+ verify(delivery).disposition(desiredState);
+ Assertions.assertFalse(deliveries.containsDelivery(deliveryTag));
+ }
+ }
+
+ @Test
+ public void sendDispositionErrorsOnUnknownOutcome() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = new Declared();
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ Assertions.assertEquals(remoteState.toString(), error.getMessage());
+ });
+ verify(delivery).disposition(desiredState);
+ Assertions.assertFalse(deliveries.containsDelivery(deliveryTag));
+ }
+ }
+
+ @Test
+ public void sendDispositionRetriesOnRejectedOutcome() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final Rejected remoteState = new Rejected();
+ final ErrorCondition remoteError = new ErrorCondition(AmqpErrorCode.SERVER_BUSY_ERROR, null);
+ remoteState.setError(remoteError);
+ final int[] dispositionCallCount = new int[1];
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ doAnswer(__ -> {
+ if (dispositionCallCount[0] != 0) {
+ deliveries.onDispositionAck(deliveryTag, delivery);
+ }
+ dispositionCallCount[0]++;
+ return null;
+ }).when(delivery).disposition(any());
+
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ final AmqpException amqpError = (AmqpException) error;
+ Assertions.assertEquals(AmqpErrorCondition.SERVER_BUSY_ERROR, amqpError.getErrorCondition());
+ });
+ Assertions.assertEquals(retryOptions.getMaxRetries() + 1, dispositionCallCount[0]);
+ }
+ }
+
+ @Test
+ public void sendDispositionMonoCacheCompletion() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = desiredState;
+ final int[] dispositionCallCount = new int[1];
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+ doAnswer(invocation -> {
+ dispositionCallCount[0]++;
+ return null;
+ }).when(delivery).disposition(any());
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyComplete();
+ for (int i = 0; i < 3; i++) {
+ StepVerifier.create(dispositionMono).verifyComplete();
+ }
+ Assertions.assertEquals(1, dispositionCallCount[0]);
+ }
+ }
+
+ @Test
+ public void sendDispositionMonoCacheError() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = new Declared();
+ final int[] dispositionCallCount = new int[1];
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+ doAnswer(invocation -> {
+ dispositionCallCount[0]++;
+ return null;
+ }).when(delivery).disposition(any());
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ final Throwable[] lastError = new Throwable[1];
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyErrorSatisfies(error -> lastError[0] = error);
+ for (int i = 0; i < 3; i++) {
+ StepVerifier.create(dispositionMono)
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertEquals(lastError[0], error,
+ "Expected replay of the last error object, but received a new error object.");
+ });
+ }
+ Assertions.assertEquals(1, dispositionCallCount[0]);
+ }
+ }
+
+ @Test
+ public void pendingSendDispositionErrorsOnClose() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = new Declared();
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries();
+ try {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ StepVerifier.create(dispositionMono)
+ .then(() -> deliveries.close())
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ Assertions.assertEquals(DISPOSITION_ERROR_ON_CLOSE, error.getMessage());
+ });
+ } finally {
+ deliveries.close();
+ }
+ }
+
+ @Test
+ public void shouldTerminateAndAwaitForDispositionInProgressToComplete() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = desiredState;
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ dispositionMono.subscribe();
+
+ StepVerifier.create(deliveries.terminateAndAwaitForDispositionsInProgressToComplete())
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyComplete();
+
+ StepVerifier.create(dispositionMono).verifyComplete();
+ }
+ }
+
+ @Test
+ public void closeDoNotWaitForSendDispositionCompletion() throws IOException {
+ final UUID deliveryTag = UUID.randomUUID();
+ final DeliveryState desiredState = Accepted.getInstance();
+ final DeliveryState remoteState = desiredState;
+
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+ when(delivery.getRemoteState()).thenReturn(remoteState);
+ when(delivery.remotelySettled()).thenReturn(true);
+
+ final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries();
+ try {
+ deliveries.onDelivery(deliveryTag, delivery);
+ final Mono dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), desiredState);
+ dispositionMono.subscribe();
+
+ StepVerifier.create(Mono.fromRunnable(() -> deliveries.close()))
+ .then(() -> deliveries.onDispositionAck(deliveryTag, delivery))
+ .verifyComplete();
+
+ StepVerifier.create(dispositionMono)
+ .verifyErrorSatisfies(error -> {
+ Assertions.assertInstanceOf(AmqpException.class, error);
+ Assertions.assertEquals(DISPOSITION_ERROR_ON_CLOSE, error.getMessage());
+ });
+ } finally {
+ deliveries.close();
+ }
+ }
+
+ @Test
+ public void nopOnDeliveryOnceClosed() {
+ final UUID deliveryTag = UUID.randomUUID();
+
+ final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries();
+ try {
+ deliveries.close();
+ Assertions.assertFalse(deliveries.onDelivery(deliveryTag, delivery));
+ Assertions.assertFalse(deliveries.containsDelivery(deliveryTag));
+ } finally {
+ deliveries.close();
+ }
+ }
+
+ @Test
+ @Disabled("Enable in once disposition API exposed in ReceiveLinkHandler")
+ public void settlesUnsettledDeliveriesOnClose() throws IOException {
+ // See the notes in ReceiverUnsettledDeliveries.close()
+ //
+ doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));
+
+ final ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries();
+ try {
+ deliveries.onDelivery(UUID.randomUUID(), delivery);
+ deliveries.close();
+ verify(delivery).disposition(any());
+ verify(delivery).settle();
+ } finally {
+ deliveries.close();
+ }
+ }
+
+ private ReceiverUnsettledDeliveries createUnsettledDeliveries() {
+ return new ReceiverUnsettledDeliveries(HOSTNAME, ENTITY_PATH, RECEIVER_LINK_NAME,
+ reactorDispatcher, retryOptions, DELIVERY_EMPTY_TAG, logger);
+ }
+
+ private static Answer byRunningRunnable() {
+ return invocation -> {
+ final Runnable runnable = invocation.getArgument(0);
+ runnable.run();
+ return null;
+ };
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
index c55416e12643..481f8ebcc611 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
+++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml
@@ -42,7 +42,7 @@
com.azure
azure-core-amqp
- 2.8.2
+ 2.9.0-beta.1
diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml
index 268b47cdd862..77a81f171574 100644
--- a/sdk/servicebus/azure-messaging-servicebus/pom.xml
+++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml
@@ -57,7 +57,7 @@
com.azure
azure-core-amqp
- 2.8.2
+ 2.9.0-beta.1
com.azure
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java
index 72fd318efa93..ef7a7dba81d2 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java
@@ -6,59 +6,40 @@
import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
-import com.azure.core.amqp.exception.AmqpErrorCondition;
-import com.azure.core.amqp.exception.AmqpException;
-import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
+import com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
-import reactor.core.Disposable;
-import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
-import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.StringJoiner;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.LINK_NAME_KEY;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.MessageUtils.LOCK_TOKEN_SIZE;
-import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.DELIVERY_STATE_KEY;
-import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.LOCK_TOKEN_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusReactorSession.LOCKED_UNTIL_UTC;
import static com.azure.messaging.servicebus.implementation.ServiceBusReactorSession.SESSION_FILTER;
@@ -69,10 +50,8 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic
private static final Message EMPTY_MESSAGE = Proton.message();
private final ClientLogger logger;
- private final ConcurrentHashMap unsettledDeliveries = new ConcurrentHashMap<>();
- private final ConcurrentHashMap pendingUpdates = new ConcurrentHashMap<>();
+ private final ReceiverUnsettledDeliveries receiverUnsettledDeliveries;
private final AtomicBoolean isDisposed = new AtomicBoolean();
- private final Disposable subscription;
private final Receiver receiver;
/**
@@ -80,10 +59,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic
* ServiceBusReceiveMode#RECEIVE_AND_DELETE} is used.
*/
private final boolean isSettled;
- private final Duration timeout;
- private final AmqpRetryPolicy retryPolicy;
private final ReceiveLinkHandler handler;
- private final ReactorProvider provider;
private final Mono sessionIdMono;
private final Mono sessionLockedUntil;
@@ -94,17 +70,16 @@ public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, R
retryPolicy.getRetryOptions());
this.receiver = receiver;
this.handler = handler;
- this.provider = provider;
this.isSettled = receiver.getSenderSettleMode() == SenderSettleMode.SETTLED;
- this.timeout = timeout;
- this.retryPolicy = retryPolicy;
- this.subscription = Flux.interval(timeout).subscribe(i -> cleanupWorkItems());
Map loggingContext = new HashMap<>(2);
loggingContext.put(LINK_NAME_KEY, this.handler.getLinkName());
loggingContext.put(ENTITY_PATH_KEY, entityPath);
this.logger = new ClientLogger(ServiceBusReactorReceiver.class, loggingContext);
+ this.receiverUnsettledDeliveries = new ReceiverUnsettledDeliveries(handler.getHostname(), entityPath, handler.getLinkName(),
+ provider.getReactorDispatcher(), retryPolicy.getRetryOptions(), MessageUtils.ZERO_LOCK_TOKEN, logger);
+
this.sessionIdMono = getEndpointStates().filter(x -> x == AmqpEndpointState.ACTIVE)
.next()
.flatMap(state -> {
@@ -142,7 +117,7 @@ public Mono updateDisposition(String lockToken, DeliveryState deliveryStat
if (isDisposed.get()) {
return monoError(logger, new IllegalStateException("Cannot perform operations on a disposed receiver."));
}
- return updateDispositionInternal(lockToken, deliveryState);
+ return this.receiverUnsettledDeliveries.sendDisposition(lockToken, deliveryState);
}
@Override
@@ -173,37 +148,8 @@ protected Mono closeAsync(String message, ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return super.getIsClosedMono();
}
-
- cleanupWorkItems();
-
- final Mono disposeMono;
- if (!pendingUpdates.isEmpty()) {
- final List> pending = new ArrayList<>();
- final StringJoiner builder = new StringJoiner(", ");
- for (UpdateDispositionWorkItem workItem : pendingUpdates.values()) {
-
- if (workItem.hasTimedout()) {
- continue;
- }
-
- if (workItem.getDeliveryState() instanceof TransactionalState) {
- pending.add(updateDispositionInternal(workItem.getLockToken(), Released.getInstance()));
- } else {
- pending.add(workItem.getMono());
- }
- builder.add(workItem.getLockToken());
- }
-
- logger.info("Waiting for pending updates to complete. Locks: {}", builder.toString());
- disposeMono = Mono.when(pending);
- } else {
- disposeMono = Mono.empty();
- }
-
- return disposeMono.onErrorResume(error -> {
- logger.info("There was an exception while disposing of all links.", error);
- return Mono.empty();
- }).doFinally(signal -> subscription.dispose()).then(super.closeAsync(message, errorCondition));
+ return receiverUnsettledDeliveries.terminateAndAwaitForDispositionsInProgressToComplete()
+ .then(super.closeAsync(message, errorCondition));
}
@Override
@@ -216,282 +162,34 @@ protected Message decodeDelivery(Delivery delivery) {
lockToken = MessageUtils.ZERO_LOCK_TOKEN;
}
- final String lockTokenString = lockToken.toString();
-
- // There is no lock token associated with this delivery, or the lock token is not in the unsettledDeliveries.
- if (lockToken == MessageUtils.ZERO_LOCK_TOKEN || !unsettledDeliveries.containsKey(lockTokenString)) {
+ if (receiverUnsettledDeliveries.containsDelivery(lockToken)) {
+ receiverUnsettledDeliveries.onDispositionAck(lockToken, delivery);
+ // Return empty update disposition messages. The deliveries themselves are ACKs. There is no actual message
+ // to propagate.
+ return EMPTY_MESSAGE;
+ } else {
+ // There is no lock token associated with this delivery, or the lock token is not in the receiverUnsettledDeliveries.
final int messageSize = delivery.pending();
final byte[] buffer = new byte[messageSize];
final int read = receiver.recv(buffer, 0, messageSize);
final Message message = Proton.message();
message.decode(buffer, 0, read);
- // The delivery was already settled from the message broker.
- // This occurs in the case of receive and delete.
if (isSettled) {
+ // The delivery was already settled from the message broker. This occurs in the case of receive and delete.
delivery.disposition(Accepted.getInstance());
delivery.settle();
} else {
- unsettledDeliveries.putIfAbsent(lockToken.toString(), delivery);
+ receiverUnsettledDeliveries.onDelivery(lockToken, delivery);
receiver.advance();
}
return new MessageWithLockToken(message, lockToken);
- } else {
- updateOutcome(lockTokenString, delivery);
-
- // Return empty update disposition messages. The deliveries themselves are ACKs. There is no actual message
- // to propagate.
- return EMPTY_MESSAGE;
- }
- }
-
- private Mono updateDispositionInternal(String lockToken, DeliveryState deliveryState) {
- final Delivery unsettled = unsettledDeliveries.get(lockToken);
- if (unsettled == null) {
-
- logger.atWarning()
- // TODO: it used to be deliveryTag, is it ok to change?
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .log("Delivery not found to update disposition.");
-
- return monoError(logger, Exceptions.propagate(new IllegalArgumentException(
- "Delivery not on receive link.")));
- }
-
- final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, timeout);
- final Mono result = Mono.create(sink -> {
- workItem.start(sink);
- try {
- provider.getReactorDispatcher().invoke(() -> {
- unsettled.disposition(deliveryState);
- pendingUpdates.put(lockToken, workItem);
- });
- } catch (IOException | RejectedExecutionException error) {
- sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
- error, handler.getErrorContext(receiver)));
- }
- }).cache(); // cache because closeAsync use `when` to subscribe this Mono again.
-
- workItem.setMono(result);
-
- return result;
- }
-
- /**
- * Updates the outcome of a delivery. This occurs when a message is being settled from the receiver side.
- * @param delivery Delivery to update.
- */
- private void updateOutcome(String lockToken, Delivery delivery) {
- final DeliveryState remoteState = delivery.getRemoteState();
-
- logger.atVerbose()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue(DELIVERY_STATE_KEY, remoteState)
- .log("Received update disposition delivery.");
-
- final Outcome remoteOutcome;
- if (remoteState instanceof Outcome) {
- remoteOutcome = (Outcome) remoteState;
- } else if (remoteState instanceof TransactionalState) {
- remoteOutcome = ((TransactionalState) remoteState).getOutcome();
- } else {
- remoteOutcome = null;
- }
-
- if (remoteOutcome == null) {
- logger.atWarning()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue("delivery", delivery)
- .log("No outcome associated with delivery.");
-
- return;
- }
-
- final UpdateDispositionWorkItem workItem = pendingUpdates.get(lockToken);
- if (workItem == null) {
- logger.atWarning()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue("delivery", delivery)
- .log("No pending update for delivery.");
-
- return;
- }
-
- // If the statuses match, then we settle the delivery and move on.
- if (remoteState.getType() == workItem.getDeliveryState().getType()) {
- completeWorkItem(lockToken, delivery, workItem.getSink(), null);
- return;
- }
-
- logger.atInfo()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue("receivedDeliveryState", remoteState)
- .addKeyValue(DELIVERY_STATE_KEY, workItem.getDeliveryState())
- .log("Received delivery state doesn't match expected state.");
-
- switch (remoteState.getType()) {
- case Rejected:
- final Rejected rejected = (Rejected) remoteOutcome;
- final ErrorCondition errorCondition = rejected.getError();
- final Throwable exception = ExceptionUtil.toException(errorCondition.getCondition().toString(),
- errorCondition.getDescription(), handler.getErrorContext(receiver));
-
- final Duration retry = retryPolicy.calculateRetryDelay(exception, workItem.incrementRetry());
- if (retry == null) {
- logger.atInfo()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue(DELIVERY_STATE_KEY, remoteState)
- .log("Retry attempts exhausted.", exception);
-
- completeWorkItem(lockToken, delivery, workItem.getSink(), exception);
- } else {
- workItem.setLastException(exception);
- workItem.resetStartTime();
- try {
- provider.getReactorDispatcher().invoke(() -> delivery.disposition(workItem.getDeliveryState()));
- } catch (IOException | RejectedExecutionException error) {
- final Throwable amqpException = logger.atError()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .log(new AmqpException(false,
- String.format("linkName[%s], deliveryTag[%s]. Retrying updateDisposition failed to dispatch to Reactor.", getLinkName(), lockToken),
- error, handler.getErrorContext(receiver)));
-
- completeWorkItem(lockToken, delivery, workItem.getSink(), amqpException);
- }
- }
-
- break;
- case Released:
- final Throwable cancelled = new AmqpException(false, AmqpErrorCondition.OPERATION_CANCELLED,
- "AMQP layer unexpectedly aborted or disconnected.", handler.getErrorContext(receiver));
-
- logger.atInfo()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue(DELIVERY_STATE_KEY, remoteState)
- .log("Completing pending updateState operation with exception.", cancelled);
-
- completeWorkItem(lockToken, delivery, workItem.getSink(), cancelled);
- break;
- default:
- final AmqpException error = new AmqpException(false, remoteOutcome.toString(),
- handler.getErrorContext(receiver));
-
- logger.atInfo()
- .addKeyValue(LOCK_TOKEN_KEY, lockToken)
- .addKeyValue(DELIVERY_STATE_KEY, remoteState)
- .log("Completing pending updateState operation with exception.", error);
-
- completeWorkItem(lockToken, delivery, workItem.getSink(), error);
- break;
- }
- }
-
- private void cleanupWorkItems() {
- if (pendingUpdates.isEmpty()) {
- return;
}
-
- logger.verbose("Cleaning timed out update work tasks.");
- pendingUpdates.forEach((key, value) -> {
- if (value == null || !value.hasTimedout()) {
- return;
- }
-
- pendingUpdates.remove(key);
- final Throwable error = value.getLastException() != null
- ? value.getLastException()
- : new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Update disposition request timed out.",
- handler.getErrorContext(receiver));
-
- completeWorkItem(key, null, value.getSink(), error);
- });
}
- private void completeWorkItem(String lockToken, Delivery delivery, MonoSink sink, Throwable error) {
- final boolean isSettled = delivery != null && delivery.remotelySettled();
- if (isSettled) {
- delivery.settle();
- }
-
- if (error != null) {
- final Throwable loggedError = error instanceof RuntimeException
- ? logger.logExceptionAsError((RuntimeException) error)
- : error;
- sink.error(loggedError);
- } else {
- sink.success();
- }
-
- if (isSettled) {
- pendingUpdates.remove(lockToken);
- unsettledDeliveries.remove(lockToken);
- }
- }
-
- private static final class UpdateDispositionWorkItem {
- private final String lockToken;
- private final DeliveryState state;
- private final Duration timeout;
- private final AtomicInteger retryAttempts = new AtomicInteger();
- private final AtomicBoolean isDisposed = new AtomicBoolean();
-
- private Mono mono;
- private Instant expirationTime;
- private MonoSink sink;
- private Throwable throwable;
-
- private UpdateDispositionWorkItem(String lockToken, DeliveryState state, Duration timeout) {
- this.lockToken = lockToken;
- this.state = state;
- this.timeout = timeout;
- }
-
- private boolean hasTimedout() {
- return expirationTime.isBefore(Instant.now());
- }
-
- private void resetStartTime() {
- this.expirationTime = Instant.now().plus(timeout);
- }
-
- private int incrementRetry() {
- return retryAttempts.incrementAndGet();
- }
-
- private Throwable getLastException() {
- return throwable;
- }
-
- private void setLastException(Throwable throwable) {
- this.throwable = throwable;
- }
-
- private void setMono(Mono mono) {
- this.mono = mono;
- }
-
- private Mono getMono() {
- return mono;
- }
-
- private MonoSink getSink() {
- return sink;
- }
-
- private void start(MonoSink sink) {
- Objects.requireNonNull(sink, "'sink' cannot be null.");
- this.sink = sink;
- this.sink.onDispose(() -> isDisposed.set(true));
- this.sink.onCancel(() -> isDisposed.set(true));
- resetStartTime();
- }
-
- private DeliveryState getDeliveryState() {
- return state;
- }
-
- public String getLockToken() {
- return lockToken;
- }
+ @Override
+ protected void onHandlerClose() {
+ // See the code comment in ReactorReceiver.onHandlerClose(), [temporary method, tobe removed.]
+ receiverUnsettledDeliveries.close();
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java
index bca1c4c635bb..6e4b8693ae9a 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java
@@ -4,7 +4,9 @@
package com.azure.messaging.servicebus.implementation;
import com.azure.core.amqp.AmqpConnection;
+import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
+import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.amqp.implementation.ReactorProvider;
@@ -68,8 +70,7 @@ class ServiceBusReactorReceiverTest {
private ReactorProvider reactorProvider;
@Mock
private ReactorDispatcher reactorDispatcher;
- @Mock
- private AmqpRetryPolicy retryPolicy;
+ private final AmqpRetryPolicy retryPolicy = new FixedAmqpRetryPolicy(new AmqpRetryOptions());
@Mock
private ReceiveLinkHandler receiveLinkHandler;
@Mock