[HUDI-3403] Ensure keygen props are set for bootstrap (apache#6645)
codope authored and voonhous committed Oct 7, 2022
1 parent 102dfd4 commit 144fdb0
Showing 4 changed files with 43 additions and 31 deletions.
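
Before the per-file diffs, a minimal sketch of the write path this commit fixes: a metadata bootstrap issued through the Spark datasource, with the key generator supplied via HoodieBootstrapConfig. It is modeled on runMetadataBootstrapAndVerifyCommit in the test diff below; the table name, record/partition fields, and paths are illustrative placeholders, not values from this commit.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("bootstrap-sketch").getOrCreate()

// Metadata bootstrap of an existing dataset at a source path into a Hudi table;
// the keygen class rides in on HoodieBootstrapConfig.KEYGEN_CLASS_NAME.
spark.emptyDataFrame.write
  .format("hudi")
  .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.table.name", "bootstrap_table")                  // placeholder name
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")  // placeholder field
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
  .option(HoodieBootstrapConfig.BASE_PATH.key, "/tmp/source")      // placeholder path
  .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
  .mode(SaveMode.Overwrite)
  .save("/tmp/hudi_table")                                         // placeholder path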
HoodieTableMetaClient.java
@@ -876,11 +876,10 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
       return this;
     }
 
-    public PropertyBuilder set(String key, Object value) {
+    private void set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
       }
-      return this;
     }
 
     public PropertyBuilder set(Map<String, Object> props) {
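
The public set(Map) overload remains the entry point and now delegates to this private helper, which persists only keys in HoodieTableConfig.PERSISTED_CONFIG_LIST; everything else is silently dropped. A hedged sketch of the builder flow that the writer-side hunks below rely on, assuming the standard withPropertyBuilder() factory (table name and path are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.HoodieTableMetaClient

val hadoopConf = new Configuration()
val keyGenProp = "org.apache.hudi.keygen.SimpleKeyGenerator"
// Empty here; HoodieSparkSqlWriter fills this for timestamp-based key generators.
val timestampKeyGeneratorConfigs = new java.util.HashMap[String, Object]()

val metaClient = HoodieTableMetaClient.withPropertyBuilder()
  .setTableType("COPY_ON_WRITE")
  .setTableName("bootstrap_table")        // placeholder
  .setKeyGeneratorClassProp(keyGenProp)
  .set(timestampKeyGeneratorConfigs)      // only PERSISTED_CONFIG_LIST keys survive
  .initTable(hadoopConf, "/tmp/hudi_table") // placeholder path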
HoodieSparkSqlWriter.scala
@@ -31,8 +31,8 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
+import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME, KEYGEN_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -468,7 +468,10 @@ object HoodieSparkSqlWriter {
     val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
     val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
     val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
-    val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+    val keyGenProp =
+      if (StringUtils.nonEmpty(hoodieConfig.getString(KEYGEN_CLASS_NAME))) hoodieConfig.getString(KEYGEN_CLASS_NAME)
+      else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+    val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenProp, parameters)
     val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
       HoodieTableConfig.POPULATE_META_FIELDS.key(),
       String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
@@ -492,6 +495,7 @@ object HoodieSparkSqlWriter {
       .setPartitionFields(partitionColumns)
       .setPopulateMetaFields(populateMetaFields)
       .setKeyGeneratorClassProp(keyGenProp)
+      .set(timestampKeyGeneratorConfigs)
       .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
       .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
       .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
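
The net effect of the first hunk is a precedence rule: a bootstrap-level keygen class (HoodieBootstrapConfig.KEYGEN_CLASS_NAME), when supplied, now takes priority over the table-level HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, so bootstrap writes no longer lose their key generator. A hedged restatement of just that rule; resolveKeyGenClass is a name invented here, not in the patch:

import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieBootstrapConfig

// Mirrors the inline conditional in the hunk above.
def resolveKeyGenClass(hoodieConfig: HoodieConfig): String = {
  val bootstrapKeyGen = hoodieConfig.getString(HoodieBootstrapConfig.KEYGEN_CLASS_NAME)
  if (StringUtils.nonEmpty(bootstrapKeyGen)) bootstrapKeyGen
  else hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
}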
TestHoodieSparkSqlWriter.scala
@@ -17,9 +17,6 @@
 
 package org.apache.hudi
 
-import java.io.IOException
-import java.time.Instant
-import java.util.{Collections, Date, UUID}
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
@@ -43,12 +40,15 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue,
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
-import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
+import java.io.IOException
+import java.time.Instant
+import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
 
@@ -508,7 +508,7 @@ class TestHoodieSparkSqlWriter {
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -565,7 +565,7 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
       HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getCanonicalName)
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true)
+    initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)
 
     val client = spy(DataSourceUtils.createHoodieClient(
       new JavaSparkContext(sc),
@@ -593,7 +593,7 @@ class TestHoodieSparkSqlWriter {
     }
   }
 
-  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean) : Unit = {
+  def initializeMetaClientForBootstrap(fooTableParams : Map[String, String], tableType: String, addBootstrapPath : Boolean, initBasePath: Boolean) : Unit = {
    // when metadata is enabled, directly instantiating write client using DataSourceUtils.createHoodieClient
    // will hit a code which tries to instantiate meta client for data table. if table does not exist, it fails.
    // hence doing an explicit instantiation here.
@@ -612,7 +612,9 @@ class TestHoodieSparkSqlWriter {
       tableMetaClientBuilder
         .setBootstrapBasePath(fooTableParams(HoodieBootstrapConfig.BASE_PATH.key))
     }
-    tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    if (initBasePath) {
+      tableMetaClientBuilder.initTable(sc.hadoopConfiguration, tempBasePath)
+    }
   }
 
   /**
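
The new initBasePath flag separates two setups the old signature conflated: the regular write test pre-creates the table on disk, while the bootstrap test must leave table creation to the bootstrap write itself. The two call shapes from the hunks above, side by side:

// Regular write test: the table must already exist, so initialize it eagerly.
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
// Bootstrap test: skip initTable and let the bootstrap operation create the table.
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = true, initBasePath = false)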
TestDataSourceForBootstrap.scala
@@ -23,17 +23,17 @@ import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 import java.time.Instant
 import java.util.Collections
 
 import scala.collection.JavaConverters._
 
 class TestDataSourceForBootstrap {
@@ -102,9 +102,12 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
+    val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
+    val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
+      bootstrapKeygenClass = bootstrapKeygenClass
     )
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -123,10 +126,10 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -163,8 +166,8 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      Some("datestr"),
-      Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") ++ Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"),
+      classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
@@ -227,7 +230,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -302,7 +307,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -367,7 +374,9 @@ class TestDataSourceForBootstrap {
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
-      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"),
+      classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -497,18 +506,16 @@ class TestDataSourceForBootstrap {
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          partitionColumns: Option[String] = None,
-                                          extraOpts: Map[String, String] = Map.empty): String = {
+                                          extraOpts: Map[String, String] = Map.empty,
+                                          bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
-      .options(commonOpts)
       .options(extraOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
       .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
-      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, bootstrapKeygenClass)
       .mode(SaveMode.Overwrite)
      .save(basePath)
 
@@ -528,7 +535,7 @@ class TestDataSourceForBootstrap {
       .load(basePath)
 
     assertEquals(numRecords, hoodieIncViewDF1.count())
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
+    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
 
@@ -537,10 +544,10 @@ class TestDataSourceForBootstrap {
     val hoodieIncViewDF2 = spark.read.format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, bootstrapCommitInstantTime)
-      .load(basePath);
+      .load(basePath)
 
     assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
+    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(latestCommitInstantTime, countsPerCommit(0).get(0))
 
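
For completeness, the writer-side change also persists TimestampBasedKeyGenerator settings into table config via .set(timestampKeyGeneratorConfigs), so later writes resolve the same key generator behavior. A hedged sketch of options such a write might carry; the timebased config keys below come from Hudi's TimestampBasedKeyGenerator documentation and are assumptions here, not part of this diff, and df/basePath are placeholders:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

// df and basePath are placeholders standing in for an existing DataFrame and table path.
df.write.format("hudi")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key,
    "org.apache.hudi.keygen.TimestampBasedKeyGenerator")
  // Assumed TimestampBasedKeyGenerator keys, not introduced by this commit:
  .option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING")
  .option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd")
  .option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd")
  .mode(SaveMode.Append)
  .save(basePath)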
