From 815dc194202426aea1e6a840b031a4ddb5e3458d Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Mon, 5 Sep 2022 04:32:59 -0700
Subject: [PATCH] [HUDI-4775] Fixing incremental source for MOR table (#6587)

* Fixing incremental source for MOR table

* Remove unused import

Co-authored-by: Sagar Sumit
---
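Note on the fix: getActiveTimeline().getCommitTimeline() narrows the timeline
to commit and replacecommit actions. On a MERGE_ON_READ table, writes complete
as deltacommits, so the incremental source was computing begin/end instants
against an empty (or compaction-only) view of the timeline.
getCommitsAndCompactionTimeline() includes deltacommits and compaction commits
as well, and the test below is parameterized over both table types, with
inline compaction enabled so the MOR run also produces compaction commits on
the timeline.

A minimal sketch of the difference, using only the calls touched by this
patch; the wrapper class and method names are illustrative, not part of the
patch:

  import org.apache.hudi.common.table.HoodieTableMetaClient;
  import org.apache.hudi.common.table.timeline.HoodieTimeline;
  import org.apache.spark.api.java.JavaSparkContext;

  // Illustrative wrapper only; srcBasePath points at the source Hudi table.
  class TimelineViewSketch {
    static void show(JavaSparkContext jssc, String srcBasePath) {
      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
          .setConf(jssc.hadoopConfiguration())
          .setBasePath(srcBasePath)
          .setLoadActiveTimelineOnLoad(true)
          .build();

      // Before the fix: only commit/replacecommit actions survive this
      // filter, so a MOR table that has seen only deltacommits looks empty.
      HoodieTimeline before =
          metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

      // After the fix: deltacommits (and compaction commits) are included.
      HoodieTimeline after =
          metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
    }
  }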
 .../sources/helpers/IncrSourceHelper.java     |  2 +-
 .../sources/TestHoodieIncrSource.java         | 39 ++++++++++++++++---
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index cbfb153ee9ca4..d9415d036c312 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -73,7 +73,7 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
     final HoodieTimeline activeCommitTimeline =
-        srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 57270bdf812d3..df790cf115364 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -22,7 +22,9 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -30,23 +32,31 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -55,20 +65,39 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
 
   private HoodieTestDataGenerator dataGen;
   private HoodieTableMetaClient metaClient;
+  private HoodieTableType tableType = COPY_ON_WRITE;
 
   @BeforeEach
   public void setUp() throws IOException {
     dataGen = new HoodieTestDataGenerator();
-    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
   }
 
-  @Test
-  public void testHoodieIncrSource() throws IOException {
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+    props = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableName(RAW_TRIPS_TEST_NAME)
+        .setTableType(tableType)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .fromProperties(props)
+        .build();
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  private static Stream<Arguments> tableTypeParams() {
+    return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("tableTypeParams")
+  public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
     HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .enable(false).build())
         .build();
 
     SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);