Fixing repair deprecated partition tool
nsivabalan committed Sep 26, 2022
1 parent 6377b77 commit 3257311
Showing 1 changed file with 18 additions and 6 deletions:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -59,6 +59,7 @@
 import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -456,8 +458,15 @@ public static int renamePartition(JavaSparkContext jsc, String basePath, String
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
       Map<String, String> propsMap = getPropsForRewrite(metaClient);
       rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
-      // after re-writing, we can safely delete older data.
+      // after re-writing, we can safely delete older partition.
       deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
+      // also, we can physically delete the old partition.
+      FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
+      try {
+        fs.delete(new Path(basePath, oldPartition), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete older partition " + basePath);
+      }
     }
     return 0;
   }
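The cleanup added in this hunk is best-effort: the recursive delete is wrapped in a try/catch that only logs a warning, so a failed filesystem delete does not fail the rename itself. A minimal standalone sketch of the same step using the plain Hadoop FileSystem API (the base path and partition name are illustrative assumptions, not values from this commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PartitionCleanupSketch {
      public static void main(String[] args) throws IOException {
        // Illustrative values; SparkMain receives these as CLI arguments.
        String basePath = "hdfs:///tmp/hudi_table";
        String oldPartition = "2021/01/01";

        Path partitionDir = new Path(basePath, oldPartition);
        // Resolve the FileSystem for the table path, as the commit does via FSUtils.getFs.
        FileSystem fs = partitionDir.getFileSystem(new Configuration());
        try {
          // recursive=true removes the partition directory and everything under it.
          fs.delete(partitionDir, true);
        } catch (IOException e) {
          // Best-effort, matching the commit: the rename already succeeded logically.
          System.err.println("Failed to delete older partition " + partitionDir + ": " + e);
        }
      }
    }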
@@ -473,21 +482,24 @@ private static void deleteOlderPartition(String basePath, String oldPartition, D
   }
 
   private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
-    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+    String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
+    StructType structType = recordsToRewrite.schema();
+    int partitionIndex = structType.fieldIndex(partitionFieldProp);
+
+    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
         .write()
         .options(propsMap)
-        .option("hoodie.datasource.write.operation", "insert")
+        .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
         .format("hudi")
         .mode("Append")
         .save(basePath);
   }
 
   private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
     return sqlContext.read()
-        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
+        //.option("hoodie.datasource.read.extract.partition.values.from.path", "false")
        .format("hudi")
-        .load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .load(basePath + "/" + oldPartition)
         .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
         .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
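Taken together, the helper changes rework both sides of the rewrite: getRecordsToRewrite now loads the old partition's directory directly instead of scanning the whole table and filtering on _hoodie_partition_path, and rewriteRecordsToNewPartition replaces the partition column with a null cast to its original type and writes with bulk_insert rather than a hard-coded "insert". A sketch of that flow as a standalone Spark job; the table path, partition value, field names, and writer props are illustrative assumptions (SparkMain derives the real props from the table's meta client via getPropsForRewrite):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.StructType;

    public class RewritePartitionSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("rewrite-partition-sketch")
            .master("local[*]")
            .getOrCreate();

        String basePath = "file:///tmp/hudi_table";   // assumed table location
        String oldPartition = "2021/01/01";           // assumed partition path
        String partitionField = "partition_field";    // assumed partition column name

        // Load straight from the old partition directory and drop the Hudi
        // metadata columns (the hunk above truncates after these three drops).
        Dataset<Row> records = spark.read()
            .format("hudi")
            .load(basePath + "/" + oldPartition)
            .drop("_hoodie_record_key")
            .drop("_hoodie_partition_path")
            .drop("_hoodie_commit_seqno");

        // Stand-in for getPropsForRewrite(metaClient); all values are assumed.
        Map<String, String> props = new HashMap<>();
        props.put("hoodie.table.name", "hudi_table");
        props.put("hoodie.datasource.write.recordkey.field", "id");
        props.put("hoodie.datasource.write.partitionpath.field", partitionField);
        props.put("hoodie.datasource.write.precombine.field", "ts");

        // Replace the partition column with a null cast to its original type,
        // then append the rows back with bulk_insert, mirroring the commit.
        StructType schema = records.schema();
        records.withColumn(partitionField,
                functions.lit(null).cast(schema.apply(schema.fieldIndex(partitionField)).dataType()))
            .write()
            .options(props)
            .option("hoodie.datasource.write.operation", "bulk_insert")
            .format("hudi")
            .mode("Append")
            .save(basePath);
      }
    }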
