Skip to content

Commit

Permalink
[HUDI-4474] Fix inferring props for meta sync (apache#6310)
Browse files Browse the repository at this point in the history
- HoodieConfig#setDefaults looks up declared fields, so it
  should be passed the static class for reflection; otherwise,
  subclasses of HoodieSyncConfig won't set defaults properly.
- Pass all write client configs of deltastreamer to meta sync
- Make org.apache.hudi.hive.MultiPartKeysValueExtractor
  the default for deltastreamer, to align with SQL and Flink
  • Loading branch information
xushiyan authored and fengjian committed Apr 5, 2023
1 parent d0c3928 commit 2f12731
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 6 deletions.
5 changes: 3 additions & 2 deletions docker/demo/config/hoodie-incr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
hoodie.datasource.hive_sync.partition_fields=partition
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.partition_fields=partition
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public class ITTestHoodieDemo extends ITTestBase {
private HoodieFileFormat baseFileFormat;

private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
" --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ "
+ " --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor "
+ " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
+ " --hoodie-conf hoodie.datasource.hive_sync.password=hive "
+ " --hoodie-conf hoodie.datasource.hive_sync.partition_fields=%s "
Expand Down Expand Up @@ -215,6 +216,7 @@ private void ingestFirstBatchAndHiveSync() throws Exception {
+ " --user hive"
+ " --pass hive"
+ " --jdbc-url jdbc:hive2://hiveserver:10000"
+ " --partition-value-extractor org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor"
+ " --partitioned-by dt",
("spark-submit"
+ " --conf \'spark.executor.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console\'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public HiveSyncTool(Properties props, Configuration hadoopConf) {
HiveSyncConfig config = new HiveSyncConfig(props, hadoopConf);
this.config = config;
this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
this.tableName = config.getString(META_SYNC_TABLE_NAME);
this.tableName = config.getStringOrDefault(META_SYNC_TABLE_NAME);
initSyncClient(config);
initTableNameVars(config);
}
Expand All @@ -109,6 +109,7 @@ protected void initSyncClient(HiveSyncConfig config) {
}

private void initTableNameVars(HiveSyncConfig config) {
final String tableName = config.getStringOrDefault(META_SYNC_TABLE_NAME);
if (syncClient != null) {
switch (syncClient.getTableType()) {
case COPY_ON_WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;

Expand Down Expand Up @@ -92,6 +93,7 @@
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -138,6 +140,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
hiveSyncProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
hiveSyncProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
hiveSyncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");

hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, hiveTestService.getHiveConf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
Expand All @@ -46,6 +50,8 @@
*/
public class HoodieSyncConfig extends HoodieConfig {

private static final Logger LOG = LogManager.getLogger(HoodieSyncConfig.class);

public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
.key("hoodie.datasource.meta.sync.base.path")
.defaultValue("")
Expand Down Expand Up @@ -84,7 +90,7 @@ public class HoodieSyncConfig extends HoodieConfig {

public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
.key("hoodie.datasource.hive_sync.partition_extractor_class")
.defaultValue("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
.defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
.withInferFunction(cfg -> {
if (StringUtils.nonEmpty(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))) {
int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
Expand Down Expand Up @@ -138,7 +144,12 @@ public HoodieSyncConfig(Properties props) {

public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
super(props);
setDefaults(getClass().getName());
LOG.debug("Passed in properties:\n" + props.entrySet()
.stream()
.sorted(Comparator.comparing(e -> e.getKey().toString()))
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining("\n")));
setDefaults(HoodieSyncConfig.class.getName());
this.hadoopConf = hadoopConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ public void runMetaSync() {

TypedProperties metaProps = new TypedProperties();
metaProps.putAll(props);
metaProps.putAll(writeClient.getConfig().getProps());
if (props.getBoolean(HIVE_SYNC_BUCKET_SYNC.key(), HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
metaProps.put(HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
Expand Down

0 comments on commit 2f12731

Please sign in to comment.