From 9f6315eaa10595f351302962488dc1ad13d41989 Mon Sep 17 00:00:00 2001
From: XuQianJin-Stars
Date: Sun, 26 Jun 2022 09:12:13 +0800
Subject: [PATCH] [HUDI-3504] Support bootstrap command based on Call Produce Command

---
 .../hudi/cli/BootstrapExecutorUtils.java       | 254 ++++++++++++++++++
 .../org/apache/hudi/cli/SchemaProvider.java    |  58 ++++
 .../command/procedures/HoodieProcedures.scala  |   3 +
 .../procedures/RunBootstrapProcedure.scala     | 144 ++++++++++
 .../ShowBootstrapMappingProcedure.scala        | 117 ++++++++
 .../ShowBootstrapPartitionsProcedure.scala     |  75 ++++++
 .../procedure/TestBootstrapProcedure.scala     |  89 ++++++
 7 files changed, 740 insertions(+)
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala

diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
new file mode 100644
index 0000000000000..0cae022967e6b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+
+/**
+ * Performs bootstrap from a non-hudi source.
+ */
+public class BootstrapExecutorUtils implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(BootstrapExecutorUtils.class);
+
+  /**
+   * Config.
+   */
+  private final Config cfg;
+
+  /**
+   * Spark context.
+   */
+  private final transient JavaSparkContext jssc;
+
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  private final TypedProperties props;
+
+  /**
+   * Hadoop Configuration.
+   */
+  private final Configuration configuration;
+
+  /**
+   * Bootstrap Configuration.
+   */
+  private final HoodieWriteConfig bootstrapConfig;
+
+  /**
+   * FileSystem instance.
+   */
+  private final transient FileSystem fs;
+
+  private final String bootstrapBasePath;
+
+  public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
+
+  /**
+   * Bootstrap Executor.
+   *
+   * @param cfg        Bootstrap Config
+   * @param jssc       Java Spark Context
+   * @param fs         File System
+   * @param conf       Hadoop Configuration
+   * @param properties Bootstrap Writer Properties
+   * @throws IOException
+   */
+  public BootstrapExecutorUtils(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+                                TypedProperties properties) throws IOException {
+    this.cfg = cfg;
+    this.jssc = jssc;
+    this.fs = fs;
+    this.configuration = conf;
+    this.props = properties;
+
+    ValidationUtils.checkArgument(properties.containsKey(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key()),
+        HoodieTableConfig.BOOTSTRAP_BASE_PATH.key() + " must be specified.");
+    this.bootstrapBasePath = properties.getString(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key());
+
+    // Add more defaults if full bootstrap requested
+    this.props.putIfAbsent(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
+        DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
+    /**
+     * Schema provider that supplies the schema for reading the input and writing out the target table.
+     */
+    SchemaProvider schemaProvider = createSchemaProvider(cfg.schemaProviderClass, props, jssc);
+    HoodieWriteConfig.Builder builder =
+        HoodieWriteConfig.newBuilder().withPath(cfg.basePath)
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
+            .forTable(cfg.tableName)
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withAutoCommit(true)
+            .withProps(props);
+
+    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
+      builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
+    }
+    this.bootstrapConfig = builder.build();
+    LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
+  }
+
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg,
+                                                    JavaSparkContext jssc) throws IOException {
+    try {
+      return StringUtils.isNullOrEmpty(schemaProviderClass) ? null
+          : (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg, jssc);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
+    }
+  }
+
+  /**
+   * Executes Bootstrap.
+   */
+  public void execute() throws IOException {
+    initializeTable();
+
+    try (SparkRDDWriteClient bootstrapClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jssc), bootstrapConfig)) {
+      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      checkpointCommitMetadata.put(CHECKPOINT_KEY, Config.checkpoint);
+      bootstrapClient.bootstrap(Option.of(checkpointCommitMetadata));
+      syncHive();
+    }
+  }
+
+  /**
+   * Sync to Hive.
+   */
+  private void syncHive() {
+    if (cfg.enableHiveSync) {
+      TypedProperties metaProps = new TypedProperties();
+      metaProps.putAll(props);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), cfg.basePath);
+      metaProps.put(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key(), cfg.baseFileFormat);
+      if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key(), HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
+        metaProps.put(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+            props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
+      }
+
+      new HiveSyncTool(metaProps, configuration, fs).syncHoodieTable();
+    }
+  }
+
+  private void initializeTable() throws IOException {
+    Path basePath = new Path(cfg.basePath);
+    if (fs.exists(basePath)) {
+      if (cfg.bootstrapOverwrite) {
+        LOG.warn("Target base path already exists, overwriting it");
+        fs.delete(basePath, true);
+      } else {
+        throw new HoodieException("target base path already exists at " + cfg.basePath
+            + ". Cannot bootstrap data on top of an existing table");
+      }
+    }
+    HoodieTableMetaClient.withPropertyBuilder()
+        .fromProperties(props)
+        .setTableType(cfg.tableType)
+        .setTableName(cfg.tableName)
+        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setPayloadClassName(cfg.payloadClass)
+        .setBaseFileFormat(cfg.baseFileFormat)
+        .setBootstrapIndexClass(cfg.bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.basePath);
+  }
+
+  public static class Config {
+    private String tableName;
+    private String tableType;
+
+    private String basePath;
+
+    private String baseFileFormat;
+    private String bootstrapIndexClass;
+    private String schemaProviderClass;
+    private String payloadClass;
+    private Boolean enableHiveSync;
+
+    private Boolean bootstrapOverwrite;
+
+    public static String checkpoint = null;
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setTableType(String tableType) {
+      this.tableType = tableType;
+    }
+
+    public void setBasePath(String basePath) {
+      this.basePath = basePath;
+    }
+
+    public void setBaseFileFormat(String baseFileFormat) {
+      this.baseFileFormat = baseFileFormat;
+    }
+
+    public void setBootstrapIndexClass(String bootstrapIndexClass) {
+      this.bootstrapIndexClass = bootstrapIndexClass;
+    }
+
+    public void setSchemaProviderClass(String schemaProviderClass) {
+      this.schemaProviderClass = schemaProviderClass;
+    }
+
+    public void setPayloadClass(String payloadClass) {
+      this.payloadClass = payloadClass;
+    }
+
+    public void setEnableHiveSync(Boolean enableHiveSync) {
+      this.enableHiveSync = enableHiveSync;
+    }
+
+    public void setBootstrapOverwrite(Boolean bootstrapOverwrite) {
+      this.bootstrapOverwrite = bootstrapOverwrite;
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
new file mode 100644
index 0000000000000..de6770bf3038b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/SchemaProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+
+/**
+ * Class to provide schema for reading data and also writing into a Hoodie table,
+ * used by deltastreamer (runs over Spark).
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
+public abstract class SchemaProvider implements Serializable {
+
+  protected TypedProperties config;
+
+  protected JavaSparkContext jssc;
+
+  public SchemaProvider(TypedProperties props) {
+    this(props, null);
+  }
+
+  protected SchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    this.config = props;
+    this.jssc = jssc;
+  }
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public abstract Schema getSourceSchema();
+
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  public Schema getTargetSchema() {
+    // by default, use source schema as target for hoodie table as well
+    return getSourceSchema();
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 33ca211b03bd6..e3f05389a9e0a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -55,6 +55,9 @@ object HoodieProcedures {
     mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
     mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
     mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
+    mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
+    mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
+    mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
new file mode 100644
index 0000000000000..8e6fd36a8f7f5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.cli.BootstrapExecutorUtils
+import org.apache.hudi.cli.HDFSParquetImporterUtils.{buildProperties, readConfig}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.config.HoodieBootstrapConfig
+import org.apache.hudi.keygen.constant.KeyGeneratorType
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.Locale
+import java.util.function.Supplier
+
+class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "bootstrapPath", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "basePath", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "rowKeyField", DataTypes.StringType, None),
+    ProcedureParameter.optional(5, "baseFileFormat", DataTypes.StringType, "PARQUET"),
+    ProcedureParameter.optional(6, "partitionPathField", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "bootstrapIndexClass", DataTypes.StringType, "org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
+    ProcedureParameter.optional(8, "selectorClass", DataTypes.StringType, "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector"),
+    ProcedureParameter.optional(9, "keyGeneratorClass", DataTypes.StringType, "org.apache.hudi.keygen.SimpleKeyGenerator"),
+    ProcedureParameter.optional(10, "fullBootstrapInputProvider", DataTypes.StringType, "org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider"),
+    ProcedureParameter.optional(11, "schemaProviderClass", DataTypes.StringType, ""),
+    ProcedureParameter.optional(12, "payloadClass", DataTypes.StringType, "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"),
+    ProcedureParameter.optional(13, "parallelism", DataTypes.IntegerType, 1500),
+    ProcedureParameter.optional(14, "enableHiveSync", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(15, "propsFilePath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(16, "bootstrapOverwrite", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("status", DataTypes.IntegerType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val bootstrapPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val basePath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val rowKeyField = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val baseFileFormat = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val partitionPathField = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val bootstrapIndexClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+    val selectorClass = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
+    val keyGeneratorClass = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[String]
+    val fullBootstrapInputProvider = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[String]
+    val schemaProviderClass = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
+    val payloadClass = getArgValueOrDefault(args, PARAMETERS(12)).get.asInstanceOf[String]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(13)).get.asInstanceOf[Int]
+    val enableHiveSync = getArgValueOrDefault(args, PARAMETERS(14)).get.asInstanceOf[Boolean]
+    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(15)).get.asInstanceOf[String]
+    val bootstrapOverwrite = getArgValueOrDefault(args, PARAMETERS(16)).get.asInstanceOf[Boolean]
+
+    val configs: util.List[String] = new util.ArrayList[String]
+
+    val properties: TypedProperties = if (propsFilePath == null || propsFilePath.isEmpty) buildProperties(configs)
+    else readConfig(jsc.hadoopConfiguration, new Path(propsFilePath), configs).getProps(true)
+
+    properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key, bootstrapPath)
+
+    // treat the value as a key generator type if it matches a known type, otherwise as a class name
+    if (!StringUtils.isNullOrEmpty(keyGeneratorClass) && KeyGeneratorType.getNames.contains(keyGeneratorClass.toUpperCase(Locale.ROOT))) {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_TYPE.key, keyGeneratorClass.toUpperCase(Locale.ROOT))
+    } else {
+      properties.setProperty(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, keyGeneratorClass)
+    }
+
+    properties.setProperty(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME.key, fullBootstrapInputProvider)
+    properties.setProperty(HoodieBootstrapConfig.PARALLELISM_VALUE.key, parallelism.toString)
+    properties.setProperty(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, selectorClass)
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD.key, rowKeyField)
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionPathField)
+
+    val fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration)
+
+    val cfg = new BootstrapExecutorUtils.Config()
+    cfg.setTableName(tableName.get.asInstanceOf[String])
+    cfg.setTableType(tableType)
+    cfg.setBasePath(basePath)
+    cfg.setBaseFileFormat(baseFileFormat)
+    cfg.setBootstrapIndexClass(bootstrapIndexClass)
+    cfg.setSchemaProviderClass(schemaProviderClass)
+    cfg.setPayloadClass(payloadClass)
+    cfg.setEnableHiveSync(enableHiveSync)
+    cfg.setBootstrapOverwrite(bootstrapOverwrite)
+
+    try {
+      new BootstrapExecutorUtils(cfg, jsc, fs, jsc.hadoopConfiguration, properties).execute()
+      Seq(Row(0))
+    } catch {
+      case e: Exception =>
+        logWarning("Run bootstrap failed", e)
+        Seq(Row(-1))
+    }
+  }
+
+  override def build = new RunBootstrapProcedure()
+}
+
+object RunBootstrapProcedure {
+  val NAME = "run_bootstrap"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunBootstrapProcedure
+  }
+}
+
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
new file mode 100644
index 0000000000000..dab3891686756
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.model.{BootstrapFileMapping, HoodieFileGroupId}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "partitionPath", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "fileIds", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(4, "sortBy", DataTypes.StringType, "partition"),
+    ProcedureParameter.optional(5, "desc", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("fileid", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_basepath", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("source_file", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val partitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val fileIds = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
+    val sortBy = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val desc = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]
+
+    val basePath: String = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    if (partitionPath.isEmpty && fileIds.nonEmpty) throw new IllegalStateException("PartitionPath is mandatory when passing fileIds.")
+
+    val indexReader = createBootstrapIndexReader(metaClient)
+    val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+    if (partitionPath.nonEmpty && !indexedPartitions.contains(partitionPath)) throw new HoodieException(partitionPath + " is not a valid indexed partition")
+
+    val mappingList: util.ArrayList[BootstrapFileMapping] = new util.ArrayList[BootstrapFileMapping]
+    if (fileIds.nonEmpty) {
+      val fileGroupIds = fileIds.split(",").toList.map((fileId: String) => new HoodieFileGroupId(partitionPath, fileId)).asJava
+      mappingList.addAll(indexReader.getSourceFileMappingForFileIds(fileGroupIds).values)
+    } else if (partitionPath.nonEmpty) mappingList.addAll(indexReader.getSourceFileMappingForPartition(partitionPath))
+    else {
+      for (part <- indexedPartitions) {
+        mappingList.addAll(indexReader.getSourceFileMappingForPartition(part))
+      }
+    }
+
+    val rows: java.util.List[Row] = mappingList
+      .map(mapping => Row(mapping.getPartitionPath, mapping.getFileId, mapping.getBootstrapBasePath,
+        mapping.getBootstrapPartitionPath, mapping.getBootstrapFileStatus.getPath.getUri)).toList
+
+    val df = spark.createDataFrame(rows, OUTPUT_TYPE)
+
+    if (desc) {
+      df.orderBy(df(sortBy).desc).limit(limit).collect()
+    } else {
+      df.orderBy(df(sortBy).asc).limit(limit).collect()
+    }
+  }
+
+  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+    val index = BootstrapIndex.getBootstrapIndex(metaClient)
+    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. Don't have any index info")
+    index.createReader
+  }
+
+  override def build: Procedure = new ShowBootstrapMappingProcedure()
+}
+
+object ShowBootstrapMappingProcedure {
+  val NAME = "show_bootstrap_mapping"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowBootstrapMappingProcedure
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
new file mode 100644
index 0000000000000..b3bebd7f22416
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class ShowBootstrapPartitionsProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("indexed_partitions", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+
+    val basePath: String = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+
+    val indexReader = createBootstrapIndexReader(metaClient)
+    val indexedPartitions = indexReader.getIndexedPartitionPaths
+
+    indexedPartitions.stream().toArray.map(r => Row(r)).toList
+  }
+
+  private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
+    val index = BootstrapIndex.getBootstrapIndex(metaClient)
+    if (!index.useIndex) throw new HoodieException("This is not a bootstrapped Hudi table. Don't have any index info")
+    index.createReader
+  }
+
+  override def build = new ShowBootstrapPartitionsProcedure()
+}
+
+object ShowBootstrapPartitionsProcedure {
+  val NAME = "show_bootstrap_partitions"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowBootstrapPartitionsProcedure
+  }
+}
+
+
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
new file mode 100644
index 0000000000000..931d3130138d0
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestBootstrapProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.functional.TestBootstrap
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.apache.spark.sql.{Dataset, Row}
+
+import java.time.Instant
+import java.util
+
+class TestBootstrapProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call run_bootstrap Procedure") {
+    withTempDir { tmp =>
+      val NUM_OF_RECORDS = 100
+      val PARTITION_FIELD = "datestr"
+      val RECORD_KEY_FIELD = "_row_key"
+
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}"
+
+      val srcName: String = "source"
+      val sourcePath = basePath + Path.SEPARATOR + srcName
+      val tablePath = basePath + Path.SEPARATOR + tableName
+      val jsc = new JavaSparkContext(spark.sparkContext)
+
+      // generate test data
+      val partitions = util.Arrays.asList("2018", "2019", "2020")
+      val timestamp: Long = Instant.now.toEpochMilli
+      for (i <- 0 until partitions.size) {
+        val df: Dataset[Row] = TestBootstrap.generateTestRawTripDataset(timestamp, i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, spark.sqlContext)
+        df.write.parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i))
+      }
+
+      // run bootstrap
+      checkAnswer(
+        s"""call run_bootstrap(
+           |table => '$tableName',
+           |basePath => '$tablePath',
+           |tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |bootstrapPath => '$sourcePath',
+           |rowKeyField => '$RECORD_KEY_FIELD',
+           |partitionPathField => '$PARTITION_FIELD',
+           |bootstrapOverwrite => true)""".stripMargin) {
+        Seq(0)
+      }
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '$tablePath'
+           |tblproperties(primaryKey = '$RECORD_KEY_FIELD')
+           |""".stripMargin)
+
+      // show bootstrap's index partitions
+      var result = spark.sql(s"""call show_bootstrap_partitions(table => '$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+
+      // show bootstrap's index mapping
+      result = spark.sql(
+        s"""call show_bootstrap_mapping(table => '$tableName')""".stripMargin).collect()
+      assertResult(3) {
+        result.length
+      }
+    }
+  }
+}
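
Usage sketch: the three procedures added by this patch are invoked through Spark SQL's CALL syntax, mirroring TestBootstrapProcedure above; the table name, paths and field values below are placeholders, not values from the patch.

    -- bootstrap an existing parquet dataset into a Hudi table
    call run_bootstrap(
      table => 'hudi_bootstrap_tbl',
      tableType => 'COPY_ON_WRITE',
      bootstrapPath => '/tmp/source_parquet',
      basePath => '/tmp/hudi_bootstrap_tbl',
      rowKeyField => '_row_key',
      partitionPathField => 'datestr',
      bootstrapOverwrite => true);

    -- inspect the resulting bootstrap index
    call show_bootstrap_partitions(table => 'hudi_bootstrap_tbl');
    call show_bootstrap_mapping(table => 'hudi_bootstrap_tbl', limit => 10);

run_bootstrap returns a single status row (0 on success, -1 on failure); show_bootstrap_partitions lists the partitions recorded in the bootstrap index, and show_bootstrap_mapping returns one row per mapped file with the columns partition, fileid, source_basepath, source_partition and source_file.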