diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 3c772819a..9556849bb 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -30,8 +30,11 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -89,7 +92,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - + private final LinkedHashMap outstandingReceipts = + new LinkedHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -350,6 +354,28 @@ private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandle } } + private static class ReceiptCompleteData { + private OutstandingMessage outstandingMessage; + private Boolean receiptComplete; + + private ReceiptCompleteData(OutstandingMessage outstandingMessage) { + this.outstandingMessage = outstandingMessage; + this.receiptComplete = false; + } + + private OutstandingMessage getOutstandingMessage() { + return this.outstandingMessage; + } + + private Boolean isReceiptComplete() { + return this.receiptComplete; + } + + private void notifyReceiptComplete() { + this.receiptComplete = true; + } + } + void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); @@ -361,7 +387,13 @@ void processReceivedMessages(List messages) { AckRequestData ackRequestData = builder.build(); AckHandler ackHandler = new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration); - if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { + OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler); + + if (this.exactlyOnceDeliveryEnabled.get()) { + // For exactly once deliveries we don't add to outstanding batch because we first + // process the receipt modack. If that is successful then we process the message. + outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage)); + } else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) { // putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the // previously-mapped element. // If the previous element is not null, we already have the message and the new one is @@ -371,14 +403,44 @@ void processReceivedMessages(List messages) { // we want to eventually // totally expire so that pubsub service sends us the message again. continue; + } else { + outstandingBatch.add(outstandingMessage); } - outstandingBatch.add(new OutstandingMessage(message, ackHandler)); pendingReceipts.add(ackRequestData); } - processBatch(outstandingBatch); } + synchronized void notifyAckSuccess(AckRequestData ackRequestData) { + + if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { + outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); + List outstandingBatch = new ArrayList<>(); + + for (Iterator> it = + outstandingReceipts.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry receipt = it.next(); + // If receipt is complete then add to outstandingBatch to process the batch + if (receipt.getValue().isReceiptComplete()) { + it.remove(); + if (pendingMessages.putIfAbsent( + receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler) + == null) { + outstandingBatch.add(receipt.getValue().getOutstandingMessage()); + } + } else { + break; + } + } + processBatch(outstandingBatch); + } + } + + synchronized void notifyAckFailed(AckRequestData ackRequestData) { + outstandingReceipts.remove(ackRequestData.getAckId()); + } + private void processBatch(List batch) { messagesWaiter.incrementPendingCount(batch.size()); for (OutstandingMessage message : batch) { @@ -519,6 +581,7 @@ void extendDeadlines() { @InternalApi void processOutstandingOperations() { + List modackRequestData = new ArrayList(); // Nacks are modacks with an expiration of 0 diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 160032c7a..014771f2a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -527,6 +527,7 @@ public void onSuccess(Empty empty) { for (AckRequestData ackRequestData : ackRequestDataList) { // This will check if a response is needed, and if it has already been set ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + messageDispatcher.notifyAckSuccess(ackRequestData); // Remove from our pending operations pendingRequests.remove(ackRequestData); } @@ -564,12 +565,15 @@ public void onFailure(Throwable t) { "Permanent error invalid ack id message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess); + messageDispatcher.notifyAckFailed(ackRequestData); } else { logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage); ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); + messageDispatcher.notifyAckFailed(ackRequestData); } } else { ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + messageDispatcher.notifyAckSuccess(ackRequestData); } // Remove from our pending pendingRequests.remove(ackRequestData); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java new file mode 100644 index 000000000..745b18244 --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDataMatcher.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.mockito.ArgumentMatcher; + +public class MessageDataMatcher implements ArgumentMatcher { + + private ByteString expectedData; + + public MessageDataMatcher(ByteString expectedData) { + this.expectedData = expectedData; + } + + @Override + public boolean matches(PubsubMessage message2) { + return (expectedData.equals(message2.getData())); + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 0b48e0991..9321272b4 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -139,6 +139,100 @@ public void testReceiptMessageReceiver() { .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class)); } + @Test + public void testReceiptModackWithOrderingForExactlyOnceDelivered() { + + MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse = + mock(MessageReceiverWithAckResponse.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse); + messageDispatcher.setExactlyOnceDeliveryEnabled(true); + + ReceivedMessage TEST_MESSAGE1 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID1") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data1")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + ReceivedMessage TEST_MESSAGE2 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID2") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data2")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + ReceivedMessage TEST_MESSAGE3 = + ReceivedMessage.newBuilder() + .setAckId("ACK_ID3") + .setMessage( + PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("message-data3")) + .build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) + .build(); + + messageDispatcher.processReceivedMessages( + Arrays.asList(TEST_MESSAGE3, TEST_MESSAGE2, TEST_MESSAGE1)); + + messageDispatcher.processOutstandingOperations(); + verify(mockMessageReceiverWithAckResponse, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class)); + + AckRequestData ackRequestData1 = AckRequestData.newBuilder(TEST_MESSAGE1.getAckId()).build(); + AckRequestData ackRequestData2 = AckRequestData.newBuilder(TEST_MESSAGE2.getAckId()).build(); + AckRequestData ackRequestData3 = AckRequestData.newBuilder(TEST_MESSAGE3.getAckId()).build(); + messageDispatcher.notifyAckSuccess(ackRequestData2); + messageDispatcher.processOutstandingOperations(); + + messageDispatcher.notifyAckSuccess(ackRequestData1); + messageDispatcher.notifyAckSuccess(ackRequestData3); + messageDispatcher.processOutstandingOperations(); + + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE3.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE2.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE1.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + } + + @Test + public void testReceiptModackForExactlyOnceDelivered() { + + MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse = + mock(MessageReceiverWithAckResponse.class); + MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse); + messageDispatcher.setExactlyOnceDeliveryEnabled(true); + + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + messageDispatcher.processOutstandingOperations(); + verify(mockMessageReceiverWithAckResponse, never()) + .receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class)); + + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + messageDispatcher.notifyAckSuccess(ackRequestData); + messageDispatcher.processOutstandingOperations(); + + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData)); + + verify(mockMessageReceiverWithAckResponse, times(1)) + .receiveMessage( + argThat(new MessageDataMatcher(TEST_MESSAGE.getMessage().getData())), + any(AckReplyConsumerWithResponse.class)); + } + @Test public void testReceiptMessageReceiverWithAckResponse() { MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =