
Commit 2fe526d
Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts

bvaradar authored and n3nash committed May 24, 2019
1 parent 145034c commit 2fe526d
Showing 4 changed files with 53 additions and 11 deletions.
com/uber/hoodie/utilities/HDFSParquetImporter.java
@@ -30,10 +30,12 @@
 import com.uber.hoodie.common.table.HoodieTableConfig;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TypedProperties;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -57,14 +59,22 @@
  * Loads data from Parquet Sources
  */
 public class HDFSParquetImporter implements Serializable {
+  private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
 
   public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
   private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
   private final Config cfg;
   private transient FileSystem fs;
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  private TypedProperties props;
 
   public HDFSParquetImporter(Config cfg) throws IOException {
     this.cfg = cfg;
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    log.info("Creating HDFSParquetImporter with configs : " + props.toString());
   }
 
   public static void main(String[] args) throws Exception {
@@ -116,7 +126,7 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
         .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
 
     HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
-        cfg.parallelism, Optional.empty());
+        cfg.parallelism, Optional.empty(), props);
 
     JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
     // Get instant time.
@@ -247,6 +257,12 @@ public static class Config implements Serializable {
     public String sparkMemory = null;
     @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
     public int retry = 0;
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for importing")
+    public String propsFilePath = null;
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+    public List<String> configs = new ArrayList<>();
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
   }
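
Aside (not part of the commit): a self-contained sketch of how the new flag pair behaves under JCommander, which these utilities use for argument parsing. The demo class and all values are made up for illustration; its two @Parameter fields mirror the ones added above.

// Demo: --props binds once; each repeated --hoodie-conf is appended to the list.
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.util.ArrayList;
import java.util.List;

public class HoodieConfFlagsDemo {
  static class Config {
    @Parameter(names = {"--props"})
    public String propsFilePath = null;
    @Parameter(names = {"--hoodie-conf"})
    public List<String> configs = new ArrayList<>();
  }

  public static void main(String[] args) {
    Config cfg = new Config();
    new JCommander(cfg).parse(
        "--props", "hdfs:///tmp/importer.properties",
        "--hoodie-conf", "hoodie.insert.shuffle.parallelism=4",
        "--hoodie-conf", "hoodie.combine.before.insert=true");
    // Prints the bound path, then both key=value strings collected in order.
    System.out.println(cfg.propsFilePath + " " + cfg.configs);
  }
}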
com/uber/hoodie/utilities/HoodieCleaner.java
@@ -22,7 +22,6 @@
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.TypedProperties;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -35,7 +34,7 @@

 public class HoodieCleaner {
 
-  private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
+  private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class);
 
   /**
    * Config for Cleaner
@@ -55,14 +54,14 @@ public class HoodieCleaner {
   /**
    * Bag of properties with source, hoodie client, key generator etc.
    */
-  TypedProperties props;
+  private TypedProperties props;
 
   public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException {
     this.cfg = cfg;
     this.jssc = jssc;
     this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
 
-    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath),
-        cfg.configs).getConfig();
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
     log.info("Creating Cleaner with configs : " + props.toString());
   }

@@ -86,8 +85,7 @@ public static class Config implements Serializable {

@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for cleaning")
public String propsFilePath =
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
public String propsFilePath = null;

@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
com/uber/hoodie/utilities/HoodieCompactor.java
@@ -23,9 +23,13 @@
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TypedProperties;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -37,9 +41,12 @@ public class HoodieCompactor {
   private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
   private final Config cfg;
   private transient FileSystem fs;
+  private TypedProperties props;
 
   public HoodieCompactor(Config cfg) {
     this.cfg = cfg;
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
   }
 
   public static class Config implements Serializable {
@@ -70,6 +77,14 @@ public static class Config implements Serializable {
     public String strategyClassName = null;
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+    public List<String> configs = new ArrayList<>();
   }
 
   public static void main(String[] args) throws Exception {
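
Aside: the Compactor now resolves properties with the same ternary as the Cleaner and the importer. A minimal sketch (property key illustrative):

import com.uber.hoodie.utilities.HoodieCompactor;
import java.util.Arrays;

public class CompactorPropsDemo {
  public static void main(String[] args) {
    HoodieCompactor.Config cfg = new HoodieCompactor.Config();
    cfg.configs = Arrays.asList("hoodie.compact.inline=false");
    // propsFilePath == null -> props come from UtilHelpers.buildProperties(cfg.configs);
    // a non-null path would instead be read via UtilHelpers.readConfig(fs, ...).
    new HoodieCompactor(cfg);
  }
}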
@@ -108,15 +123,15 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     //Get schema.
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
-        Optional.empty());
+        Optional.empty(), props);
     JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }
 
   private int doSchedule(JavaSparkContext jsc) throws Exception {
     //Get schema.
     HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
-        Optional.of(cfg.strategyClassName));
+        Optional.of(cfg.strategyClassName), props);
     client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
     return 0;
   }
com/uber/hoodie/utilities/UtilHelpers.java
@@ -18,6 +18,7 @@

 package com.uber.hoodie.utilities;
 
+import com.google.common.base.Preconditions;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
@@ -100,6 +101,16 @@ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath,
     }
   }
 
+  public static TypedProperties buildProperties(List<String> props) {
+    TypedProperties properties = new TypedProperties();
+    props.stream().forEach(x -> {
+      String[] kv = x.split("=");
+      Preconditions.checkArgument(kv.length == 2);
+      properties.setProperty(kv[0], kv[1]);
+    });
+    return properties;
+  }
+
   /**
    * Parse Schema from file
    *
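
Aside: a short usage sketch of the new buildProperties helper (keys and values illustrative). Note the Preconditions guard: because split("=") splits on every '=', a value that itself contains '=' is rejected rather than preserved.

import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.UtilHelpers;
import java.util.Arrays;

public class BuildPropertiesDemo {
  public static void main(String[] args) {
    TypedProperties props = UtilHelpers.buildProperties(Arrays.asList(
        "hoodie.upsert.shuffle.parallelism=100",
        "hoodie.cleaner.commits.retained=10"));
    // TypedProperties extends java.util.Properties, so plain lookups work.
    System.out.println(props.getProperty("hoodie.cleaner.commits.retained"));  // 10
  }
}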
Expand Down Expand Up @@ -163,7 +174,8 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas
    * @param parallelism Parallelism
    */
   public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
-      String schemaStr, int parallelism, Optional<String> compactionStrategyClass) throws Exception {
+      String schemaStr, int parallelism, Optional<String> compactionStrategyClass, TypedProperties properties)
+      throws Exception {
     HoodieCompactionConfig compactionConfig =
         compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy))
@@ -173,6 +185,7 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String
             .combineInput(true, true)
             .withCompactionConfig(compactionConfig)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withProps(properties)
             .build();
     return new HoodieWriteClient(jsc, config);
   }
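
Aside: an end-to-end sketch of the new overload (local Spark master; schema, base path, and parallelism are placeholders, and the base path is assumed to already hold an initialized hoodie dataset). The extra TypedProperties argument reaches HoodieWriteConfig through .withProps(...).

import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.UtilHelpers;
import java.util.Arrays;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext;

public class CreateClientDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext("local[1]", "client-demo");
    TypedProperties props = UtilHelpers.buildProperties(
        Arrays.asList("hoodie.insert.shuffle.parallelism=2"));
    String schemaStr = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[]}";  // placeholder Avro schema
    HoodieWriteClient client = UtilHelpers.createHoodieClient(
        jsc, "file:///tmp/hoodie/trips", schemaStr, 2, Optional.empty(), props);
    jsc.stop();
  }
}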
