Skip to content

Commit

Permalink
Fix issue where additional credits were not added to link (#8989)
Browse files Browse the repository at this point in the history
* Move get credit to correct location.

* Add test
  • Loading branch information
conniey authored Mar 12, 2020
1 parent f770d24 commit fb7ed0f
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl
this.dispatcher = dispatcher;
this.messagesProcessor = this.handler.getDeliveredMessages()
.map(delivery -> decodeDelivery(delivery))
.subscribeWith(EmitterProcessor.create());

this.messagesProcessor.doOnNext(next -> {
if (receiver.getRemoteCredit() == 0) {
final Supplier<Integer> supplier = creditSupplier.get();
if (supplier == null) {
return;
.doOnNext(next -> {
if (receiver.getRemoteCredit() == 0) {
final Supplier<Integer> supplier = creditSupplier.get();
if (supplier == null) {
return;
}

final Integer credits = supplier.get();
if (credits != null && credits > 0) {
addCredits(credits);
}
}

final Integer credits = supplier.get();
if (credits != null && credits > 0) {
addCredits(credits);
}
}
});
})
.subscribeWith(EmitterProcessor.create());

this.subscriptions = Disposables.composite(
this.handler.getEndpointStates().subscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
Expand All @@ -35,9 +37,12 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -59,6 +64,8 @@ class ReactorReceiverTest {
private Record record;
@Mock
private ReactorDispatcher dispatcher;
@Mock
private Supplier<Integer> creditSupplier;

@Captor
private ArgumentCaptor<Runnable> dispatcherCaptor;
Expand All @@ -77,7 +84,7 @@ static void afterAll() {
}

@BeforeEach
void setup() throws IOException {
void setup() {
MockitoAnnotations.initMocks(this);

when(cbsNode.authorize(any(), any())).thenReturn(Mono.empty());
Expand Down Expand Up @@ -120,7 +127,7 @@ void addCredits() throws IOException {
final List<Runnable> invocations = dispatcherCaptor.getAllValues();
assertEquals(1, invocations.size());

// Apply the invocation. This should actually set the credits.
// Apply the invocation.
invocations.get(0).run();

verify(receiver).flow(credits);
Expand Down Expand Up @@ -195,4 +202,62 @@ void closesOnNonAmqpException() {
.expectComplete()
.verify(Duration.ofSeconds(10));
}

@Test
void addsMoreCreditsWhenPrefetchIsDone() throws IOException {
// Arrange
// This message was copied from one that was received.
final byte[] messageBytes = new byte[] { 0, 83, 114, -63, 73, 6, -93, 21, 120, 45, 111, 112, 116, 45, 115, 101,
113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 85, 0, -93, 12, 120, 45, 111, 112, 116, 45,
111, 102, 102, 115, 101, 116, -95, 1, 48, -93, 19, 120, 45, 111, 112, 116, 45, 101, 110, 113, 117, 101, 117,
101, 100, 45, 116, 105, 109, 101, -125, 0, 0, 1, 112, -54, 124, -41, 90, 0, 83, 117, -96, 12, 80, 111, 115,
105, 116, 105, 111, 110, 53, 58, 32, 48};
final Link link = mock(Link.class);
final Delivery delivery = mock(Delivery.class);

when(event.getLink()).thenReturn(link);
when(event.getDelivery()).thenReturn(delivery);

when(delivery.getLink()).thenReturn(receiver);
when(delivery.isPartial()).thenReturn(false);
when(delivery.isSettled()).thenReturn(false);
when(delivery.pending()).thenReturn(messageBytes.length);

when(receiver.getRemoteCredit()).thenReturn(0);
when(receiver.recv(any(), eq(0), eq(messageBytes.length))).thenAnswer(invocation -> {
final byte[] buffer = invocation.getArgument(0);
System.arraycopy(messageBytes, 0, buffer, 0, messageBytes.length);
return messageBytes.length;
});

when(creditSupplier.get()).thenReturn(10);
reactorReceiver.setEmptyCreditListener(creditSupplier);

// Act & Assert
StepVerifier.create(reactorReceiver.receive())
.then(() -> receiverHandler.onDelivery(event))
.assertNext(message -> {
Assertions.assertNotNull(message.getMessageAnnotations());

final Map<Symbol, Object> values = message.getMessageAnnotations().getValue();
Assertions.assertTrue(values.containsKey(Symbol.getSymbol(AmqpMessageConstant.OFFSET_ANNOTATION_NAME.getValue())));
Assertions.assertTrue(values.containsKey(Symbol.getSymbol(AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME.getValue())));
Assertions.assertTrue(values.containsKey(Symbol.getSymbol(AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME.getValue())));
})
.thenCancel()
.verify();

verify(creditSupplier).get();

// Verify that the get addCredits was called on that dispatcher.
verify(dispatcher).invoke(dispatcherCaptor.capture());

final List<Runnable> invocations = dispatcherCaptor.getAllValues();
assertEquals(1, invocations.size());

// Apply the invocation.
invocations.get(0).run();

verify(receiver).flow(10);
}
}

0 comments on commit fb7ed0f

Please sign in to comment.