Skip to content

Commit

Permalink
a bit more stabilization
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Mar 7, 2023
1 parent 0e024e0 commit 062043e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.azure.messaging.eventhubs.EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME;
import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent;
import static java.nio.charset.StandardCharsets.UTF_8;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Tests that {@link EventHubConsumerAsyncClient} can be created with various {@link EventPosition EventPositions}.
*/
Expand Down Expand Up @@ -166,33 +169,22 @@ void receiveLatestMessagesNoneAdded() {
* Test for receiving message from latest offset
*/
@Test
void receiveLatestMessages() {
void receiveLatestMessages() throws InterruptedException {
// Arrange
final String messageId = UUID.randomUUID().toString();
final SendOptions options = new SendOptions().setPartitionId(testData.getPartitionId());
final EventHubProducerClient producer = toClose(createBuilder().buildProducerClient());
final List<EventData> events = TestUtils.getEvents(15, messageId);
final CountDownLatch receivedAll = new CountDownLatch(numberOfEvents);
toClose(consumer.receiveFromPartition(testData.getPartitionId(), EventPosition.latest())
.filter(event -> isMatchingEvent(event, messageId))
.take(numberOfEvents)
.subscribe(e -> receivedAll.countDown(), (ex) -> fail(ex)));

try {
StepVerifier.create(consumer.receiveFromPartition(testData.getPartitionId(), EventPosition.latest())
.filter(event -> isMatchingEvent(event, messageId))
.take(numberOfEvents))
.then(() -> {
try {
producer.send(events, options);
logger.atInfo().log("sent events");
} catch (RuntimeException ex) {
throw logger.logThrowableAsError(ex);
}
})
.expectNextCount(numberOfEvents)
.expectComplete()
.verify(TIMEOUT);
producer.send(events, options);
logger.atInfo().log("sent events");

// Act
} finally {
dispose(producer);
}
assertTrue(receivedAll.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public final class TestUtils {
private static final MessageSerializer MESSAGE_SERIALIZER = new EventHubMessageSerializer();
private static final ClientLogger LOGGER = new ClientLogger(TestUtils.class);
;

// System and application properties from the generated test message.
static final Instant ENQUEUED_TIME = Instant.ofEpochSecond(1561344661);
static final Long OFFSET = 1534L;
Expand Down

0 comments on commit 062043e

Please sign in to comment.