Skip to content

Commit

Permalink
[HUDI-3730] UTs fix and add (#11)
Browse files Browse the repository at this point in the history
* [HUDI-3730] fix TestHiveSyncTool

* [HUDI-3730] fix TestHiveSyncTool : getHiveConf, getDefaultValue

* [HUDI-3730] fix TestHiveSyncTool : Use TypedProperties in internal method, getDefaultValue

Co-authored-by: jian.feng <[email protected]>
  • Loading branch information
2 people authored and xushiyan committed Jun 30, 2022
1 parent 23e4353 commit 041f402
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public void setPropertyIfNonNull(String key, String value) {
}
}

/**
 * Joins {@code value} with the given delimiter and stores the result under {@code key}.
 * A {@code null} list is silently ignored, mirroring {@link #setPropertyIfNonNull}.
 */
public void setStringListIfNonNull(String key, List<String> value, String delimiter) {
  if (value == null) {
    return;
  }
  setPropertyIfNonNull(key, String.join(delimiter, value));
}

@Override
public String getProperty(String key) {
Object oval = super.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.sync.adb;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.hive.HiveSyncConfig;

Expand Down Expand Up @@ -176,31 +177,31 @@ public static class AdbSyncConfigParams {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

public Properties toProps() {
final Properties props = hiveSyncConfigParams.toProps();
props.setProperty(META_SYNC_DATABASE_NAME.key(), hiveSyncConfigParams.hoodieSyncConfigParams.databaseName);
props.setProperty(META_SYNC_TABLE_NAME.key(), hiveSyncConfigParams.hoodieSyncConfigParams.tableName);
props.setProperty(ADB_SYNC_USER.key(), hiveSyncConfigParams.hiveUser);
props.setProperty(ADB_SYNC_PASS.key(), hiveSyncConfigParams.hivePass);
props.setProperty(ADB_SYNC_JDBC_URL.key(), hiveSyncConfigParams.jdbcUrl);
props.setProperty(META_SYNC_BASE_PATH.key(), hiveSyncConfigParams.hoodieSyncConfigParams.basePath);
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), String.join(",", hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields));
props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), hiveSyncConfigParams.hoodieSyncConfigParams.partitionValueExtractorClass);
props.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.assumeDatePartitioning));
props.setProperty(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(hiveSyncConfigParams.skipROSuffix));
props.setProperty(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(skipRTSync));
props.setProperty(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(useHiveStylePartitioning));
props.setProperty(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.useFileListingFromMetadata));
props.setProperty(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(supportTimestamp));
props.setProperty(ADB_SYNC_TABLE_PROPERTIES.key(), tableProperties);
props.setProperty(ADB_SYNC_SERDE_PROPERTIES.key(), serdeProperties);
props.setProperty(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(syncAsSparkDataSourceTable));
props.setProperty(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(sparkSchemaLengthThreshold));
props.setProperty(META_SYNC_SPARK_VERSION.key(), hiveSyncConfigParams.hoodieSyncConfigParams.sparkVersion);
props.setProperty(ADB_SYNC_DB_LOCATION.key(), dbLocation);
props.setProperty(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(autoCreateDatabase));
props.setProperty(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(skipLastCommitTimeSync));
props.setProperty(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(dropTableBeforeCreation));
public TypedProperties toProps() {
final TypedProperties props = hiveSyncConfigParams.toProps();
props.setPropertyIfNonNull(META_SYNC_DATABASE_NAME.key(), hiveSyncConfigParams.hoodieSyncConfigParams.databaseName);
props.setPropertyIfNonNull(META_SYNC_TABLE_NAME.key(), hiveSyncConfigParams.hoodieSyncConfigParams.tableName);
props.setPropertyIfNonNull(ADB_SYNC_USER.key(), hiveSyncConfigParams.hiveUser);
props.setPropertyIfNonNull(ADB_SYNC_PASS.key(), hiveSyncConfigParams.hivePass);
props.setPropertyIfNonNull(ADB_SYNC_JDBC_URL.key(), hiveSyncConfigParams.jdbcUrl);
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), hiveSyncConfigParams.hoodieSyncConfigParams.basePath);
props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), String.join(",", hiveSyncConfigParams.hoodieSyncConfigParams.partitionFields));
props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), hiveSyncConfigParams.hoodieSyncConfigParams.partitionValueExtractorClass);
props.setPropertyIfNonNull(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.assumeDatePartitioning));
props.setPropertyIfNonNull(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(hiveSyncConfigParams.skipROSuffix));
props.setPropertyIfNonNull(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(skipRTSync));
props.setPropertyIfNonNull(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(useHiveStylePartitioning));
props.setPropertyIfNonNull(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(hiveSyncConfigParams.hoodieSyncConfigParams.useFileListingFromMetadata));
props.setPropertyIfNonNull(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(supportTimestamp));
props.setPropertyIfNonNull(ADB_SYNC_TABLE_PROPERTIES.key(), tableProperties);
props.setPropertyIfNonNull(ADB_SYNC_SERDE_PROPERTIES.key(), serdeProperties);
props.setPropertyIfNonNull(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(syncAsSparkDataSourceTable));
props.setPropertyIfNonNull(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(sparkSchemaLengthThreshold));
props.setPropertyIfNonNull(META_SYNC_SPARK_VERSION.key(), hiveSyncConfigParams.hoodieSyncConfigParams.sparkVersion);
props.setPropertyIfNonNull(ADB_SYNC_DB_LOCATION.key(), dbLocation);
props.setPropertyIfNonNull(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(autoCreateDatabase));
props.setPropertyIfNonNull(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(skipLastCommitTimeSync));
props.setPropertyIfNonNull(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(dropTableBeforeCreation));
return props;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.hive;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.common.HoodieSyncConfig;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -233,8 +234,8 @@ public static class HiveSyncConfigParams {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

public Properties toProps() {
final Properties props = hoodieSyncConfigParams.toProps();
public TypedProperties toProps() {
final TypedProperties props = hoodieSyncConfigParams.toProps();
// TODO add mappings here
return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES));
if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
config.getString(META_SYNC_SPARK_VERSION), config.getInt(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema);
config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema);
Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH));
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {

public JDBCExecutor(HiveSyncConfig config) {
super(config);
Objects.requireNonNull(config.getString(HIVE_URL), "--jdbc-url option is required for jdbc sync mode");
Objects.requireNonNull(config.getString(HIVE_USER), "--user option is required for jdbc sync mode");
Objects.requireNonNull(config.getString(HIVE_PASS), "--pass option is required for jdbc sync mode");
Objects.requireNonNull(config.getStringOrDefault(HIVE_URL), "--jdbc-url option is required for jdbc sync mode");
Objects.requireNonNull(config.getStringOrDefault(HIVE_USER), "--user option is required for jdbc sync mode");
Objects.requireNonNull(config.getStringOrDefault(HIVE_PASS), "--pass option is required for jdbc sync mode");
this.config = config;
createHiveConnection(config.getString(HIVE_URL), config.getString(HIVE_USER), config.getString(HIVE_PASS));
createHiveConnection(config.getStringOrDefault(HIVE_URL), config.getStringOrDefault(HIVE_USER), config.getStringOrDefault(HIVE_PASS));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.hive.replication;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -46,9 +47,9 @@ public static class GlobalHiveSyncConfigParams {
@Parameter(names = {"--replicated-timestamp"}, description = "Add globally replicated timestamp to enable consistent reads across clusters")
public String globallyReplicatedTimeStamp;

public Properties toProps() {
final Properties props = hiveSyncConfigParams.toProps();
props.setProperty(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key(), globallyReplicatedTimeStamp);
public TypedProperties toProps() {
final TypedProperties props = hiveSyncConfigParams.toProps();
props.setPropertyIfNonNull(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key(), globallyReplicatedTimeStamp);
return props;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public void load() throws IOException {
Properties mkGlobalHiveSyncProps(boolean forRemote) {
Properties props = new Properties(loadedProps);
props.putAll(globalHiveSyncConfigParams.toProps());
String basePath = forRemote ? loadedProps.getProperty(REMOTE_BASE_PATH)
: loadedProps.getProperty(LOCAL_BASE_PATH, loadedProps.getProperty(META_SYNC_BASE_PATH.key()));
String basePath = forRemote ? loadedProps.getProperty(REMOTE_BASE_PATH, "")
: loadedProps.getProperty(LOCAL_BASE_PATH, loadedProps.getProperty(META_SYNC_BASE_PATH.key(), ""));
props.setProperty(META_SYNC_BASE_PATH.key(), basePath);
String jdbcUrl = forRemote ? loadedProps.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
: loadedProps.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, loadedProps.getProperty(HIVE_URL.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ public void testUpdateTableComments(String syncMode) throws Exception {
List<FieldSchema> fieldSchemas = hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
int commentCnt = 0;
for (FieldSchema fieldSchema : fieldSchemas) {
if (fieldSchema.getComment().isPresent()) {
if (fieldSchema.getComment().isPresent() && !StringUtils.isNullOrEmpty(fieldSchema.getComment().get())) {
commentCnt++;
}
}
Expand All @@ -550,7 +550,7 @@ public void testSyncWithCommentedSchema(String syncMode) throws Exception {
List<FieldSchema> fieldSchemas = hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
int commentCnt = 0;
for (FieldSchema fieldSchema : fieldSchemas) {
if (fieldSchema.getComment().isPresent()) {
if (fieldSchema.getComment().isPresent() && !StringUtils.isNullOrEmpty(fieldSchema.getComment().get())) {
commentCnt++;
}
}
Expand All @@ -562,7 +562,7 @@ public void testSyncWithCommentedSchema(String syncMode) throws Exception {
fieldSchemas = hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
commentCnt = 0;
for (FieldSchema fieldSchema : fieldSchemas) {
if (fieldSchema.getComment().isPresent()) {
if (fieldSchema.getComment().isPresent() && !StringUtils.isNullOrEmpty(fieldSchema.getComment().get())) {
commentCnt++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -203,19 +204,19 @@ public static class HoodieSyncConfigParams {
@Parameter(names = {"--help", "-h"}, help = true)
public boolean help = false;

public Properties toProps() {
final Properties props = new Properties();
props.setProperty(META_SYNC_BASE_PATH.key(), basePath);
props.setProperty(META_SYNC_DATABASE_NAME.key(), databaseName);
props.setProperty(META_SYNC_TABLE_NAME.key(), tableName);
props.setProperty(META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), String.join(",", partitionFields));
props.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), partitionValueExtractorClass);
props.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(assumeDatePartitioning));
props.setProperty(META_SYNC_DECODE_PARTITION.key(), String.valueOf(decodePartition));
props.setProperty(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(useFileListingFromMetadata));
props.setProperty(META_SYNC_CONDITIONAL_SYNC.key(), String.valueOf(isConditionalSync));
props.setProperty(META_SYNC_SPARK_VERSION.key(), sparkVersion);
public TypedProperties toProps() {
final TypedProperties props = new TypedProperties();
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), basePath);
props.setPropertyIfNonNull(META_SYNC_DATABASE_NAME.key(), databaseName);
props.setPropertyIfNonNull(META_SYNC_TABLE_NAME.key(), tableName);
props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
props.setStringListIfNonNull(META_SYNC_PARTITION_FIELDS.key(), partitionFields, ",");
props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), partitionValueExtractorClass);
props.setPropertyIfNonNull(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(assumeDatePartitioning));
props.setPropertyIfNonNull(META_SYNC_DECODE_PARTITION.key(), String.valueOf(decodePartition));
props.setPropertyIfNonNull(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(useFileListingFromMetadata));
props.setPropertyIfNonNull(META_SYNC_CONDITIONAL_SYNC.key(), String.valueOf(isConditionalSync));
props.setPropertyIfNonNull(META_SYNC_SPARK_VERSION.key(), sparkVersion);
return props;
}
}
Expand Down

0 comments on commit 041f402

Please sign in to comment.