diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index c6d07466040b..89b7b42e1b43 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -21,9 +21,11 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.Timestamp;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,10 +43,13 @@
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOITHelper;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -63,7 +68,6 @@
 import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -72,8 +76,8 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -135,6 +139,8 @@ public class KafkaIOIT {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
 
+  private static String expectedHashcode;
+
   private static SyntheticSourceOptions sourceOptions;
 
   private static Options options;
@@ -196,23 +202,17 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
     // Use streaming pipeline to read Kafka records.
     readPipeline.getOptions().as(Options.class).setStreaming(true);
-    PCollection<Long> count =
-        readPipeline
-            .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
-            .apply(
-                "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
-            .apply("Window", Window.into(CalendarWindows.years(1)))
-            .apply(
-                "Counting element",
-                Combine.globally(Count.<KafkaRecord<byte[], byte[]>>combineFn()).withoutDefaults());
+    readPipeline
+        .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
 
     PipelineResult writeResult = writePipeline.run();
     PipelineResult.State writeState = writeResult.waitUntilFinish();
 
     // Fail the test if pipeline failed.
     assertNotEquals(PipelineResult.State.FAILED, writeState);
 
-    PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);
-
     PipelineResult readResult = readPipeline.run();
     PipelineResult.State readState =
         readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
@@ -221,6 +221,13 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
     tearDownTopic(options.getKafkaTopic());
     cancelIfTimeouted(readResult, readState);
 
+    long actualRecords = readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME);
+    assertTrue(
+        String.format(
+            "actual number of records %d smaller than expected: %d.",
+            actualRecords, sourceOptions.numRecords),
+        sourceOptions.numRecords <= actualRecords);
+
     if (!options.isWithTestcontainers()) {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
       IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
@@ -230,25 +237,32 @@ public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
 
   @Test
   public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
+    // Map of hashes of set size collections with 100b records - 10b key, 90b values.
+    Map<Long, String> expectedHashes =
+        ImmutableMap.of(
+            1000L, "4507649971ee7c51abbb446e65a5c660",
+            100_000_000L, "0f12c27c9a7672e14775594be66cad9a");
+    expectedHashcode = getHashForRecordCount(sourceOptions.numRecords, expectedHashes);
     writePipeline
         .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
         .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
         .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
 
-    PCollection<Long> count =
+    PCollection<String> hashcode =
         readPipeline
             .apply(
                 "Read from bounded Kafka",
                 readFromBoundedKafka().withTopic(options.getKafkaTopic()))
             .apply(
                 "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
-            .apply("Counting element", Count.globally());
+            .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
+            .apply("Calculate hashcode", Combine.globally(new HashingFn()).withoutDefaults());
+
+    PAssert.thatSingleton(hashcode).isEqualTo(expectedHashcode);
 
     PipelineResult writeResult = writePipeline.run();
     writeResult.waitUntilFinish();
 
-    PAssert.thatSingleton(count).isEqualTo(sourceOptions.numRecords);
-
     PipelineResult readResult = readPipeline.run();
     PipelineResult.State readState =
         readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
@@ -257,7 +271,8 @@ public void testKafkaIOReadsAndWritesCorrectlyInBatch() throws IOException {
     tearDownTopic(options.getKafkaTopic());
     cancelIfTimeouted(readResult, readState);
 
-    assertNotEquals(PipelineResult.State.FAILED, readState);
+    // Fail the test if pipeline failed.
+    assertEquals(PipelineResult.State.DONE, readState);
 
     if (!options.isWithTestcontainers()) {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
@@ -672,7 +687,9 @@ private PipelineResult runWithStopReadingFn(
             readFromKafka()
                 .withTopic(options.getKafkaTopic() + "-" + topicSuffix)
                 .withCheckStopReadingFn(function))
-        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
 
     PipelineResult writeResult = writePipeline.run();
     writeResult.waitUntilFinish();
@@ -817,6 +834,19 @@ private KafkaIO.Read<byte[], byte[]> readFromKafka() {
         .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest"));
   }
 
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
   /** Pipeline options specific for this test. */
   public interface Options extends IOTestPipelineOptions, StreamingOptions {
 
@@ -857,6 +887,25 @@ public interface Options extends IOTestPipelineOptions, StreamingOptions {
     void setKafkaContainerVersion(String kafkaContainerVersion);
   }
 
+  private static class MapKafkaRecordsToStrings
+      extends SimpleFunction<KafkaRecord<byte[], byte[]>, String> {
+    @Override
+    public String apply(KafkaRecord<byte[], byte[]> input) {
+      String key = Arrays.toString(input.getKV().getKey());
+      String value = Arrays.toString(input.getKV().getValue());
+      return String.format("%s %s", key, value);
+    }
+  }
+
+  public static String getHashForRecordCount(long recordCount, Map<Long, String> hashes) {
+    String hash = hashes.get(recordCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+          String.format("No hash for that record count: %s", recordCount));
+    }
+    return hash;
+  }
+
   private static void setupKafkaContainer() {
     kafkaContainer =
         new KafkaContainer(
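
Note: the streaming test's new record-count check calls readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME), a helper that already exists in KafkaIOIT and therefore does not appear in this diff. For readers unfamiliar with Beam's metrics API, a counter created with Metrics.counter(namespace, name), as CountingFn does, can be read back from a PipelineResult roughly as in the sketch below; this shows the standard MetricsFilter query pattern and is an assumed shape of the helper, not necessarily its exact body:

    // Sketch (assumed shape of the existing readElementMetric helper): query the attempted
    // value of a counter identified by namespace/name from a finished or cancelled result.
    private static long readElementMetric(PipelineResult result, String namespace, String name) {
      MetricsFilter filter =
          MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build();
      long value = 0;
      // Sum all matching counter results reported by the runner.
      for (MetricResult<Long> counter : result.metrics().queryMetrics(filter).getCounters()) {
        value += counter.getAttempted();
      }
      return value;
    }

The classes involved are org.apache.beam.sdk.metrics.MetricsFilter, MetricNameFilter, and MetricResult. Reading getAttempted() rather than getCommitted() matches the fact that some runners only report attempted metrics for streaming pipelines, and attempted counters can over-count on retries, which is consistent with the diff asserting "at least numRecords" instead of exact equality.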