Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Phase-1] Disposition handling in Amqp-Core (and fixes a thread-hang edege-case in Service Bus disposition) #33593

Merged
merged 5 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,22 @@
<Bug pattern="EI_EXPOSE_REP"/>
</Match>

<!-- The DispositionWork doesn't need to be serializable -->
<Match>
<Class
name="~com\.azure\.core\.amqp\.implementation\.handler\.ReceiverUnsettledDeliveries\$DispositionWork"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Class
name="~com\.azure\.core\.amqp\.implementation\.handler\.ReceiverUnsettledDeliveries\$DispositionWork"/>
<Or>
<Field name="desiredState"/>
<Field name="mono"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>

<!-- For BinaryData, copying array contents degrades performance. The data returned as byte array from BinaryData is expected to be copied by the call if mutability is desired. -->
<Match>
<Class name="com.azure.core.implementation.util.ByteArrayContent"/>
Expand Down
1 change: 1 addition & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ com.azure.tools:azure-sdk-build-tool;1.0.0-beta.1;1.0.0-beta.2
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->

unreleased_com.azure:azure-core-amqp;2.9.0-beta.1

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ protected Mono<Void> getIsClosedMono() {
return isClosedMono.asMono().publishOn(Schedulers.boundedElastic());
}

protected void onHandlerClose() {
// Note: Given the disposition is a generic AMQP feature of brokers that support receive-link with UNSETTLED
// settlement mode, in near future we will enable delivery disposition API in amqp-core 'ReceiverLinkHandler'.
// Such a future API in 'ReceiverLinkHandler' means the handler will own the 'ReceiverUnsettledDeliveries'
// object, and the closing of the handler (i.e., handler.close()) will close 'ReceiverUnsettledDeliveries'.
// TODO: anuchan: Remove onHandlerClose
// This 'onHandlerClose' method is a temporary internal method for the 'ServiceBusReactorReceiver' to close
// the 'ReceiverUnsettledDeliveries' for the interim while we rollout the full disposition API support in
// amqp-core. The 'onHandlerClose' method will be removed once ownership of the 'ReceiverUnsettledDeliveries'
// is abstracted within 'ReceiverLinkHandler', so 'ServiceBusReactorReceiver' no longer have to own it.
}

/**
* Beings the client side close by initiating local-close on underlying receiver.
*
Expand Down Expand Up @@ -459,6 +471,7 @@ private void completeClose() {
}

handler.close();
onHandlerClose();
receiver.free();
try {
trackPrefetchSeqNoSubscription.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ public void onLinkFinal(Event event) {
}

public AmqpErrorContext getErrorContext(Link link) {
return getErrorContext(getHostname(), entityPath, link);
}

static AmqpErrorContext getErrorContext(String hostName, String entityPath, Link link) {
final String referenceId;
if (link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(TRACKING_ID_PROPERTY)) {
referenceId = link.getRemoteProperties().get(TRACKING_ID_PROPERTY).toString();
} else {
referenceId = link.getName();
}

return new LinkErrorContext(getHostname(), entityPath, referenceId, link.getCredit());
return new LinkErrorContext(hostName, entityPath, referenceId, link.getCredit());
}

private void handleRemoteLinkClosed(final String eventName, final Event event) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.engine.Delivery;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;

@Execution(ExecutionMode.SAME_THREAD)
@Isolated
public class ReceiverUnsettledDeliveriesIsolatedTest {
private static final UUID DELIVERY_EMPTY_TAG = new UUID(0L, 0L);
private static final String HOSTNAME = "hostname";
private static final String ENTITY_PATH = "/orders";
private static final String RECEIVER_LINK_NAME = "orders-link";
private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(20);
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(3);
private static final Duration VIRTUAL_TIME_SHIFT = OPERATION_TIMEOUT.plusSeconds(30);
private final ClientLogger logger = new ClientLogger(ReceiverUnsettledDeliveriesTest.class);
private final AmqpRetryOptions retryOptions = new AmqpRetryOptions();
private AutoCloseable mocksCloseable;
@Mock
private ReactorDispatcher reactorDispatcher;
@Mock
private Delivery delivery;

@BeforeEach
public void setup() throws IOException {
mocksCloseable = MockitoAnnotations.openMocks(this);
retryOptions.setTryTimeout(OPERATION_TIMEOUT);
}

@AfterEach
public void teardown() throws Exception {
Mockito.framework().clearInlineMock(this);

if (mocksCloseable != null) {
mocksCloseable.close();
}
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void sendDispositionTimeoutOnExpiration() throws Exception {
final UUID deliveryTag = UUID.randomUUID();

doAnswer(byRunningRunnable()).when(reactorDispatcher).invoke(any(Runnable.class));

try (ReceiverUnsettledDeliveries deliveries = createUnsettledDeliveries()) {
deliveries.onDelivery(deliveryTag, delivery);
final Mono<Void> dispositionMono = deliveries.sendDisposition(deliveryTag.toString(), Accepted.getInstance());
try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
verifier.create(() -> dispositionMono, VIRTUAL_TIME_SHIFT)
.expectErrorSatisfies(error -> {
Assertions.assertTrue(error instanceof AmqpException);
final AmqpException amqpError = (AmqpException) error;
Assertions.assertEquals(AmqpErrorCondition.TIMEOUT_ERROR, amqpError.getErrorCondition());
})
.verify(VERIFY_TIMEOUT);
}
}
}

private ReceiverUnsettledDeliveries createUnsettledDeliveries() {
return new ReceiverUnsettledDeliveries(HOSTNAME, ENTITY_PATH, RECEIVER_LINK_NAME,
reactorDispatcher, retryOptions, DELIVERY_EMPTY_TAG, logger);
}

private static Answer<Void> byRunningRunnable() {
return invocation -> {
final Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
};
}

private static final class VirtualTimeStepVerifier implements AutoCloseable {
private final VirtualTimeScheduler scheduler;

VirtualTimeStepVerifier() {
scheduler = VirtualTimeScheduler.create();
}

<T> StepVerifier.Step<T> create(Supplier<Mono<T>> scenarioSupplier, Duration timeShift) {
return StepVerifier.withVirtualTime(scenarioSupplier, () -> scheduler, 1)
.thenAwait(timeShift);
}

@Override
public void close() {
scheduler.dispose();
}
}
}
Loading