Commit

Merge branch 'master' into HUDI-3836
xicm committed Jul 5, 2022
2 parents afe418c + fbda4ad commit aa7c571
Showing 1 changed file with 16 additions and 12 deletions.
@@ -17,12 +17,6 @@
 
 package org.apache.hudi.utilities;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.TypedProperties;
@@ -37,17 +31,23 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncConfigHolder;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 
-import scala.Tuple2;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,6 +56,8 @@
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 /**
  * A tool with spark-submit to drop Hudi table partitions.
  *
@@ -352,11 +354,13 @@ private HiveSyncConfig buildHiveSyncProps() {
     props.put(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), cfg.hiveSyncMode);
     props.put(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), cfg.hiveSyncIgnoreException);
     props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
+    props.put(HiveSyncConfig.META_SYNC_BASE_PATH, cfg.basePath);
+    props.put(HiveSyncConfig.META_SYNC_BASE_FILE_FORMAT, "PARQUET");
     props.put(DataSourceWriteOptions.PARTITIONS_TO_DELETE().key(), cfg.partitions);
     props.put(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), cfg.partitionValueExtractorClass);
     props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), cfg.hivePartitionsField);
 
-    return DataSourceUtils.buildHiveSyncConfig(props, cfg.basePath, "PARQUET");
+    return new HiveSyncConfig(props, new Configuration());
   }
 
   private void verifyHiveConfigs() {
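
Note on this hunk: HiveSyncConfig is no longer assembled through DataSourceUtils.buildHiveSyncConfig; it is constructed from a TypedProperties bag plus a Hadoop Configuration. A minimal sketch of the new construction path, using only the keys and the (props, conf) constructor visible in the diff (the base path value is hypothetical):

TypedProperties props = new TypedProperties();
props.put(HiveSyncConfig.META_SYNC_BASE_PATH, "/tmp/hudi/demo_table"); // hypothetical base path
props.put(HiveSyncConfig.META_SYNC_BASE_FILE_FORMAT, "PARQUET");      // base file format, as in the diff
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(props, new Configuration());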
@@ -366,9 +370,9 @@ private void verifyHiveConfigs() {
 
   private void syncHive(HiveSyncConfig hiveSyncConfig) {
     LOG.info("Syncing target hoodie table with hive table("
-        + hiveSyncConfig.tableName
+        + hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME)
         + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
+        + hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL)
         + ", basePath :" + cfg.basePath);
     LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
     FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
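
The old public fields (hiveSyncConfig.tableName, hiveSyncConfig.jdbcUrl) give way to ConfigProperty lookups. A short sketch of reading values back, assuming only the accessors and keys shown in this hunk:

String tableName = hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME);
String metastoreUrl = hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_URL); // Hive metastore/JDBC URL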
@@ -378,7 +382,7 @@ private void syncHive(HiveSyncConfig hiveSyncConfig) {
     }
     hiveConf.addResource(fs.getConf());
     LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
-    HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig, hiveConf, fs);
+    HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig.getProps(), hiveConf);
     hiveSyncTool.syncHoodieTable();
   }
 
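The three-argument HiveSyncTool constructor (config, HiveConf, FileSystem) is replaced by a two-argument form taking raw properties and a Hadoop Configuration. A hedged end-to-end sketch of the new call sequence, mirroring the tool's own steps (the base path is hypothetical):

FileSystem fs = FSUtils.getFs("/tmp/hudi/demo_table", new Configuration()); // hypothetical path
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf()); // fold filesystem settings into the Hive conf, as syncHive does
new HiveSyncTool(hiveSyncConfig.getProps(), hiveConf).syncHoodieTable();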
