From d8161380e232fd4be408e6298827423907a027fb Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Fri, 5 Jan 2024 14:40:45 -0500 Subject: [PATCH] fix: Use message ordering enabled property that comes with streaming pull responses (#1851) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add back in working asserts * Formatting fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Version/delete fixes * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * Add back in working asserts * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Formatting fixes * Version/delete fixes * samples: Schema evolution (#1499) * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add back in working asserts * Formatting fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Version/delete fixes * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * Add back in working asserts * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Formatting fixes * Version/delete fixes --------- Co-authored-by: Owl Bot * Minor fixes for comments * samples: Schema evolution (#1499) * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add back in working asserts * Formatting fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Version/delete fixes * samples: schema evolution * samples: schema evolution * Format fixes * Fix documentation for field. * Add back in working asserts * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Formatting fixes * Version/delete fixes --------- Co-authored-by: Owl Bot * Fix rollback example * Formatting * Formatting and wording fixes * Add new schemas to test directory * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Samples: Fix exception handling * fix: Set x-goog-request-params for streaming pull request * Revert "fix: Set x-goog-request-params for streaming pull request" This reverts commit 3185a3e9d48680d75cc70745f7ea0048d726556b. * Revert "Revert "fix: Set x-goog-request-params for streaming pull request"" This reverts commit 3b1f4d9c0751a8fa676159842208b4213d764ee6. * Thread example * Add examples for limited and unlimited exeuctors * Add back missing semicolon * Revert changes to original async example * Revert changes to original async example * Add examples of different threading models * Make variables final to conform to style. * Fix catches * Fix ids * Fix naming * Revert "Merge pull request #2 from kamalaboulhosn/ML_experiments" This reverts commit 5a435fad03f8c5bc577906ef0088b9899c2963cd, reversing changes made to c3a572560f74fa8e10b7f354352bdd736e6f58aa. * Set blunderbuss config to auto-assign issues and PRs * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Swap writer and reader schema to correct places in sample * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: Use message ordering enabled property that comes with streaming pull responses so that messages are only delivered to the callback one at a time in order when ordering is actually enabled --------- Co-authored-by: Owl Bot --- .../cloud/pubsub/v1/MessageDispatcher.java | 8 +- .../v1/StreamingSubscriberConnection.java | 3 + .../pubsub/v1/MessageDispatcherTest.java | 119 +++++++++++++++++- 3 files changed, 124 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index b257594ea..635bc92d5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -82,6 +82,7 @@ class MessageDispatcher { private final FlowController flowController; private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false); + private AtomicBoolean messageOrderingEnabled = new AtomicBoolean(false); private final Waiter messagesWaiter; @@ -343,6 +344,11 @@ void setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) { } } + @InternalApi + void setMessageOrderingEnabled(boolean messageOrderingEnabled) { + this.messageOrderingEnabled.set(messageOrderingEnabled); + } + private static class OutstandingMessage { private final ReceivedMessage receivedMessage; private final AckHandler ackHandler; @@ -506,7 +512,7 @@ public void run() { } } }; - if (message.getOrderingKey().isEmpty()) { + if (!messageOrderingEnabled.get() || message.getOrderingKey().isEmpty()) { executor.execute(deliverMessageTask); } else { sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 014771f2a..7849bdb74 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -236,9 +236,12 @@ public void onResponse(StreamingPullResponse response) { boolean exactlyOnceDeliveryEnabledResponse = response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled(); + boolean messageOrderingEnabledResponse = + response.getSubscriptionProperties().getMessageOrderingEnabled(); setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse); messageDispatcher.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse); + messageDispatcher.setMessageOrderingEnabled(messageOrderingEnabledResponse); messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); // Only request more if we're not shutdown. diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 9321272b4..c608ee8d5 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -30,18 +30,41 @@ import java.util.concurrent.*; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.threeten.bp.Duration; public class MessageDispatcherTest { private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); private static final int DELIVERY_INFO_COUNT = 3; private static final String ACK_ID = "ACK-ID"; + private static final String ORDERING_KEY = "KEY"; private static final ReceivedMessage TEST_MESSAGE = ReceivedMessage.newBuilder() .setAckId(ACK_ID) .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) .setDeliveryAttempt(DELIVERY_INFO_COUNT) .build(); + private static final ByteString ORDERED_MESSAGE_DATA_1 = ByteString.copyFromUtf8("message-data1"); + private static final ReceivedMessage ORDERED_TEST_MESSAGE_1 = + ReceivedMessage.newBuilder() + .setAckId("ACK-ID-1") + .setMessage( + PubsubMessage.newBuilder() + .setData(ORDERED_MESSAGE_DATA_1) + .setOrderingKey(ORDERING_KEY) + .build()) + .build(); + private static final ByteString ORDERED_MESSAGE_DATA_2 = ByteString.copyFromUtf8("message-data2"); + private static final ReceivedMessage ORDERED_TEST_MESSAGE_2 = + ReceivedMessage.newBuilder() + .setAckId("ACK-ID-2") + .setMessage( + PubsubMessage.newBuilder() + .setData(ORDERED_MESSAGE_DATA_2) + .setOrderingKey(ORDERING_KEY) + .build()) + .build(); private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60; private static final int MIN_ACK_DEADLINE_SECONDS = 10; private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); @@ -494,6 +517,84 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() { Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds())); } + @Test + public void testOrderedDeliveryOrderingDisabled() throws Exception { + MessageReceiver mockMessageReceiver = mock(MessageReceiver.class); + MessageDispatcher messageDispatcher = + getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5)); + + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setMessageOrderingEnabled(false); + + CountDownLatch receiveCalls = new CountDownLatch(2); + + doAnswer( + new Answer() { + public Void answer(InvocationOnMock invocation) throws Exception { + Thread.sleep(1000); + receiveCalls.countDown(); + return null; + } + }) + .when(mockMessageReceiver) + .receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class)); + doAnswer( + new Answer() { + public Void answer(InvocationOnMock invocation) { + // Ensure the previous method didn't finish and we could process in parallel. + assertEquals(2, receiveCalls.getCount()); + receiveCalls.countDown(); + return null; + } + }) + .when(mockMessageReceiver) + .receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class)); + + messageDispatcher.processReceivedMessages( + Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2)); + receiveCalls.await(); + } + + @Test + public void testOrderedDeliveryOrderingEnabled() throws Exception { + MessageReceiver mockMessageReceiver = mock(MessageReceiver.class); + MessageDispatcher messageDispatcher = + getMessageDispatcher(mockMessageReceiver, Executors.newFixedThreadPool(5)); + + // This would normally be set from the streaming pull response in the + // StreamingSubscriberConnection + messageDispatcher.setMessageOrderingEnabled(true); + + CountDownLatch receiveCalls = new CountDownLatch(2); + + doAnswer( + new Answer() { + public Void answer(InvocationOnMock invocation) throws Exception { + Thread.sleep(1000); + receiveCalls.countDown(); + return null; + } + }) + .when(mockMessageReceiver) + .receiveMessage(eq(ORDERED_TEST_MESSAGE_1.getMessage()), any(AckReplyConsumer.class)); + doAnswer( + new Answer() { + public Void answer(InvocationOnMock invocation) { + // Ensure the previous method has finished completely. + assertEquals(1, receiveCalls.getCount()); + receiveCalls.countDown(); + return null; + } + }) + .when(mockMessageReceiver) + .receiveMessage(eq(ORDERED_TEST_MESSAGE_2.getMessage()), any(AckReplyConsumer.class)); + + messageDispatcher.processReceivedMessages( + Arrays.asList(ORDERED_TEST_MESSAGE_1, ORDERED_TEST_MESSAGE_2)); + receiveCalls.await(); + } + @Test public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() { int customMinSeconds = 30; @@ -569,20 +670,28 @@ private void assertMinAndMaxAckDeadlines( } private MessageDispatcher getMessageDispatcher() { - return getMessageDispatcher(mock(MessageReceiver.class)); + return getMessageDispatcher(mock(MessageReceiver.class), MoreExecutors.directExecutor()); } private MessageDispatcher getMessageDispatcher(MessageReceiver messageReceiver) { - return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver)); + return getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiver), MoreExecutors.directExecutor()); + } + + private MessageDispatcher getMessageDispatcher( + MessageReceiver messageReceiver, Executor executor) { + return getMessageDispatcherFromBuilder(MessageDispatcher.newBuilder(messageReceiver), executor); } private MessageDispatcher getMessageDispatcher( MessageReceiverWithAckResponse messageReceiverWithAckResponse) { return getMessageDispatcherFromBuilder( - MessageDispatcher.newBuilder(messageReceiverWithAckResponse)); + MessageDispatcher.newBuilder(messageReceiverWithAckResponse), + MoreExecutors.directExecutor()); } - private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Builder builder) { + private MessageDispatcher getMessageDispatcherFromBuilder( + MessageDispatcher.Builder builder, Executor executor) { MessageDispatcher messageDispatcher = builder .setAckProcessor(mockAckProcessor) @@ -594,7 +703,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Buil .setMaxDurationPerAckExtensionDefaultUsed(true) .setAckLatencyDistribution(mock(Distribution.class)) .setFlowController(mock(FlowController.class)) - .setExecutor(MoreExecutors.directExecutor()) + .setExecutor(executor) .setSystemExecutor(systemExecutor) .setApiClock(clock) .build();