[HUDI-4785] Fix partition discovery in bootstrap operation (apache#6673)
Co-authored-by: Y Ethan Guo <[email protected]>
codope and yihua authored Sep 16, 2022
1 parent 39f562f commit 488f58d
Showing 8 changed files with 99 additions and 55 deletions.
@@ -34,6 +34,9 @@
import java.io.IOException;
import java.util.Properties;

import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;

/**
* Bootstrap specific configs.
*/
@@ -50,6 +53,15 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");

public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
.key("hoodie.bootstrap.mode.selector.regex.mode")
.defaultValue(METADATA_ONLY.name())
.sinceVersion("0.6.0")
.withValidValues(METADATA_ONLY.name(), FULL_RECORD.name())
.withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
+ "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
+ "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");

public static final ConfigProperty<String> MODE_SELECTOR_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.mode.selector")
.defaultValue(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
@@ -92,14 +104,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Matches each bootstrap dataset partition against this regex and applies the mode below to it.");

public static final ConfigProperty<String> PARTITION_SELECTOR_REGEX_MODE = ConfigProperty
.key("hoodie.bootstrap.mode.selector.regex.mode")
.defaultValue(BootstrapMode.METADATA_ONLY.name())
.sinceVersion("0.6.0")
.withDocumentation("Bootstrap mode to apply for partition paths, that match regex above. "
+ "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
+ "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");

public static final ConfigProperty<String> INDEX_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.index.class")
.defaultValue(HFileBootstrapIndex.class.getName())
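For orientation, here is a minimal, hedged sketch of how a caller might combine the bootstrap configs defined above. The config keys follow HoodieBootstrapConfig (verify against your Hudi version); the regex selector class name, the source path, and the regex value are assumptions for illustration only.

import java.util.HashMap;
import java.util.Map;

public class BootstrapConfigExample {
  public static void main(String[] args) {
    Map<String, String> opts = new HashMap<>();
    // Source dataset to bootstrap from (path is hypothetical).
    opts.put("hoodie.bootstrap.base.path", "s3://warehouse/source_table");
    // Selector that matches partition paths against the regex below (class name assumed).
    opts.put("hoodie.bootstrap.mode.selector",
        "org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector");
    // Partitions matching this regex get the mode configured right below it.
    opts.put("hoodie.bootstrap.mode.selector.regex", "2022/09/.*");
    // Must be one of the values enforced by withValidValues above: METADATA_ONLY or FULL_RECORD.
    opts.put("hoodie.bootstrap.mode.selector.regex.mode", "FULL_RECORD");
    opts.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}

The change in this file moves PARTITION_SELECTOR_REGEX_MODE earlier in the class and restricts it via withValidValues(METADATA_ONLY, FULL_RECORD).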
@@ -26,6 +26,7 @@
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -47,7 +48,6 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -74,11 +74,15 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.client.bootstrap.BootstrapMode.FULL_RECORD;
import static org.apache.hudi.client.bootstrap.BootstrapMode.METADATA_ONLY;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;

@@ -93,19 +97,29 @@ public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
Option<Map<String, String>> extraMetadata) {
super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
.withBulkInsertParallelism(config.getBootstrapParallelism())
.build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
super(
context,
new HoodieWriteConfig.Builder()
.withProps(config.getProps())
.withAutoCommit(true)
.withWriteStatusClass(BootstrapWriteStatus.class)
.withBulkInsertParallelism(config.getBootstrapParallelism()).build(),
table,
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
WriteOperationType.BOOTSTRAP,
extraMetadata);
bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
}

private void validate() {
ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null,
checkArgument(config.getBootstrapSourceBasePath() != null,
"Ensure Bootstrap Source Path is set");
ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null,
checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
"FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
}
}

@Override
@@ -115,15 +129,15 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
HoodieTableMetaClient metaClient = table.getMetaClient();
Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
ValidationUtils.checkArgument(!completedInstant.isPresent(),
checkArgument(!completedInstant.isPresent(),
"Active Timeline is expected to be empty for bootstrap to be performed. "
+ "If you want to re-bootstrap, please rollback bootstrap first !!");
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();

// First run metadata bootstrap which will auto commit
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(METADATA_ONLY));
// if there are full bootstrap to be performed, perform that too
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(FULL_RECORD));

return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
} catch (IOException ioe) {
@@ -307,12 +321,21 @@ private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndPr
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);

Map<BootstrapMode, List<String>> result = selector.select(folders);
Map<BootstrapMode, List<String>> result = new HashMap<>();
// for FULL_RECORD mode, original record along with metadata fields are needed
if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
if (!(selector instanceof FullRecordBootstrapModeSelector)) {
FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
result.putAll(fullRecordBootstrapModeSelector.select(folders));
}
} else {
result = selector.select(folders);
}
Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));

// Ensure all partitions are accounted for
ValidationUtils.checkArgument(partitionToFiles.keySet().equals(
checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));

return result.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue().stream()
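Below is a standalone sketch (no Hudi dependencies; the enum, class, and method names are invented for illustration) of the selection override added to listAndProcessSourcePartitions above: when the regex-match mode is FULL_RECORD and the configured selector is not already a full-record selector, full-record selection is applied to all source partitions so the original records plus the Hudi metadata fields get rewritten.

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SelectionOverrideSketch {
  enum Mode { METADATA_ONLY, FULL_RECORD }

  // Groups partition paths by bootstrap mode, overriding the configured selector
  // with an all-FULL_RECORD selection when the regex-match mode asks for it.
  static Map<Mode, List<String>> select(List<String> partitions,
                                        Mode regexMatchMode,
                                        Function<String, Mode> configuredSelector) {
    Function<String, Mode> effective =
        regexMatchMode == Mode.FULL_RECORD ? p -> Mode.FULL_RECORD : configuredSelector;
    return partitions.stream().collect(Collectors.groupingBy(effective));
  }

  public static void main(String[] args) {
    List<String> partitions = List.of("2022/09/01", "2022/09/02");
    // With FULL_RECORD as the regex-match mode, both partitions are selected for a full rewrite.
    System.out.println(select(partitions, Mode.FULL_RECORD, p -> Mode.METADATA_ONLY));
  }
}

The companion check added to validate() rejects the opposite mismatch, refusing FullRecordBootstrapModeSelector when the regex-match mode is METADATA_ONLY.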
@@ -42,8 +42,14 @@ public SparkBootstrapDeltaCommitActionExecutor(HoodieSparkEngineContext context,

@Override
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, extraMetadata);
return new SparkBulkInsertDeltaCommitActionExecutor(
(HoodieSparkEngineContext) context,
new HoodieWriteConfig.Builder()
.withProps(config.getProps())
.withSchema(bootstrapSchema).build(),
table,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD,
extraMetadata);
}
}
@@ -18,17 +18,6 @@

package org.apache.hudi.common.table;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -56,16 +45,25 @@
import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.util.Lazy;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -208,7 +206,7 @@ private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<
// TODO partition columns have to be appended in all read-paths
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
.map(partitionFields -> appendPartitionColumns(schema, partitionFields))
.map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElse(schema);
}

@@ -650,18 +648,18 @@ private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
}
}

static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) {
public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
// In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
// won't be persisted w/in the data files, and therefore we need to append such columns
// when schema is parsed from data files
//
// Here we append partition columns with {@code StringType} as the data type
if (partitionFields.length == 0) {
if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
return dataSchema;
}

boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema");
}
@@ -670,7 +668,7 @@ static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
// are not in originSchema. So we create and add them.
List<Field> newFields = new ArrayList<>();
for (String partitionField: partitionFields) {
for (String partitionField: partitionFields.get()) {
newFields.add(new Schema.Field(
partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
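A standalone Avro sketch of the appendPartitionColumns behavior described in the comments above, using invented record and field names: partition columns that were dropped from the data files are re-appended to the parsed data schema as nullable strings.

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class AppendPartitionColumnSketch {
  // Returns a copy of dataSchema with each partition field appended as a nullable string.
  static Schema appendAsNullableStrings(Schema dataSchema, String[] partitionFields) {
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : dataSchema.getFields()) {
      // Field objects cannot be reused across Avro schemas, so copy them.
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    Schema nullableString = Schema.createUnion(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
    for (String pf : partitionFields) {
      fields.add(new Schema.Field(pf, nullableString, "", JsonProperties.NULL_VALUE));
    }
    return Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(),
        dataSchema.getNamespace(), false, fields);
  }

  public static void main(String[] args) {
    Schema data = SchemaBuilder.record("trip").fields().requiredString("uuid").endRecord();
    System.out.println(appendAsNullableStrings(data, new String[]{"partition_path"}));
  }
}

The actual method additionally rejects schemas where only some of the partition columns are present, as the test changes below exercise.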
@@ -18,10 +18,12 @@

package org.apache.hudi.common.table;

import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIncompatibleSchemaException;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -36,17 +38,17 @@ public void testRecreateSchemaWhenDropPartitionColumns() {

// case2
String[] pts1 = new String[0];
Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1);
Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts1));
assertEquals(originSchema, s2);

// case3: partition_path is in originSchema
String[] pts2 = {"partition_path"};
Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2);
Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts2));
assertEquals(originSchema, s3);

// case4: user_partition is not in originSchema
String[] pts3 = {"user_partition"};
Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
assertNotEquals(originSchema, s4);
assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition")));
Schema.Field f = s4.getField("user_partition");
Expand All @@ -55,7 +57,7 @@ public void testRecreateSchemaWhenDropPartitionColumns() {
// case5: user_partition is in originSchema, but partition_path is in originSchema
String[] pts4 = {"user_partition", "partition_path"};
try {
TableSchemaResolver.appendPartitionColumns(originSchema, pts3);
TableSchemaResolver.appendPartitionColumns(originSchema, Option.of(pts3));
} catch (HoodieIncompatibleSchemaException e) {
assertTrue(e.getMessage().contains("Partial partition fields are still in the schema"));
}
@@ -20,18 +20,17 @@ package org.apache.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}

import scala.collection.JavaConverters._

@@ -147,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
if (fullSchema == null) {
logInfo("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
}
@@ -60,7 +60,9 @@ public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourc
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);

Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
// NOTE: "basePath" option is required for spark to discover the partition column
// More details at https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths);
try {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String structName = tableName + "_record";
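The note above is the heart of this fix. As a hedged, standalone illustration of the Spark behavior it relies on (paths and the partition column name are hypothetical): when individual partition directories are passed to load(), setting the "basePath" option tells Spark where partitioning starts, so the partition column is discovered and appears in the schema of the source dataset.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BasePathDiscoveryExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("basePath-partition-discovery")
        .master("local[*]")
        .getOrCreate();

    // Loading a single partition directory; without "basePath", Spark would not
    // infer the "dt" partition column from the directory name.
    Dataset<Row> withPartitionCol = spark.read()
        .format("parquet")
        .option("basePath", "file:///tmp/source_table")
        .load("file:///tmp/source_table/dt=2022-09-16");

    withPartitionCol.printSchema(); // schema now includes the "dt" partition column
    spark.stop();
  }
}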