From 74d8da97c42b672e3f9f26b9a535d6bac948a402 Mon Sep 17 00:00:00 2001 From: maitrimangal <121899734+maitrimangal@users.noreply.github.com> Date: Thu, 24 Aug 2023 16:06:05 -0400 Subject: [PATCH] feat: Receipt modack (#1540) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * receipt-modack for exactly once * changing setup * changing the pendingReceipt List * using scheduled fixed rate * using blocked queues * using blocked queues * using blocked queues * adding null safety * adding null safety * removing list * adding list back * if permanent failure, remove outstandingmsg from queue * adding snippet of test * adding method to streaming subscriber * adding method to streaming subscriber * adding notifyAcks * changing notifyAckFailed calls * addressing some comments * changed logic to use one datastructure * fixing notifyFailed * fixing notifyFailed * changing Pair to custom class * removing the not needed data structure * Fixing test * Fixing test * Fixing test * Fixing test * fixing format * fixing test to call receiveMessage * testing test failure * testing test failure * testing test failure * increasing timestamp to test * increasing timestamp to test * adding log statement for testing * Fixing lint * Adding more logs * batch size log * changing method to syncronized * fixing for loop to not remove as we are iterating * trying a concurrent map * fix: syncronizing notifyFailed * fix: removing unused import * fix: reformat * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: removing System.out.println statements * fix: reviewign comments * fix: lint * adding another ordering key test example * fix: trying to run this test again * fix: trying to run this test again * fix: removing commented code * fix: removing commented code * resolving the comments from review * adding custom matcher * adding custom matcher * adding custom matcher * adding custom matcher * adding custom matcher correcting the matching statement * lint * removing comments * removing comments * removing comments * changing messageMatcher to messageDataMatcher, and fixing other nit things * lint * addressing review comments * addressing review comments --------- Co-authored-by: Owl Bot --- .../cloud/pubsub/v1/MessageDispatcher.java | 71 +++++++++++++- .../v1/StreamingSubscriberConnection.java | 4 + .../cloud/pubsub/v1/MessageDataMatcher.java | 35 +++++++ .../pubsub/v1/MessageDispatcherTest.java | 94 +++++++++++++++++++ 4 files changed, 200 insertions(+), 4 deletions(-) create mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 3c772819a..9556849bb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -30,8 +30,11 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -89,7 +92,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - + private final LinkedHashMap outstandingReceipts = + new LinkedHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -350,6 +354,28 @@ private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandle } } + private static class ReceiptCompleteData { + private OutstandingMessage outstandingMessage; + private Boolean receiptComplete; + + private ReceiptCompleteData(OutstandingMessage outstandingMessage) { + this.outstandingMessage = outstandingMessage; + this.receiptComplete = false; + } + + private OutstandingMessage getOutstandingMessage() { + return this.outstandingMessage; + } + + private Boolean isReceiptComplete() { + return this.receiptComplete; + } + + private void notifyReceiptComplete() { + this.receiptComplete = true; + } + } + void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); @@ -361,7 +387,13 @@ void processReceivedMessages(List messages) { AckRequestData ackRequestData = builder.build(); AckHandler ackHandler = new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration); - if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { + OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler); + + if (this.exactlyOnceDeliveryEnabled.get()) { + // For exactly once deliveries we don't add to outstanding batch because we first + // process the receipt modack. If that is successful then we process the message. + outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage)); + } else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { // putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the // previously-mapped element. // If the previous element is not null, we already have the message and the new one is @@ -371,14 +403,44 @@ void processReceivedMessages(List messages) { // we want to eventually // totally expire so that pubsub service sends us the message again. continue; + } else { + outstandingBatch.add(outstandingMessage); } - outstandingBatch.add(new OutstandingMessage(message, ackHandler)); pendingReceipts.add(ackRequestData); } - processBatch(outstandingBatch); } + synchronized void notifyAckSuccess(AckRequestData ackRequestData) { + + if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { + outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); + List outstandingBatch = new ArrayList<>(); + + for (Iterator> it = + outstandingReceipts.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry receipt = it.next(); + // If receipt is complete then add to outstandingBatch to process the batch + if (receipt.getValue().isReceiptComplete()) { + it.remove(); + if (pendingMessages.putIfAbsent( + receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler) + == null) { + outstandingBatch.add(receipt.getValue().getOutstandingMessage()); + } + } else { + break; + } + } + processBatch(outstandingBatch); + } + } + + synchronized void notifyAckFailed(AckRequestData ackRequestData) { + outstandingReceipts.remove(ackRequestData.getAckId()); + } + private void processBatch(List batch) { messagesWaiter.incrementPendingCount(batch.size()); for (OutstandingMessage message : batch) { @@ -519,6 +581,7 @@ void extendDeadlines() { @InternalApi void processOutstandingOperations() { + List modackRequestData = new ArrayList(); // Nacks are modacks with an expiration of 0 diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 160032c7a..014771f2a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -527,6 +527,7 @@ public void onSuccess(Empty empty) { for (AckRequestData ackRequestData : ackRequestDataList) { // This will check if a response is needed, and if it has already been set ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + messageDispatcher.notifyAckSuccess(ackRequestData); // Remove from our pending operations pendingRequests.remove(ackRequestData); } @@ -564,12 +565,15 @@ public void onFailure(Throwable t) { "Permanent error invalid ack id message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); + messageDispatcher.notifyAckFailed(ackRequestData); } else { logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); + messageDispatcher.notifyAckFailed(ackRequestData); } } else { ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + messageDispatcher.notifyAckSuccess(ackRequestData); } // Remove from our pending pendingRequests.remove(ackRequestData); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java new file mode 100644 index 000000000..745b18244 --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.mockito.ArgumentMatcher; + +public class MessageDataMatcher implements ArgumentMatcher { + + private ByteString expectedData; + + public MessageDataMatcher(ByteString expectedData) { + this.expectedData = expectedData; + } + + @Override + public boolean matches(PubsubMessage message2) { + return (expectedData.equals(message2.getData())); + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 0b48e0991..9321272b4 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -139,6 +139,100 @@ public void testReceiptMessageReceiver() { .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class)); } + @Test + public void testReceiptModackWithOrderingForExactlyOnceDelivered() { + + MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse = + mock(MessageReceiverWithAckResponse.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse); + messageDispatcher.setExactlyOnceDeliveryEnabled(true); + + ReceivedMessage TEST_MESSAGE1 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID1") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data1")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + ReceivedMessage TEST_MESSAGE2 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID2") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data2")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + ReceivedMessage TEST_MESSAGE3 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID3") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data3")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + + messageDispatcher.processReceivedMessages( + Arrays.asList(TEST_MESSAGE3, TEST_MESSAGE2, TEST_MESSAGE1)); + + messageDispatcher.processOutstandingOperations(); + verify(mockMessageReceiverWithAckResponse, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class)); + + AckRequestData ackRequestData1 = AckRequestData.newBuilder(TEST_MESSAGE1.getAckId()).build(); + AckRequestData ackRequestData2 = AckRequestData.newBuilder(TEST_MESSAGE2.getAckId()).build(); + AckRequestData ackRequestData3 = AckRequestData.newBuilder(TEST_MESSAGE3.getAckId()).build(); + messageDispatcher.notifyAckSuccess(ackRequestData2); + messageDispatcher.processOutstandingOperations(); + + messageDispatcher.notifyAckSuccess(ackRequestData1); + messageDispatcher.notifyAckSuccess(ackRequestData3); + messageDispatcher.processOutstandingOperations(); + + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE3.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE2.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE1.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + } + + @Test + public void testReceiptModackForExactlyOnceDelivered() { + + MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse = + mock(MessageReceiverWithAckResponse.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse); + messageDispatcher.setExactlyOnceDeliveryEnabled(true); + + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + messageDispatcher.processOutstandingOperations(); + verify(mockMessageReceiverWithAckResponse, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class)); + + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + messageDispatcher.notifyAckSuccess(ackRequestData); + messageDispatcher.processOutstandingOperations(); + + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + } + @Test public void testReceiptMessageReceiverWithAckResponse() { MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =