Skip to content

Commit

Permalink
Fixing a bug in setting delivery count on a received message. It was …
Browse files Browse the repository at this point in the history
…starting from 0, where as it should start from 1. (#11845)

The SDK sets delivery count on a received message starting from 0. If a message is received the first time, its delivery count is 0. But the real delivery count for service bus messages starts from 1. The discrepancy arose because AMQP spec defines delivery count as something that starts from 0. To comply with the spec, service bus puts (delivery count - 1) on the AMQP frame and the SDK is expected to increment it by 1.
  • Loading branch information
yvgopal authored Jun 8, 2020
1 parent 7405526 commit c11a386
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
}

// Header
brokeredMessage.setDeliveryCount(amqpMessage.getDeliveryCount());
// Delivery count for service bus starts from 1, for AMQP it starts from 0.
brokeredMessage.setDeliveryCount(amqpMessage.getDeliveryCount() + 1);
brokeredMessage.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl()));


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public CompletableFuture<Void> onMessageAsync(IMessage message) {
CompletableFuture<Void> countingFuture = CompletableFuture.runAsync(() -> {
this.maxConcurrencyCounter.incrementCount();
//System.out.println("Message Received - " + message.getMessageId() + " - delivery count:" + message.getDeliveryCount() + " - Thread:" + Thread.currentThread());
if (this.firstThrowException && message.getDeliveryCount() == 0) {
if (this.firstThrowException && message.getDeliveryCount() == 1) {
this.messageCountDownLatch.countDown();
this.maxConcurrencyCounter.decrementCount();
throw new RuntimeException("Dummy exception to cause abandon");
Expand Down Expand Up @@ -359,7 +359,7 @@ public CompletableFuture<Void> onMessageAsync(IMessageSession session, IMessage
this.maxConcurrencyCounter.incrementCount();
this.receivedSeesions.add(session.getSessionId());
//System.out.println("SessionID:" + session.getSessionId() + " - Message Received - " + message.getMessageId() + " - delivery count:" + message.getDeliveryCount() + " - Thread:" + Thread.currentThread() + ":" + Instant.now());
if (this.firstThrowException && message.getDeliveryCount() == 0) {
if (this.firstThrowException && message.getDeliveryCount() == 1) {
this.messageCountDownLatch.countDown();
this.maxConcurrencyCounter.decrementCount();
throw new RuntimeException("Dummy exception to cause abandon");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public static void testBasicReceiveAndAbandon(IMessageSender sender, String sess
Assert.assertNotNull("Message not received", receivedMessage);
Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId());
long deliveryCount = receivedMessage.getDeliveryCount();
Assert.assertEquals("Wrong delivery count for received message", 1, deliveryCount);
receiver.abandon(receivedMessage.getLockToken());
receivedMessage = receiver.receive();
Assert.assertNotNull("Message not received", receivedMessage);
Expand Down Expand Up @@ -516,6 +517,7 @@ public static void testReceiveBySequenceNumberAndAbandon(IMessageSender sender,
Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber());
Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId());
long deliveryCount = receivedMessage.getDeliveryCount();
Assert.assertEquals("Wrong delivery count for received message", 2, deliveryCount);
receiver.abandon(receivedMessage.getLockToken());

// Try to receive by sequence number again
Expand Down

0 comments on commit c11a386

Please sign in to comment.