Skip to content

Commit

Permalink
[Improve][E2E] improve kafka e2e (#8295)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Dec 15, 2024
1 parent a139595 commit 3b64972
Showing 1 changed file with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;

Expand Down Expand Up @@ -104,7 +103,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

@Slf4j
public class KafkaIT extends TestSuiteBase implements TestResource {
Expand Down Expand Up @@ -132,8 +131,7 @@ public void startUp() throws Exception {
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
Startables.deepStart(Stream.of(kafkaContainer)).join();
log.info("Kafka container started");
Awaitility.given()
.ignoreExceptions()
given().ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
Expand Down Expand Up @@ -789,11 +787,12 @@ public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
}
return null;
});
TimeUnit.MINUTES.sleep(5);
// wait for data written to kafka
Long finalEndOffset = endOffset;
await().atMost(5, TimeUnit.MINUTES)
.pollInterval(5000, TimeUnit.MILLISECONDS)
given().pollDelay(30, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.SECONDS)
.await()
.atMost(5, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertTrue(
Expand Down

0 comments on commit 3b64972

Please sign in to comment.