Skip to content

Commit

Permalink
Close message processor downstream. (#12921)
Browse files Browse the repository at this point in the history
* Use dispose.

* Fixing test.
  • Loading branch information
conniey authored Jul 9, 2020
1 parent 08bba18 commit 5399396
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Mono<Void> updateDisposition(String lockToken, DispositionStatus dispositionStat
@Override
public void close() {
if (!isDisposed.getAndSet(true)) {
linkProcessor.cancel();
linkProcessor.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -24,11 +23,10 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import java.time.Duration;
import java.util.UUID;
Expand All @@ -45,10 +43,13 @@
*/
class ServiceBusAsyncConsumerTest {
private static final String LINK_NAME = "some-link";
private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create();
private final FluxSink<Message> messageProcessorSink = messageProcessor.sink();
private final EmitterProcessor<AmqpEndpointState> endpointProcessor = EmitterProcessor.create();
private final FluxSink<AmqpEndpointState> endpointProcessorSink = endpointProcessor.sink();
private final TestPublisher<ServiceBusReceiveLink> linkPublisher = TestPublisher.create();
private final Flux<ServiceBusReceiveLink> linkFlux = linkPublisher.flux();
private final TestPublisher<Message> messagePublisher = TestPublisher.create();
private final Flux<Message> messageFlux = messagePublisher.flux();
private final TestPublisher<AmqpEndpointState> endpointPublisher = TestPublisher.create();
private final Flux<AmqpEndpointState> endpointStateFlux = endpointPublisher.flux();

private final ClientLogger logger = new ClientLogger(ServiceBusAsyncConsumer.class);

private ServiceBusReceiveLinkProcessor linkProcessor;
Expand Down Expand Up @@ -80,13 +81,10 @@ void setup(TestInfo testInfo) {

MockitoAnnotations.initMocks(this);

when(link.getEndpointStates()).thenReturn(endpointProcessor);
when(link.receive()).thenReturn(messageProcessor);
linkProcessor = Flux.<ServiceBusReceiveLink>create(sink -> sink.onRequest(requested -> {
logger.info("Requested link: {}", requested);
sink.next(link);
})).subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy, parentConnection,
new AmqpErrorContext("a-namespace")));
when(link.getEndpointStates()).thenReturn(endpointStateFlux);
when(link.receive()).thenReturn(messageFlux);
linkProcessor = linkFlux.subscribeWith(new ServiceBusReceiveLinkProcessor(10, retryPolicy,
parentConnection, new AmqpErrorContext("a-namespace")));

when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
when(link.updateDisposition(anyString(), any(DeliveryState.class))).thenReturn(Mono.empty());
Expand All @@ -97,6 +95,11 @@ void teardown(TestInfo testInfo) {
logger.info("[{}]: Tearing down.", testInfo.getDisplayName());

Mockito.framework().clearInlineMocks();

linkProcessor.dispose();
linkPublisher.complete();
endpointPublisher.complete();
messagePublisher.complete();
}

/**
Expand All @@ -122,9 +125,13 @@ void receiveNoAutoComplete() {

// Act and Assert
StepVerifier.create(consumer.receive())
.then(() -> messageProcessorSink.next(message1))
.then(() -> {
linkPublisher.next(link);
endpointPublisher.next(AmqpEndpointState.ACTIVE);
messagePublisher.next(message1);
})
.expectNext(receivedMessage1)
.then(() -> messageProcessorSink.next(message2))
.then(() -> messagePublisher.next(message2))
.expectNext(receivedMessage2)
.thenCancel()
.verify();
Expand All @@ -139,9 +146,6 @@ void receiveNoAutoComplete() {
void canDispose() {
// Arrange
final String lockToken = UUID.randomUUID().toString();
when(linkProcessor.updateDisposition(lockToken, Accepted.getInstance()))
.thenReturn(Mono.error(new IllegalArgumentException("Should not have called complete.")));

final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer);

final Message message1 = mock(Message.class);
Expand All @@ -153,11 +157,15 @@ void canDispose() {
// Act and Assert
StepVerifier.create(consumer.receive())
.then(() -> {
endpointProcessorSink.next(AmqpEndpointState.ACTIVE);
messageProcessorSink.next(message1);
linkPublisher.next(link);
endpointPublisher.next(AmqpEndpointState.ACTIVE);
messagePublisher.next(message1);
})
.expectNext(receivedMessage1)
.then(() -> consumer.close())
.then(() -> {
linkPublisher.complete();
endpointPublisher.complete();
})
.verifyComplete();

verify(link, never()).updateDisposition(anyString(), any(DeliveryState.class));
Expand Down

0 comments on commit 5399396

Please sign in to comment.