diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java index 2020c73065d67..4ae51b0de2df9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java @@ -22,13 +22,16 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME; import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent; import static java.nio.charset.StandardCharsets.UTF_8; - +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests that {@link EventHubConsumerAsyncClient} can be created with various {@link EventPosition EventPositions}. */ @@ -166,33 +169,22 @@ void receiveLatestMessagesNoneAdded() { * Test for receiving message from latest offset */ @Test - void receiveLatestMessages() { + void receiveLatestMessages() throws InterruptedException { // Arrange final String messageId = UUID.randomUUID().toString(); final SendOptions options = new SendOptions().setPartitionId(testData.getPartitionId()); final EventHubProducerClient producer = toClose(createBuilder().buildProducerClient()); final List events = TestUtils.getEvents(15, messageId); + final CountDownLatch receivedAll = new CountDownLatch(numberOfEvents); + toClose(consumer.receiveFromPartition(testData.getPartitionId(), EventPosition.latest()) + .filter(event -> isMatchingEvent(event, messageId)) + .take(numberOfEvents) + .subscribe(e -> receivedAll.countDown(), (ex) -> fail(ex))); - try { - StepVerifier.create(consumer.receiveFromPartition(testData.getPartitionId(), EventPosition.latest()) - .filter(event -> isMatchingEvent(event, messageId)) - .take(numberOfEvents)) - .then(() -> { - try { - producer.send(events, options); - logger.atInfo().log("sent events"); - } catch (RuntimeException ex) { - throw logger.logThrowableAsError(ex); - } - }) - .expectNextCount(numberOfEvents) - .expectComplete() - .verify(TIMEOUT); + producer.send(events, options); + logger.atInfo().log("sent events"); - // Act - } finally { - dispose(producer); - } + assertTrue(receivedAll.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java index d8a03837fd62e..97930bc2ec05c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/TestUtils.java @@ -38,7 +38,7 @@ public final class TestUtils { private static final MessageSerializer MESSAGE_SERIALIZER = new EventHubMessageSerializer(); private static final ClientLogger LOGGER = new ClientLogger(TestUtils.class); -; + // System and application properties from the generated test message. static final Instant ENQUEUED_TIME = Instant.ofEpochSecond(1561344661); static final Long OFFSET = 1534L;