diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index 8364cae53b223..e97928c4c66e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -39,10 +39,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -239,33 +241,46 @@ public static void receiveMessages(BiFunction, Message, Boole public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, Duration quietTimeout, Stream> consumers) { + long quietTimeoutNanos = quietTimeout.toNanos(); + AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); FutureUtil.waitForAll(consumers - .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join(); + .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos)).toList()).join(); } // asynchronously receive messages from a consumer and handle them using the provided message handler // the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads // this is useful in tests where multiple consumers are needed to test the functionality - private static CompletableFuture receiveMessagesAsync(Consumer consumer, Duration quietTimeout, - BiFunction, Message, Boolean> - messageHandler) { - CompletableFuture> receiveFuture = consumer.receiveAsync(); - return receiveFuture - .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS) + private static CompletableFuture receiveMessagesAsync(Consumer consumer, + long quietTimeoutNanos, + long receiveTimeoutNanos, + BiFunction, Message, Boolean> + messageHandler, + AtomicLong lastMessageReceivedNanos) { + return consumer.receiveAsync() + .orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS) .handle((msg, t) -> { + long currentNanos = System.nanoTime(); if (t != null) { if (t instanceof TimeoutException) { - // cancel the receive future so that Pulsar client can clean up the resources - receiveFuture.cancel(false); - return false; + long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get(); + if (sinceLastMessageReceivedNanos > quietTimeoutNanos) { + return Pair.of(false, 0L); + } else { + return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos); + } } else { throw FutureUtil.wrapToCompletionException(t); } } - return messageHandler.apply(consumer, msg); - }).thenComposeAsync(receiveMore -> { + lastMessageReceivedNanos.set(currentNanos); + return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos); + }).thenComposeAsync(receiveMoreAndNextTimeout -> { + boolean receiveMore = receiveMoreAndNextTimeout.getLeft(); if (receiveMore) { - return receiveMessagesAsync(consumer, quietTimeout, messageHandler); + Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight(); + return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos, + messageHandler, lastMessageReceivedNanos); } else { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java new file mode 100644 index 0000000000000..90b917a319c71 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +public class BrokerTestUtilTest { + @Test + public void testReceiveMessagesQuietTime() throws Exception { + // Mock consumers + Consumer consumer1 = mock(Consumer.class); + Consumer consumer2 = mock(Consumer.class); + + long consumer1DelayMs = 300L; + long consumer2DelayMs = 400L; + long quietTimeMs = 500L; + + // Define behavior for receiveAsync with delay + AtomicBoolean consumer1FutureContinueSupplying = new AtomicBoolean(true); + when(consumer1.receiveAsync()).thenAnswer(invocation -> { + if (consumer1FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS)); + consumer1FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer1FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + AtomicBoolean consumer2FutureContinueSupplying = new AtomicBoolean(true); + when(consumer2.receiveAsync()).thenAnswer(invocation -> { + if (consumer2FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS)); + consumer2FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer2FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + + // Atomic variables to track message handling + AtomicInteger messageCount = new AtomicInteger(0); + + // Message handler + BiFunction, Message, Boolean> messageHandler = (consumer, msg) -> { + messageCount.incrementAndGet(); + return true; + }; + + // Track start time + long startTime = System.nanoTime(); + + // Call receiveMessages method + BrokerTestUtil.receiveMessages(messageHandler, Duration.ofMillis(quietTimeMs), consumer1, consumer2); + + // Track end time + long endTime = System.nanoTime(); + + // Verify that messages were attempted to be received + verify(consumer1, times(3)).receiveAsync(); + verify(consumer2, times(2)).receiveAsync(); + + // Verify that the message handler was called + assertEquals(messageCount.get(), 2); + + // Verify the time spent is as expected (within a reasonable margin) + long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); + assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs, + consumer2DelayMs + quietTimeMs + (quietTimeMs / 2)); + } +} \ No newline at end of file