From db733a1805605cef9e5560851a0f2852415301d4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 17 Jan 2025 18:10:15 +0200 Subject: [PATCH 1/3] Improve quiet time implementation in BrokerTestUtil.receiveMessages --- .../apache/pulsar/broker/BrokerTestUtil.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index 8364cae53b223..fee0ababe114d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -239,33 +240,42 @@ public static void receiveMessages(BiFunction, Message, Boole public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, Duration quietTimeout, Stream> consumers) { + long quietTimeoutNanos = quietTimeout.toNanos(); + AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); FutureUtil.waitForAll(consumers - .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join(); + .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos)).toList()).join(); } // asynchronously receive messages from a consumer and handle them using the provided message handler // the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads // this is useful in tests where multiple consumers are needed to test the functionality - private static CompletableFuture receiveMessagesAsync(Consumer consumer, Duration quietTimeout, - BiFunction, Message, Boolean> - messageHandler) { - CompletableFuture> receiveFuture = consumer.receiveAsync(); - return receiveFuture - .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS) + private static CompletableFuture receiveMessagesAsync(Consumer consumer, + long quietTimeoutNanos, + BiFunction, Message, Boolean> + messageHandler, + AtomicLong lastMessageReceivedNanos) { + return consumer.receiveAsync() + .orTimeout(quietTimeoutNanos, TimeUnit.NANOSECONDS) .handle((msg, t) -> { + long currentNanos = System.nanoTime(); if (t != null) { if (t instanceof TimeoutException) { - // cancel the receive future so that Pulsar client can clean up the resources - receiveFuture.cancel(false); - return false; + if (currentNanos - lastMessageReceivedNanos.get() > quietTimeoutNanos) { + return false; + } else { + return true; + } } else { throw FutureUtil.wrapToCompletionException(t); } } + lastMessageReceivedNanos.set(currentNanos); return messageHandler.apply(consumer, msg); }).thenComposeAsync(receiveMore -> { if (receiveMore) { - return receiveMessagesAsync(consumer, quietTimeout, messageHandler); + return receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos); } else { return CompletableFuture.completedFuture(null); } From 7022394d2175969ad000cd9b39278f18ec181407 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 11:32:23 +0200 Subject: [PATCH 2/3] Handle quiet time correctly when there's a timeout --- .../apache/pulsar/broker/BrokerTestUtil.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index fee0ababe114d..e97928c4c66e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -44,6 +44,7 @@ import java.util.function.BiFunction; import java.util.stream.Stream; import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -243,7 +244,7 @@ public static void receiveMessages(BiFunction, Message, Boole long quietTimeoutNanos = quietTimeout.toNanos(); AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); FutureUtil.waitForAll(consumers - .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, + .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler, lastMessageReceivedNanos)).toList()).join(); } @@ -252,30 +253,34 @@ public static void receiveMessages(BiFunction, Message, Boole // this is useful in tests where multiple consumers are needed to test the functionality private static CompletableFuture receiveMessagesAsync(Consumer consumer, long quietTimeoutNanos, + long receiveTimeoutNanos, BiFunction, Message, Boolean> messageHandler, AtomicLong lastMessageReceivedNanos) { return consumer.receiveAsync() - .orTimeout(quietTimeoutNanos, TimeUnit.NANOSECONDS) + .orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS) .handle((msg, t) -> { long currentNanos = System.nanoTime(); if (t != null) { if (t instanceof TimeoutException) { - if (currentNanos - lastMessageReceivedNanos.get() > quietTimeoutNanos) { - return false; + long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get(); + if (sinceLastMessageReceivedNanos > quietTimeoutNanos) { + return Pair.of(false, 0L); } else { - return true; + return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos); } } else { throw FutureUtil.wrapToCompletionException(t); } } lastMessageReceivedNanos.set(currentNanos); - return messageHandler.apply(consumer, msg); - }).thenComposeAsync(receiveMore -> { + return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos); + }).thenComposeAsync(receiveMoreAndNextTimeout -> { + boolean receiveMore = receiveMoreAndNextTimeout.getLeft(); if (receiveMore) { - return receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, - lastMessageReceivedNanos); + Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight(); + return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos, + messageHandler, lastMessageReceivedNanos); } else { return CompletableFuture.completedFuture(null); } From 2df6d9c7e7764b3a90c3c662c12e87369efb26c0 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 12:22:44 +0200 Subject: [PATCH 3/3] Add unit test for BrokerTestUtil.receiveMessages --- .../pulsar/broker/BrokerTestUtilTest.java | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java new file mode 100644 index 0000000000000..90b917a319c71 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +public class BrokerTestUtilTest { + @Test + public void testReceiveMessagesQuietTime() throws Exception { + // Mock consumers + Consumer consumer1 = mock(Consumer.class); + Consumer consumer2 = mock(Consumer.class); + + long consumer1DelayMs = 300L; + long consumer2DelayMs = 400L; + long quietTimeMs = 500L; + + // Define behavior for receiveAsync with delay + AtomicBoolean consumer1FutureContinueSupplying = new AtomicBoolean(true); + when(consumer1.receiveAsync()).thenAnswer(invocation -> { + if (consumer1FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS)); + consumer1FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer1FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + AtomicBoolean consumer2FutureContinueSupplying = new AtomicBoolean(true); + when(consumer2.receiveAsync()).thenAnswer(invocation -> { + if (consumer2FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS)); + consumer2FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer2FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + + // Atomic variables to track message handling + AtomicInteger messageCount = new AtomicInteger(0); + + // Message handler + BiFunction, Message, Boolean> messageHandler = (consumer, msg) -> { + messageCount.incrementAndGet(); + return true; + }; + + // Track start time + long startTime = System.nanoTime(); + + // Call receiveMessages method + BrokerTestUtil.receiveMessages(messageHandler, Duration.ofMillis(quietTimeMs), consumer1, consumer2); + + // Track end time + long endTime = System.nanoTime(); + + // Verify that messages were attempted to be received + verify(consumer1, times(3)).receiveAsync(); + verify(consumer2, times(2)).receiveAsync(); + + // Verify that the message handler was called + assertEquals(messageCount.get(), 2); + + // Verify the time spent is as expected (within a reasonable margin) + long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); + assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs, + consumer2DelayMs + quietTimeMs + (quietTimeMs / 2)); + } +} \ No newline at end of file