Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMessages #23876

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -239,33 +241,46 @@ public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boole
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
Duration quietTimeout,
Stream<Consumer<T>> consumers) {
long quietTimeoutNanos = quietTimeout.toNanos();
AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime());
FutureUtil.waitForAll(consumers
.map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join();
.map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler,
lastMessageReceivedNanos)).toList()).join();
}

// asynchronously receive messages from a consumer and handle them using the provided message handler
// the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads
// this is useful in tests where multiple consumers are needed to test the functionality
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer, Duration quietTimeout,
BiFunction<Consumer<T>, Message<T>, Boolean>
messageHandler) {
CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
return receiveFuture
.orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS)
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer,
long quietTimeoutNanos,
long receiveTimeoutNanos,
BiFunction<Consumer<T>, Message<T>, Boolean>
messageHandler,
AtomicLong lastMessageReceivedNanos) {
return consumer.receiveAsync()
.orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
.handle((msg, t) -> {
long currentNanos = System.nanoTime();
if (t != null) {
if (t instanceof TimeoutException) {
// cancel the receive future so that Pulsar client can clean up the resources
receiveFuture.cancel(false);
return false;
long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get();
if (sinceLastMessageReceivedNanos > quietTimeoutNanos) {
return Pair.of(false, 0L);
} else {
return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos);
}
} else {
throw FutureUtil.wrapToCompletionException(t);
}
}
return messageHandler.apply(consumer, msg);
}).thenComposeAsync(receiveMore -> {
lastMessageReceivedNanos.set(currentNanos);
return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos);
}).thenComposeAsync(receiveMoreAndNextTimeout -> {
boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
if (receiveMore) {
return receiveMessagesAsync(consumer, quietTimeout, messageHandler);
Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight();
return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos,
messageHandler, lastMessageReceivedNanos);
} else {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;

@Slf4j
public class BrokerTestUtilTest {
@Test
public void testReceiveMessagesQuietTime() throws Exception {
// Mock consumers
Consumer<Integer> consumer1 = mock(Consumer.class);
Consumer<Integer> consumer2 = mock(Consumer.class);

long consumer1DelayMs = 300L;
long consumer2DelayMs = 400L;
long quietTimeMs = 500L;

// Define behavior for receiveAsync with delay
AtomicBoolean consumer1FutureContinueSupplying = new AtomicBoolean(true);
when(consumer1.receiveAsync()).thenAnswer(invocation -> {
if (consumer1FutureContinueSupplying.get()) {
CompletableFuture<Message> messageCompletableFuture =
CompletableFuture.supplyAsync(() -> mock(Message.class),
CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS));
consumer1FutureContinueSupplying.set(false);
// continue supplying while the future is cancelled or timed out
FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> {
consumer1FutureContinueSupplying.set(true);
});
return messageCompletableFuture;
} else {
return new CompletableFuture<>();
}
});
AtomicBoolean consumer2FutureContinueSupplying = new AtomicBoolean(true);
when(consumer2.receiveAsync()).thenAnswer(invocation -> {
if (consumer2FutureContinueSupplying.get()) {
CompletableFuture<Message> messageCompletableFuture =
CompletableFuture.supplyAsync(() -> mock(Message.class),
CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS));
consumer2FutureContinueSupplying.set(false);
// continue supplying while the future is cancelled or timed out
FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> {
consumer2FutureContinueSupplying.set(true);
});
return messageCompletableFuture;
} else {
return new CompletableFuture<>();
}
});

// Atomic variables to track message handling
AtomicInteger messageCount = new AtomicInteger(0);

// Message handler
BiFunction<Consumer<Integer>, Message<Integer>, Boolean> messageHandler = (consumer, msg) -> {
messageCount.incrementAndGet();
return true;
};

// Track start time
long startTime = System.nanoTime();

// Call receiveMessages method
BrokerTestUtil.receiveMessages(messageHandler, Duration.ofMillis(quietTimeMs), consumer1, consumer2);

// Track end time
long endTime = System.nanoTime();

// Verify that messages were attempted to be received
verify(consumer1, times(3)).receiveAsync();
verify(consumer2, times(2)).receiveAsync();

// Verify that the message handler was called
assertEquals(messageCount.get(), 2);

// Verify the time spent is as expected (within a reasonable margin)
long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs,
consumer2DelayMs + quietTimeMs + (quietTimeMs / 2));
}
}
Loading