[HUDI-4775] Fixing incremental source for MOR table (apache#6587)
* Fixing incremental source for MOR table

* Remove unused import

Co-authored-by: Sagar Sumit <[email protected]>
2 people authored and voonhous committed Oct 7, 2022
1 parent f00e1b6 commit 815dc19
Showing 2 changed files with 35 additions and 6 deletions.
IncrSourceHelper.java

@@ -73,7 +73,7 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();

  final HoodieTimeline activeCommitTimeline =
-     srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+     srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();

String beginInstantTime = beginInstant.orElseGet(() -> {
if (missingCheckpointStrategy != null) {
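Why the one-line change above matters: on a MERGE_ON_READ table, ingestion completes as deltacommit actions, but getActiveTimeline().getCommitTimeline() keeps only commit and replacecommit actions, so the incremental source saw an empty completed timeline and could not derive a begin/end instant range. getCommitsAndCompactionTimeline() resolves by table type (as of the 0.12.x line): the plain commit timeline for COPY_ON_WRITE, and the write timeline (delta commits and compactions included) for MERGE_ON_READ. A minimal sketch of the difference, assuming a hypothetical local MOR table path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class MorTimelineCheck {
  public static void main(String[] args) {
    // Hypothetical base path of an existing MERGE_ON_READ table.
    String srcBasePath = "file:///tmp/hudi/mor_trips";

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(srcBasePath)
        .setLoadActiveTimelineOnLoad(true)
        .build();

    // Pre-fix behavior: commit/replacecommit only, which stays empty while
    // a MOR table has produced nothing but delta commits.
    HoodieTimeline commitsOnly =
        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

    // Post-fix behavior: for MOR this is the write timeline, so completed
    // delta commits and compaction commits are visible.
    HoodieTimeline writeTimeline =
        metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();

    System.out.println("commit-only instants:        " + commitsOnly.countInstants());
    System.out.println("commits+compaction instants: " + writeTimeline.countInstants());
  }
}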
TestHoodieIncrSource.java

@@ -22,31 +22,41 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -55,20 +65,39 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {

private HoodieTestDataGenerator dataGen;
private HoodieTableMetaClient metaClient;
private HoodieTableType tableType = COPY_ON_WRITE;

@BeforeEach
public void setUp() throws IOException {
dataGen = new HoodieTestDataGenerator();
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
}

- @Test
- public void testHoodieIncrSource() throws IOException {
+ @Override
public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
props = HoodieTableMetaClient.withPropertyBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class)
.fromProperties(props)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
}

private static Stream<Arguments> tableTypeParams() {
return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("tableTypeParams")
public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
-     .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+     .enable(false).build())
.build();

SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
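The test is now parameterized over both table types instead of being a single COPY_ON_WRITE @Test, and the new write config enables inline compaction every 3 delta commits so the MERGE_ON_READ run produces compaction commits inside the incremental window. A self-contained sketch of the same @MethodSource pattern, with hypothetical class and method names, showing how each Arguments element becomes one invocation of the test body:

import java.util.Arrays;
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class TableTypeParameterizationSketch {

  // One Arguments per table type, mirroring tableTypeParams() above.
  private static Stream<Arguments> tableTypeParams() {
    return Arrays.stream(new String[][] {{"COPY_ON_WRITE"}, {"MERGE_ON_READ"}})
        .map(Arguments::of);
  }

  // JUnit 5 invokes this once per stream element, so one test body covers
  // both the COW and the MOR write path.
  @ParameterizedTest
  @MethodSource("tableTypeParams")
  void runsOncePerTableType(String tableType) {
    assertTrue(tableType.equals("COPY_ON_WRITE") || tableType.equals("MERGE_ON_READ"));
  }
}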
