From 85b146d3d508e68b2f1ff10f232c37c6a474b77b Mon Sep 17 00:00:00 2001
From: huberylee
Date: Fri, 20 May 2022 22:25:32 +0800
Subject: [PATCH] [HUDI-3985] Refactor DLASyncTool to support read hoodie table as spark datasource table (#5532)

---
 .../org/apache/hudi/DataSourceOptions.scala | 2 +-
 .../hudi/command/DropHoodieTableCommand.scala | 2 +-
 .../CreateHoodieTableAsSelectCommand.scala | 2 +-
 .../sql/hudi/catalog/HoodieCatalog.scala | 2 +-
 .../{hudi-dla-sync => hudi-adb-sync}/pom.xml | 2 +-
 .../src/assembly/src.xml | 0
 .../sync/adb/AbstractAdbSyncHoodieClient.java | 128 +++++
 .../apache/hudi/sync/adb/AdbSyncConfig.java | 240 ++++++++++
 .../org/apache/hudi/sync/adb/AdbSyncTool.java | 283 +++++++++++
 .../hudi/sync/adb/HoodieAdbJdbcClient.java | 440 ++++++++++++++++++
 .../hudi/sync/adb/HoodieAdbSyncException.java | 29 ++
 .../hudi/sync/adb/TestAdbSyncConfig.java | 65 +++
 .../resources/log4j-surefire-quiet.properties | 0
 .../test/resources/log4j-surefire.properties | 0
 .../org/apache/hudi/dla/DLASyncConfig.java | 111 -----
 .../java/org/apache/hudi/dla/DLASyncTool.java | 213 ---------
 .../org/apache/hudi/dla/HoodieDLAClient.java | 428 -----------------
 .../java/org/apache/hudi/dla/util/Utils.java | 77 ---
 .../apache/hudi/dla/TestDLASyncConfig.java | 55 ---
 .../org/apache/hudi/hive/HiveSyncTool.java | 84 +---
 .../apache/hudi/hive/TestHiveSyncTool.java | 2 +-
 .../hive/TestParquet2SparkSchemaUtils.java | 2 +-
 .../hudi/sync/common/AbstractSyncTool.java | 82 ++++
 .../hudi/sync/common}/util/ConfigUtils.java | 2 +-
 .../util/Parquet2SparkSchemaUtils.java | 2 +-
 hudi-sync/pom.xml | 2 +-
 26 files changed, 1281 insertions(+), 974 deletions(-)
 rename hudi-sync/{hudi-dla-sync => hudi-adb-sync}/pom.xml (99%)
 rename hudi-sync/{hudi-dla-sync => hudi-adb-sync}/src/assembly/src.xml (100%)
 create mode 100644 hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
 create mode 100644 hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
 create mode 100644 hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
 create mode 100644 hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
 create mode 100644 hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java
 create mode 100644 hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
 rename hudi-sync/{hudi-dla-sync => hudi-adb-sync}/src/test/resources/log4j-surefire-quiet.properties (100%)
 rename hudi-sync/{hudi-dla-sync => hudi-adb-sync}/src/test/resources/log4j-surefire.properties (100%)
 delete mode 100644 hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java
 delete mode 100644 hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
 delete mode 100644 hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
 delete mode 100644 hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java
 delete mode 100644 hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java
 rename hudi-sync/{hudi-hive-sync/src/main/java/org/apache/hudi/hive => hudi-sync-common/src/main/java/org/apache/hudi/sync/common}/util/ConfigUtils.java (98%)
 rename hudi-sync/{hudi-hive-sync/src/main/java/org/apache/hudi/hive => hudi-sync-common/src/main/java/org/apache/hudi/sync/common}/util/Parquet2SparkSchemaUtils.java (99%)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 0d4c7cf184dd..36dd07f28a18 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -26,11 +26,11 @@ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.Option import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} -import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.sync.common.HoodieSyncConfig +import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index 68582fc2795d..c24d0fd992d9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType -import org.apache.hudi.hive.util.ConfigUtils +import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 66aeb850e49e..733bd67a0b0d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.hive.HiveSyncConfig -import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.sql.InsertMode import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps +import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index 2c5261a12f14..f30976f58ea2 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.hudi.catalog import org.apache.hadoop.fs.Path import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.sql.InsertMode +import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms import org.apache.spark.sql.catalyst.TableIdentifier diff --git a/hudi-sync/hudi-dla-sync/pom.xml b/hudi-sync/hudi-adb-sync/pom.xml similarity index 99% rename from hudi-sync/hudi-dla-sync/pom.xml rename to hudi-sync/hudi-adb-sync/pom.xml index 3770225ef7fc..0dd8783b6713 100644 --- a/hudi-sync/hudi-dla-sync/pom.xml +++ b/hudi-sync/hudi-adb-sync/pom.xml @@ -25,7 +25,7 @@ 4.0.0 - hudi-dla-sync + hudi-adb-sync jar diff --git a/hudi-sync/hudi-dla-sync/src/assembly/src.xml b/hudi-sync/hudi-adb-sync/src/assembly/src.xml similarity index 100% rename from hudi-sync/hudi-dla-sync/src/assembly/src.xml rename to hudi-sync/hudi-adb-sync/src/assembly/src.xml diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java new file mode 100644 index 000000000000..84316ddb1152 --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.adb; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.PartitionValueExtractor; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClient { + protected AdbSyncConfig adbSyncConfig; + protected PartitionValueExtractor partitionValueExtractor; + protected HoodieTimeline activeTimeline; + + public AbstractAdbSyncHoodieClient(AdbSyncConfig syncConfig, FileSystem fs) { + super(syncConfig.basePath, syncConfig.assumeDatePartitioning, + syncConfig.useFileListingFromMetadata, false, fs); + this.adbSyncConfig = syncConfig; + final String clazz = adbSyncConfig.partitionValueExtractorClass; + try { + this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(clazz).newInstance(); + } catch (Exception e) { + throw new HoodieException("Fail to init PartitionValueExtractor class " + clazz, e); + } + + activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public List getPartitionEvents(Map, String> tablePartitions, + List partitionStoragePartitions) { + Map paths = new HashMap<>(); + + for (Map.Entry, String> entry : tablePartitions.entrySet()) { + List partitionValues = entry.getKey(); + String fullTablePartitionPath = entry.getValue(); + paths.put(String.join(", ", partitionValues), fullTablePartitionPath); + } + List events = new ArrayList<>(); + for (String storagePartition : partitionStoragePartitions) { + Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, storagePartition); + String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); + // Check if the partition values or if hdfs path is the same + List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); + if (adbSyncConfig.useHiveStylePartitioning) { + String partition = String.join("/", storagePartitionValues); + storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition); + fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); + } + if (!storagePartitionValues.isEmpty()) { + String storageValue = String.join(", ", storagePartitionValues); + if (!paths.containsKey(storageValue)) { + events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); + } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { + events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + } + } + } + return events; + } + + public void close() { + + } + + public abstract Map, String> scanTablePartitions(String tableName) throws Exception; + + public abstract void updateTableDefinition(String tableName, SchemaDifference schemaDiff) throws Exception; + + public abstract boolean databaseExists(String databaseName) throws Exception; + + public abstract void createDatabase(String databaseName) throws Exception; + + public abstract void dropTable(String tableName); + + protected String getDatabasePath() { + String dbLocation = adbSyncConfig.dbLocation; + Path 
dbLocationPath; + if (StringUtils.isNullOrEmpty(dbLocation)) { + if (new Path(adbSyncConfig.basePath).isRoot()) { + dbLocationPath = new Path(adbSyncConfig.basePath); + } else { + dbLocationPath = new Path(adbSyncConfig.basePath).getParent(); + } + } else { + dbLocationPath = new Path(dbLocation); + } + return generateAbsolutePathStr(dbLocationPath); + } + + protected String generateAbsolutePathStr(Path path) { + String absolutePathStr = path.toString(); + if (path.toUri().getScheme() == null) { + absolutePathStr = getDefaultFs() + absolutePathStr; + } + return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/"; + } + + protected String getDefaultFs() { + return fs.getConf().get("fs.defaultFS"); + } +} diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java new file mode 100644 index 000000000000..ae2e7024e587 --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.adb; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.sync.common.HoodieSyncConfig; + +import com.beust.jcommander.Parameter; + +/** + * Configs needed to sync data into Alibaba Cloud AnalyticDB(ADB). 
+ */ +public class AdbSyncConfig extends HoodieSyncConfig { + + @Parameter(names = {"--user"}, description = "Adb username", required = true) + public String adbUser; + + @Parameter(names = {"--pass"}, description = "Adb password", required = true) + public String adbPass; + + @Parameter(names = {"--jdbc-url"}, description = "Adb jdbc connect url", required = true) + public String jdbcUrl; + + @Parameter(names = {"--skip-ro-suffix"}, description = "Whether skip the `_ro` suffix for read optimized table when syncing") + public Boolean skipROSuffix; + + @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing") + public Boolean skipRTSync; + + @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2") + public Boolean useHiveStylePartitioning; + + @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type") + public Boolean supportTimestamp; + + @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table") + public Boolean syncAsSparkDataSourceTable; + + @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true) + public String tableProperties; + + @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true) + public String serdeProperties; + + @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore") + public int sparkSchemaLengthThreshold; + + @Parameter(names = {"--db-location"}, description = "Database location") + public String dbLocation; + + @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database") + public Boolean autoCreateDatabase = true; + + @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing") + public Boolean skipLastCommitTimeSync = false; + + @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation") + public Boolean dropTableBeforeCreation = false; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + public static final ConfigProperty ADB_SYNC_USER = ConfigProperty + .key("hoodie.datasource.adb.sync.username") + .noDefaultValue() + .withDocumentation("ADB username"); + + public static final ConfigProperty ADB_SYNC_PASS = ConfigProperty + .key("hoodie.datasource.adb.sync.password") + .noDefaultValue() + .withDocumentation("ADB user password"); + + public static final ConfigProperty ADB_SYNC_JDBC_URL = ConfigProperty + .key("hoodie.datasource.adb.sync.jdbc_url") + .noDefaultValue() + .withDocumentation("Adb jdbc connect url"); + + public static final ConfigProperty ADB_SYNC_SKIP_RO_SUFFIX = ConfigProperty + .key("hoodie.datasource.adb.sync.skip_ro_suffix") + .defaultValue(true) + .withDocumentation("Whether skip the `_ro` suffix for read optimized table when syncing"); + + public static final ConfigProperty ADB_SYNC_SKIP_RT_SYNC = ConfigProperty + .key("hoodie.datasource.adb.sync.skip_rt_sync") + .defaultValue(true) + .withDocumentation("Whether skip the rt table when syncing"); + + public static final ConfigProperty ADB_SYNC_USE_HIVE_STYLE_PARTITIONING = ConfigProperty + 
.key("hoodie.datasource.adb.sync.hive_style_partitioning") + .defaultValue(false) + .withDocumentation("Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2"); + + public static final ConfigProperty ADB_SYNC_SUPPORT_TIMESTAMP = ConfigProperty + .key("hoodie.datasource.adb.sync.support_timestamp") + .defaultValue(false) + .withDocumentation("If true, converts int64(timestamp_micros) to timestamp type"); + + public static final ConfigProperty ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE = ConfigProperty + .key("hoodie.datasource.adb.sync.sync_as_spark_datasource") + .defaultValue(true) + .withDocumentation("Whether sync this table as spark data source table"); + + public static final ConfigProperty ADB_SYNC_TABLE_PROPERTIES = ConfigProperty + .key("hoodie.datasource.adb.sync.table_properties") + .noDefaultValue() + .withDocumentation("Table properties, to support read hoodie table as datasource table"); + + public static final ConfigProperty ADB_SYNC_SERDE_PROPERTIES = ConfigProperty + .key("hoodie.datasource.adb.sync.serde_properties") + .noDefaultValue() + .withDocumentation("Serde properties, to support read hoodie table as datasource table"); + + public static final ConfigProperty ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty + .key("hoodie.datasource.adb.sync.schema_string_length_threshold") + .defaultValue(4000) + .withDocumentation("The maximum length allowed in a single cell when storing additional schema information in Hive's metastore"); + + public static final ConfigProperty ADB_SYNC_DB_LOCATION = ConfigProperty + .key("hoodie.datasource.adb.sync.db_location") + .noDefaultValue() + .withDocumentation("Database location"); + + public static final ConfigProperty ADB_SYNC_AUTO_CREATE_DATABASE = ConfigProperty + .key("hoodie.datasource.adb.sync.auto_create_database") + .defaultValue(true) + .withDocumentation("Whether auto create adb database"); + + public static final ConfigProperty ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC = ConfigProperty + .key("hoodie.datasource.adb.sync.skip_last_commit_time_sync") + .defaultValue(false) + .withDocumentation("Whether skip last commit time syncing"); + + public static final ConfigProperty ADB_SYNC_DROP_TABLE_BEFORE_CREATION = ConfigProperty + .key("hoodie.datasource.adb.sync.drop_table_before_creation") + .defaultValue(false) + .withDocumentation("Whether drop table before creation"); + + public AdbSyncConfig() { + this(new TypedProperties()); + } + + public AdbSyncConfig(TypedProperties props) { + super(props); + + adbUser = getString(ADB_SYNC_USER); + adbPass = getString(ADB_SYNC_PASS); + jdbcUrl = getString(ADB_SYNC_JDBC_URL); + skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX); + skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC); + useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING); + supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP); + syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE); + tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES); + serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES); + sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD); + dbLocation = getString(ADB_SYNC_DB_LOCATION); + autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE); + skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC); + dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION); + } + + public 
static TypedProperties toProps(AdbSyncConfig cfg) { + TypedProperties properties = new TypedProperties(); + properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName); + properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName); + properties.put(ADB_SYNC_USER.key(), cfg.adbUser); + properties.put(ADB_SYNC_PASS.key(), cfg.adbPass); + properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl); + properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath); + properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields)); + properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass); + properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning)); + properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix)); + properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync)); + properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning)); + properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata)); + properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp)); + properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties); + properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties); + properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable)); + properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold)); + properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion); + properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation); + properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase)); + properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync)); + properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation)); + + return properties; + } + + @Override + public String toString() { + return "AdbSyncConfig{" + + "adbUser='" + adbUser + '\'' + + ", adbPass='" + adbPass + '\'' + + ", jdbcUrl='" + jdbcUrl + '\'' + + ", skipROSuffix=" + skipROSuffix + + ", skipRTSync=" + skipRTSync + + ", useHiveStylePartitioning=" + useHiveStylePartitioning + + ", supportTimestamp=" + supportTimestamp + + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable + + ", tableProperties='" + tableProperties + '\'' + + ", serdeProperties='" + serdeProperties + '\'' + + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold + + ", dbLocation='" + dbLocation + '\'' + + ", autoCreateDatabase=" + autoCreateDatabase + + ", skipLastCommitTimeSync=" + skipLastCommitTimeSync + + ", dropTableBeforeCreation=" + dropTableBeforeCreation + + ", help=" + help + + ", databaseName='" + databaseName + '\'' + + ", tableName='" + tableName + '\'' + + ", basePath='" + basePath + '\'' + + ", baseFileFormat='" + baseFileFormat + '\'' + + ", partitionFields=" + partitionFields + + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\'' + + ", assumeDatePartitioning=" + assumeDatePartitioning + + ", decodePartition=" + decodePartition + + ", useFileListingFromMetadata=" + useFileListingFromMetadata + + ", isConditionalSync=" + isConditionalSync + + ", sparkVersion='" + sparkVersion + '\'' + + '}'; + } +} diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java new file mode 100644 index 000000000000..8c2f9e20451c --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.adb; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; +import org.apache.hudi.sync.common.AbstractSyncTool; +import org.apache.hudi.sync.common.util.ConfigUtils; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Adb sync tool is mainly used to sync hoodie tables to Alibaba Cloud AnalyticDB(ADB), + * it can be used as API `AdbSyncTool.syncHoodieTable(AdbSyncConfig)` or as command + * line `java -cp hoodie-hive.jar AdbSyncTool [args]` + * + *
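[Illustrative aside, not part of the patch] The class javadoc above names both a programmatic entry point and a command-line one. Below is a minimal sketch of the programmatic path, mirroring what main() further down does: build TypedProperties with the keys defined in AdbSyncConfig and its parent HoodieSyncConfig, then construct AdbSyncTool and call syncHoodieTable(). Every database, table, user, URL and path value here is a hypothetical placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.sync.adb.AdbSyncConfig;
import org.apache.hudi.sync.adb.AdbSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;

public class AdbSyncExample {
  public static void main(String[] args) {
    String basePath = "oss://bucket/warehouse/test_db/test_tbl"; // hypothetical table location
    TypedProperties props = new TypedProperties();
    props.put(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "test_db");
    props.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "test_tbl");
    props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), basePath);
    props.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "dt");
    props.put(AdbSyncConfig.ADB_SYNC_USER.key(), "adb_user");
    props.put(AdbSyncConfig.ADB_SYNC_PASS.key(), "adb_password");
    props.put(AdbSyncConfig.ADB_SYNC_JDBC_URL.key(), "jdbc:mysql://adb-endpoint:3306");
    // Extra table/serde properties attached to the ADB table; TestAdbSyncConfig in this
    // patch shows the newline-separated key=value format that ConfigUtils.toMap consumes.
    props.put(AdbSyncConfig.ADB_SYNC_TABLE_PROPERTIES.key(), "owner=data_team");
    props.put(AdbSyncConfig.ADB_SYNC_SERDE_PROPERTIES.key(), "path=" + basePath);

    Configuration hadoopConf = new Configuration();
    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    new AdbSyncTool(props, hadoopConf, fs).syncHoodieTable();
  }
}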

+ * This utility will get the schema from the latest commit and will sync ADB table schema, + * incremental partitions will be synced as well. + */ +@SuppressWarnings("WeakerAccess") +public class AdbSyncTool extends AbstractSyncTool { + private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class); + + public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; + public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; + + private final AdbSyncConfig adbSyncConfig; + private final AbstractAdbSyncHoodieClient hoodieAdbClient; + private final String snapshotTableName; + private final Option roTableTableName; + + public AdbSyncTool(TypedProperties props, Configuration conf, FileSystem fs) { + super(props, conf, fs); + this.adbSyncConfig = new AdbSyncConfig(props); + this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs); + switch (hoodieAdbClient.getTableType()) { + case COPY_ON_WRITE: + this.snapshotTableName = adbSyncConfig.tableName; + this.roTableTableName = Option.empty(); + break; + case MERGE_ON_READ: + this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE; + this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName) + : Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE); + break; + default: + throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType() + + ", basePath:" + hoodieAdbClient.getBasePath()); + } + } + + private AbstractAdbSyncHoodieClient getHoodieAdbClient(AdbSyncConfig adbSyncConfig, FileSystem fs) { + return new HoodieAdbJdbcClient(adbSyncConfig, fs); + } + + @Override + public void syncHoodieTable() { + try { + switch (hoodieAdbClient.getTableType()) { + case COPY_ON_WRITE: + syncHoodieTable(snapshotTableName, false, false); + break; + case MERGE_ON_READ: + // Sync a ro table for MOR table + syncHoodieTable(roTableTableName.get(), false, true); + // Sync a rt table for MOR table + if (!adbSyncConfig.skipRTSync) { + syncHoodieTable(snapshotTableName, true, false); + } + break; + default: + throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType() + + ", basePath:" + hoodieAdbClient.getBasePath()); + } + } catch (Exception re) { + throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re); + } finally { + hoodieAdbClient.close(); + } + } + + private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, + boolean readAsOptimized) throws Exception { + LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}", + tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType()); + + if (adbSyncConfig.autoCreateDatabase) { + try { + synchronized (AdbSyncTool.class) { + if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) { + hoodieAdbClient.createDatabase(adbSyncConfig.databaseName); + } + } + } catch (Exception e) { + throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName + + ", useRealtimeInputFormat = " + useRealtimeInputFormat, e); + } + } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) { + throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName); + } + + // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table, + // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table + // by the data source way (which will use the HoodieBootstrapRelation). 
+ // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], + // we can remove this logical. + if (hoodieAdbClient.isBootstrap() + && hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ + && !readAsOptimized) { + adbSyncConfig.syncAsSparkDataSourceTable = false; + LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName); + } + + if (adbSyncConfig.dropTableBeforeCreation) { + LOG.info("Drop table before creation, tableName:{}", tableName); + hoodieAdbClient.dropTable(tableName); + } + + boolean tableExists = hoodieAdbClient.tableExists(tableName); + + // Get the parquet schema for this table looking at the latest commit + MessageType schema = hoodieAdbClient.getDataSchema(); + + // Sync schema if needed + syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema); + LOG.info("Sync schema complete, start syncing partitions for table:{}", tableName); + + // Get the last time we successfully synced partitions + Option lastCommitTimeSynced = Option.empty(); + if (tableExists) { + lastCommitTimeSynced = hoodieAdbClient.getLastCommitTimeSynced(tableName); + } + LOG.info("Last commit time synced was found:{}", lastCommitTimeSynced.orElse("null")); + + // Scan synced partitions + List writtenPartitionsSince; + if (adbSyncConfig.partitionFields.isEmpty()) { + writtenPartitionsSince = new ArrayList<>(); + } else { + writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + } + LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size()); + + // Sync the partitions if needed + syncPartitions(tableName, writtenPartitionsSince); + + // Update sync commit time + // whether to skip syncing commit time stored in tbl properties, since it is time consuming. + if (!adbSyncConfig.skipLastCommitTimeSync) { + hoodieAdbClient.updateLastCommitTimeSynced(tableName); + } + LOG.info("Sync complete for table:{}", tableName); + } + + /** + * Get the latest schema from the last commit and check if its in sync with the ADB + * table schema. If not, evolves the table schema. 
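[Illustrative aside, not part of the patch] syncSchema() is where the PR's "read hoodie table as spark datasource table" goal lands: when syncAsSparkDataSourceTable is enabled, the spark table properties produced by getSparkTableProperties and a path serde property are appended to the ADB table definition. A rough sketch of the kind of properties involved follows; the key names appear in TestAdbSyncConfig later in this patch, while the values below are made up (the real schema JSON is derived from the commit schema and, judging by the numParts key and the schema-length threshold config, may be split across several part.N entries).

import java.util.HashMap;
import java.util.Map;

class SparkDataSourcePropsSketch {
  // Hypothetical values only; keys match those exercised in TestAdbSyncConfig.
  static Map<String, String> exampleTableProperties() {
    Map<String, String> props = new HashMap<>();
    props.put("spark.sql.sources.provider", "hudi");
    props.put("spark.sql.sources.schema.numParts", "1");
    props.put("spark.sql.sources.schema.part.0", "{\"type\":\"struct\",\"fields\":[]}"); // placeholder schema JSON
    props.put("spark.sql.sources.schema.numPartCols", "1");
    props.put("spark.sql.sources.schema.partCol.0", "dt");
    return props;
  }
}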
+ * + * @param tableName The table to be synced + * @param tableExists Whether target table exists + * @param useRealTimeInputFormat Whether using realtime input format + * @param readAsOptimized Whether read as optimized table + * @param schema The extracted schema + */ + private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, + boolean readAsOptimized, MessageType schema) throws Exception { + // Append spark table properties & serde properties + Map tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties); + Map serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties); + if (adbSyncConfig.syncAsSparkDataSourceTable) { + Map sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields, + adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema); + Map sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath); + tableProperties.putAll(sparkTableProperties); + serdeProperties.putAll(sparkSerdeProperties); + LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}", + tableName, tableExists, tableProperties, serdeProperties); + } + + // Check and sync schema + if (!tableExists) { + LOG.info("ADB table [{}] is not found, creating it", tableName); + String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat); + + // Custom serde will not work with ALTER TABLE REPLACE COLUMNS + // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive + // /ql/exec/DDLTask.java#L3488 + hoodieAdbClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), + ParquetHiveSerDe.class.getName(), serdeProperties, tableProperties); + } else { + // Check if the table schema has evolved + Map tableSchema = hoodieAdbClient.getTableSchema(tableName); + SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields, + adbSyncConfig.supportTimestamp); + if (!schemaDiff.isEmpty()) { + LOG.info("Schema difference found for table:{}", tableName); + hoodieAdbClient.updateTableDefinition(tableName, schemaDiff); + } else { + LOG.info("No Schema difference for table:{}", tableName); + } + } + } + + /** + * Syncs the list of storage partitions passed in (checks if the partition is in adb, if not adds it or if the + * partition path does not match, it updates the partition path). 
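[Illustrative aside, not part of the patch] Downstream, HoodieAdbJdbcClient (added later in this patch) turns each ADD partition event into an "alter table ... add if not exists partition ... location ..." statement whose location is the partition's absolute storage path. A hypothetical example, assuming a single hive-style partition field dt and made-up database, table and bucket names:

class AddPartitionSqlSketch {
  // Shape of the SQL built by constructAddPartitionsSql for one new storage partition.
  static final String EXAMPLE =
      "alter table `test_db`.`test_tbl` add if not exists"
          + " partition (dt='2022-05-20')"
          + " location 'oss://bucket/warehouse/test_db/test_tbl/dt=2022-05-20/'";
}

Changed partitions are handled by constructChangePartitionsSql using the same add-if-not-exists form, preceded by a "use" statement selecting the database.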
+ */ + private void syncPartitions(String tableName, List writtenPartitionsSince) { + try { + if (adbSyncConfig.partitionFields.isEmpty()) { + LOG.info("Not a partitioned table."); + return; + } + + Map, String> partitions = hoodieAdbClient.scanTablePartitions(tableName); + List partitionEvents = hoodieAdbClient.getPartitionEvents(partitions, writtenPartitionsSince); + List newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); + LOG.info("New Partitions:{}", newPartitions); + hoodieAdbClient.addPartitionsToTable(tableName, newPartitions); + List updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE); + LOG.info("Changed Partitions:{}", updatePartitions); + hoodieAdbClient.updatePartitionsToTable(tableName, updatePartitions); + } catch (Exception e) { + throw new HoodieAdbSyncException("Failed to sync partitions for table:" + tableName, e); + } + } + + private List filterPartitions(List events, PartitionEventType eventType) { + return events.stream().filter(s -> s.eventType == eventType) + .map(s -> s.storagePartition).collect(Collectors.toList()); + } + + public static void main(String[] args) { + // parse the params + final AdbSyncConfig cfg = new AdbSyncConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + Configuration hadoopConf = new Configuration(); + FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf); + new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable(); + } +} diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java new file mode 100644 index 000000000000..a347ba701110 --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.adb; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.SchemaDifference; +import org.apache.hudi.hive.util.HiveSchemaUtil; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient { + private static final Logger LOG = LoggerFactory.getLogger(HoodieAdbJdbcClient.class); + + public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync"; + // Make sure we have the jdbc driver in classpath + private static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; + public static final String ADB_ESCAPE_CHARACTER = ""; + private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES"; + + static { + try { + Class.forName(DRIVER_NAME); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e); + } + } + + private Connection connection; + + public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) { + super(syncConfig, fs); + createAdbConnection(); + LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl); + } + + private void createAdbConnection() { + if (connection == null) { + try { + Class.forName(DRIVER_NAME); + } catch (ClassNotFoundException e) { + LOG.error("Unable to load jdbc driver class", e); + return; + } + try { + this.connection = DriverManager.getConnection( + adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass); + } catch (SQLException e) { + throw new HoodieException("Cannot create adb connection ", e); + } + } + } + + @Override + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, + String outputFormatClass, String serdeClass, + Map serdeProperties, Map tableProperties) { + try { + LOG.info("Creating table:{}", tableName); + String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, + getHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties); + executeAdbSql(createSQLQuery); + } catch (IOException e) { + throw new HoodieException("Fail to create table:" + tableName, e); + } + } + + @Override + public void dropTable(String tableName) { + LOG.info("Dropping table:{}", tableName); + String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`"; + executeAdbSql(dropTable); + } + + public Map getTableSchema(String tableName) { + Map schema = new HashMap<>(); + ResultSet result = null; + try { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + result = databaseMetaData.getColumns(adbSyncConfig.databaseName, + adbSyncConfig.databaseName, tableName, null); + while (result.next()) { + String columnName = 
result.getString(4); + String columnType = result.getString(6); + if ("DECIMAL".equals(columnType)) { + int columnSize = result.getInt("COLUMN_SIZE"); + int decimalDigits = result.getInt("DECIMAL_DIGITS"); + columnType += String.format("(%s,%s)", columnSize, decimalDigits); + } + schema.put(columnName, columnType); + } + return schema; + } catch (SQLException e) { + throw new HoodieException("Fail to get table schema:" + tableName, e); + } finally { + closeQuietly(result, null); + } + } + + @Override + public void addPartitionsToTable(String tableName, List partitionsToAdd) { + if (partitionsToAdd.isEmpty()) { + LOG.info("No partitions to add for table:{}", tableName); + return; + } + + LOG.info("Adding partitions to table:{}, partitionNum:{}", tableName, partitionsToAdd.size()); + String sql = constructAddPartitionsSql(tableName, partitionsToAdd); + executeAdbSql(sql); + } + + private void executeAdbSql(String sql) { + Statement stmt = null; + try { + stmt = connection.createStatement(); + LOG.info("Executing sql:{}", sql); + stmt.execute(sql); + } catch (SQLException e) { + throw new HoodieException("Fail to execute sql:" + sql, e); + } finally { + closeQuietly(null, stmt); + } + } + + private T executeQuerySQL(String sql, Function function) { + Statement stmt = null; + try { + stmt = connection.createStatement(); + LOG.info("Executing sql:{}", sql); + return function.apply(stmt.executeQuery(sql)); + } catch (SQLException e) { + throw new HoodieException("Fail to execute sql:" + sql, e); + } finally { + closeQuietly(null, stmt); + } + } + + public void createDatabase(String databaseName) { + String rootPath = getDatabasePath(); + LOG.info("Creating database:{}, databaseLocation:{}", databaseName, rootPath); + String sql = constructCreateDatabaseSql(rootPath); + executeAdbSql(sql); + } + + public boolean databaseExists(String databaseName) { + String sql = constructShowCreateDatabaseSql(databaseName); + Function transform = resultSet -> { + try { + return resultSet.next(); + } catch (Exception e) { + if (e.getMessage().contains("Unknown database `" + databaseName + "`")) { + return false; + } else { + throw new HoodieException("Fail to execute sql:" + sql, e); + } + } + }; + return executeQuerySQL(sql, transform); + } + + @Override + public boolean doesTableExist(String tableName) { + String sql = constructShowLikeTableSql(tableName); + Function transform = resultSet -> { + try { + return resultSet.next(); + } catch (Exception e) { + throw new HoodieException("Fail to execute sql:" + sql, e); + } + }; + return executeQuerySQL(sql, transform); + } + + @Override + public boolean tableExists(String tableName) { + return doesTableExist(tableName); + } + + @Override + public Option getLastCommitTimeSynced(String tableName) { + String sql = constructShowCreateTableSql(tableName); + + Function> transform = resultSet -> { + try { + if (resultSet.next()) { + String table = resultSet.getString(2); + Map attr = new HashMap<>(); + int index = table.indexOf(TBL_PROPERTIES_STR); + if (index != -1) { + String sub = table.substring(index + TBL_PROPERTIES_STR.length()); + sub = sub + .replaceAll("\\(", "") + .replaceAll("\\)", "") + .replaceAll("'", ""); + String[] str = sub.split(","); + + for (String s : str) { + String key = s.split("=")[0].trim(); + String value = s.split("=")[1].trim(); + attr.put(key, value); + } + } + return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null)); + } + return Option.empty(); + } catch (Exception e) { + throw new HoodieException("Fail to execute 
sql:" + sql, e); + } + }; + return executeQuerySQL(sql, transform); + } + + @Override + public void updateLastCommitTimeSynced(String tableName) { + // Set the last commit time from the TBLProperties + String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp(); + try { + String sql = constructUpdateTblPropertiesSql(tableName, lastCommitSynced); + executeAdbSql(sql); + } catch (Exception e) { + throw new HoodieHiveSyncException("Fail to get update last commit time synced:" + lastCommitSynced, e); + } + } + + @Override + public Option getLastReplicatedTime(String tableName) { + throw new UnsupportedOperationException("Not support getLastReplicatedTime yet"); + } + + @Override + public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { + throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet"); + } + + @Override + public void deleteLastReplicatedTimeStamp(String tableName) { + throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet"); + } + + @Override + public void updatePartitionsToTable(String tableName, List changedPartitions) { + if (changedPartitions.isEmpty()) { + LOG.info("No partitions to change for table:{}", tableName); + return; + } + + LOG.info("Changing partitions on table:{}, changedPartitionNum:{}", tableName, changedPartitions.size()); + List sqlList = constructChangePartitionsSql(tableName, changedPartitions); + for (String sql : sqlList) { + executeAdbSql(sql); + } + } + + @Override + public void dropPartitions(String tableName, List partitionsToDrop) { + throw new UnsupportedOperationException("Not support dropPartitions yet."); + } + + public Map, String> scanTablePartitions(String tableName) { + String sql = constructShowPartitionSql(tableName); + Function, String>> transform = resultSet -> { + Map, String> partitions = new HashMap<>(); + try { + while (resultSet.next()) { + if (resultSet.getMetaData().getColumnCount() > 0) { + String str = resultSet.getString(1); + if (!StringUtils.isNullOrEmpty(str)) { + List values = partitionValueExtractor.extractPartitionValuesInPath(str); + Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values)); + String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); + partitions.put(values, fullStoragePartitionPath); + } + } + } + } catch (Exception e) { + throw new HoodieException("Fail to execute sql:" + sql, e); + } + return partitions; + }; + return executeQuerySQL(sql, transform); + } + + public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) { + LOG.info("Adding columns for table:{}", tableName); + schemaDiff.getAddColumnTypes().forEach((columnName, columnType) -> + executeAdbSql(constructAddColumnSql(tableName, columnName, columnType)) + ); + + LOG.info("Updating columns' definition for table:{}", tableName); + schemaDiff.getUpdateColumnTypes().forEach((columnName, columnType) -> + executeAdbSql(constructChangeColumnSql(tableName, columnName, columnType)) + ); + } + + private String constructAddPartitionsSql(String tableName, List partitions) { + StringBuilder sqlBuilder = new StringBuilder("alter table `"); + sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`") + .append(tableName).append("`").append(" add if not exists "); + for (String partition : partitions) { + String partitionClause = getPartitionClause(partition); + Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, 
partition); + String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); + sqlBuilder.append(" partition (").append(partitionClause).append(") location '") + .append(fullPartitionPathStr).append("' "); + } + + return sqlBuilder.toString(); + } + + private List constructChangePartitionsSql(String tableName, List partitions) { + List changePartitions = new ArrayList<>(); + String useDatabase = "use `" + adbSyncConfig.databaseName + "`"; + changePartitions.add(useDatabase); + + String alterTable = "alter table `" + tableName + "`"; + for (String partition : partitions) { + String partitionClause = getPartitionClause(partition); + Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition); + String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); + String changePartition = alterTable + " add if not exists partition (" + partitionClause + + ") location '" + fullPartitionPathStr + "'"; + changePartitions.add(changePartition); + } + + return changePartitions; + } + + /** + * Generate Hive Partition from partition values. + * + * @param partition Partition path + * @return partition clause + */ + private String getPartitionClause(String partition) { + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(), + "Partition key parts " + adbSyncConfig.partitionFields + + " does not match with partition values " + partitionValues + ". Check partition strategy. "); + List partBuilder = new ArrayList<>(); + for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) { + partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'"); + } + + return String.join(",", partBuilder); + } + + private String constructShowPartitionSql(String tableName) { + return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName); + } + + private String constructShowCreateTableSql(String tableName) { + return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName); + } + + private String constructShowLikeTableSql(String tableName) { + return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName); + } + + private String constructCreateDatabaseSql(String rootPath) { + return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')", + adbSyncConfig.databaseName, rootPath); + } + + private String constructShowCreateDatabaseSql(String databaseName) { + return String.format("show create database `%s`", databaseName); + } + + private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) { + return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')", + adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced); + } + + private String constructAddColumnSql(String tableName, String columnName, String columnType) { + return String.format("alter table `%s`.`%s` add columns(`%s` %s)", + adbSyncConfig.databaseName, tableName, columnName, columnType); + } + + private String constructChangeColumnSql(String tableName, String columnName, String columnType) { + return String.format("alter table `%s`.`%s` change `%s` `%s` %s", + adbSyncConfig.databaseName, tableName, columnName, columnName, columnType); + } + + private HiveSyncConfig getHiveSyncConfig() { + HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); + 
hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields; + hiveSyncConfig.databaseName = adbSyncConfig.databaseName; + Path basePath = new Path(adbSyncConfig.basePath); + hiveSyncConfig.basePath = generateAbsolutePathStr(basePath); + return hiveSyncConfig; + } + + @Override + public void close() { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.error("Fail to close connection", e); + } + } +} diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java new file mode 100644 index 000000000000..0deb9b94cd52 --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbSyncException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.adb; + +public class HoodieAdbSyncException extends RuntimeException { + public HoodieAdbSyncException(String message) { + super(message); + } + + public HoodieAdbSyncException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java new file mode 100644 index 000000000000..f4eb8fc7fc45 --- /dev/null +++ b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.sync.adb; + +import org.apache.hudi.common.config.TypedProperties; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +public class TestAdbSyncConfig { + @Test + public void testCopy() { + AdbSyncConfig adbSyncConfig = new AdbSyncConfig(); + adbSyncConfig.partitionFields = Arrays.asList("a", "b"); + adbSyncConfig.basePath = "/tmp"; + adbSyncConfig.assumeDatePartitioning = true; + adbSyncConfig.databaseName = "test"; + adbSyncConfig.tableName = "test"; + adbSyncConfig.adbUser = "adb"; + adbSyncConfig.adbPass = "adb"; + adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306"; + adbSyncConfig.skipROSuffix = false; + adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n" + + "spark.sql.sources.schema.numParts = '1'\\n " + + "spark.sql.sources.schema.part.0 ='xx'\\n " + + "spark.sql.sources.schema.numPartCols = '1'\\n" + + "spark.sql.sources.schema.partCol.0 = 'dt'"; + adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'"; + adbSyncConfig.dbLocation = "file://tmp/test_db"; + + TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig); + AdbSyncConfig copied = new AdbSyncConfig(props); + + assertEquals(copied.partitionFields, adbSyncConfig.partitionFields); + assertEquals(copied.basePath, adbSyncConfig.basePath); + assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning); + assertEquals(copied.databaseName, adbSyncConfig.databaseName); + assertEquals(copied.tableName, adbSyncConfig.tableName); + assertEquals(copied.adbUser, adbSyncConfig.adbUser); + assertEquals(copied.adbPass, adbSyncConfig.adbPass); + assertEquals(copied.basePath, adbSyncConfig.basePath); + assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl); + assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix); + assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp); + } +} diff --git a/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties b/hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire-quiet.properties similarity index 100% rename from hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire-quiet.properties rename to hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire-quiet.properties diff --git a/hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties b/hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire.properties similarity index 100% rename from hudi-sync/hudi-dla-sync/src/test/resources/log4j-surefire.properties rename to hudi-sync/hudi-adb-sync/src/test/resources/log4j-surefire.properties diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java deleted file mode 100644 index d4d580fe276a..000000000000 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncConfig.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.dla; - -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; - -import com.beust.jcommander.Parameter; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * Configs needed to sync data into DLA. - */ -public class DLASyncConfig implements Serializable { - - @Parameter(names = {"--database"}, description = "name of the target database in DLA", required = true) - public String databaseName; - - @Parameter(names = {"--table"}, description = "name of the target table in DLA", required = true) - public String tableName; - - @Parameter(names = {"--user"}, description = "DLA username", required = true) - public String dlaUser; - - @Parameter(names = {"--pass"}, description = "DLA password", required = true) - public String dlaPass; - - @Parameter(names = {"--jdbc-url"}, description = "DLA jdbc connect url", required = true) - public String jdbcUrl; - - @Parameter(names = {"--base-path"}, description = "Basepath of hoodie table to sync", required = true) - public String basePath; - - @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by") - public List partitionFields = new ArrayList<>(); - - @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor " - + "to extract the partition values from HDFS path") - public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName(); - - @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this" - + " exists to support backward compatibility. 
If you use hoodie 0.3.x, do not set this parameter") - public Boolean assumeDatePartitioning = false; - - @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering") - public Boolean skipROSuffix = false; - - @Parameter(names = {"--skip-rt-sync"}, description = "Skip the RT table syncing") - public Boolean skipRTSync = false; - - @Parameter(names = {"--hive-style-partitioning"}, description = "Use DLA hive style partitioning, true if like the following style: field1=value1/field2=value2") - public Boolean useDLASyncHiveStylePartitioning = false; - - @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata") - public Boolean useFileListingFromMetadata = HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS; - - @Parameter(names = {"--help", "-h"}, help = true) - public Boolean help = false; - - @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type") - public Boolean supportTimestamp = false; - - public static DLASyncConfig copy(DLASyncConfig cfg) { - DLASyncConfig newConfig = new DLASyncConfig(); - newConfig.databaseName = cfg.databaseName; - newConfig.tableName = cfg.tableName; - newConfig.dlaUser = cfg.dlaUser; - newConfig.dlaPass = cfg.dlaPass; - newConfig.jdbcUrl = cfg.jdbcUrl; - newConfig.basePath = cfg.basePath; - newConfig.partitionFields = cfg.partitionFields; - newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass; - newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning; - newConfig.skipROSuffix = cfg.skipROSuffix; - newConfig.skipRTSync = cfg.skipRTSync; - newConfig.useDLASyncHiveStylePartitioning = cfg.useDLASyncHiveStylePartitioning; - newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata; - newConfig.supportTimestamp = cfg.supportTimestamp; - return newConfig; - } - - @Override - public String toString() { - return "DLASyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\'' - + ", dlaUser='" + dlaUser + '\'' + ", dlaPass='" + dlaPass + '\'' + ", jdbcUrl='" + jdbcUrl + '\'' - + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='" - + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning - + ", useDLASyncHiveStylePartitioning=" + useDLASyncHiveStylePartitioning - + ", useFileListingFromMetadata=" + useFileListingFromMetadata - + ", help=" + help + '}'; - } -} diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java deleted file mode 100644 index 97838d03ed66..000000000000 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.dla; - -import com.beust.jcommander.JCommander; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; -import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.dla.util.Utils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.InvalidTableException; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.hive.SchemaDifference; -import org.apache.hudi.hive.util.HiveSchemaUtil; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient; -import org.apache.hudi.sync.common.AbstractSyncTool; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.parquet.schema.MessageType; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Tool to sync a hoodie table with a dla table. Either use it as a api - * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args] - *
* <p>
- * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the - * partitions incrementally (all the partitions modified since the last commit) - */ -@SuppressWarnings("WeakerAccess") -public class DLASyncTool extends AbstractSyncTool { - - private static final Logger LOG = LogManager.getLogger(DLASyncTool.class); - public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; - public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; - - private final DLASyncConfig cfg; - private final HoodieDLAClient hoodieDLAClient; - private final String snapshotTableName; - private final Option roTableTableName; - - public DLASyncTool(TypedProperties properties, Configuration conf, FileSystem fs) { - super(properties, conf, fs); - this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs); - this.cfg = Utils.propertiesToConfig(properties); - switch (hoodieDLAClient.getTableType()) { - case COPY_ON_WRITE: - this.snapshotTableName = cfg.tableName; - this.roTableTableName = Option.empty(); - break; - case MERGE_ON_READ: - this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE; - this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) : - Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE); - break; - default: - LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); - throw new InvalidTableException(hoodieDLAClient.getBasePath()); - } - } - - @Override - public void syncHoodieTable() { - try { - switch (hoodieDLAClient.getTableType()) { - case COPY_ON_WRITE: - syncHoodieTable(snapshotTableName, false); - break; - case MERGE_ON_READ: - // sync a RO table for MOR - syncHoodieTable(roTableTableName.get(), false); - // sync a RT table for MOR - if (!cfg.skipRTSync) { - syncHoodieTable(snapshotTableName, true); - } - break; - default: - LOG.error("Unknown table type " + hoodieDLAClient.getTableType()); - throw new InvalidTableException(hoodieDLAClient.getBasePath()); - } - } catch (RuntimeException re) { - throw new HoodieException("Got runtime exception when dla syncing " + cfg.tableName, re); - } finally { - hoodieDLAClient.close(); - } - } - - private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { - LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath() - + " of type " + hoodieDLAClient.getTableType()); - // Check if the necessary table exists - boolean tableExists = hoodieDLAClient.tableExists(tableName); - // Get the parquet schema for this table looking at the latest commit - MessageType schema = hoodieDLAClient.getDataSchema(); - // Sync schema if needed - syncSchema(tableName, tableExists, useRealtimeInputFormat, schema); - - LOG.info("Schema sync complete. Syncing partitions for " + tableName); - // Get the last time we successfully synced partitions - // TODO : once DLA supports alter table properties - Option lastCommitTimeSynced = Option.empty(); - /*if (tableExists) { - lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName); - }*/ - LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); - List writtenPartitionsSince = hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced); - LOG.info("Storage partitions scan complete. 
Found " + writtenPartitionsSince.size()); - // Sync the partitions if needed - syncPartitions(tableName, writtenPartitionsSince); - - hoodieDLAClient.updateLastCommitTimeSynced(tableName); - LOG.info("Sync complete for " + tableName); - } - - /** - * Get the latest schema from the last commit and check if its in sync with the dla table schema. If not, evolves the - * table schema. - * - * @param tableExists - does table exist - * @param schema - extracted schema - */ - private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) { - // Check and sync schema - if (!tableExists) { - LOG.info("DLA table " + tableName + " is not found. Creating it"); - - String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, useRealTimeInputFormat); - - // Custom serde will not work with ALTER TABLE REPLACE COLUMNS - // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive - // /ql/exec/DDLTask.java#L3488 - hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), - ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); - } else { - // Check if the table schema has evolved - Map tableSchema = hoodieDLAClient.getTableSchema(tableName); - SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields, cfg.supportTimestamp); - if (!schemaDiff.isEmpty()) { - LOG.info("Schema difference found for " + tableName); - hoodieDLAClient.updateTableDefinition(tableName, schemaDiff); - } else { - LOG.info("No Schema difference for " + tableName); - } - } - } - - /** - * Syncs the list of storage partitions passed in (checks if the partition is in dla, if not adds it or if the - * partition path does not match, it updates the partition path). 
- */ - private void syncPartitions(String tableName, List writtenPartitionsSince) { - try { - if (cfg.partitionFields.isEmpty()) { - LOG.info("not a partitioned table."); - return; - } - Map, String> partitions = hoodieDLAClient.scanTablePartitions(tableName); - List partitionEvents = - hoodieDLAClient.getPartitionEvents(partitions, writtenPartitionsSince); - List newPartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.ADD); - LOG.info("New Partitions " + newPartitions); - hoodieDLAClient.addPartitionsToTable(tableName, newPartitions); - List updatePartitions = filterPartitions(partitionEvents, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType.UPDATE); - LOG.info("Changed Partitions " + updatePartitions); - hoodieDLAClient.updatePartitionsToTable(tableName, updatePartitions); - } catch (Exception e) { - throw new HoodieException("Failed to sync partitions for table " + tableName, e); - } - } - - private List filterPartitions(List events, AbstractSyncHoodieClient.PartitionEvent.PartitionEventType eventType) { - return events.stream().filter(s -> s.eventType == eventType).map(s -> s.storagePartition) - .collect(Collectors.toList()); - } - - public static void main(String[] args) { - // parse the params - final DLASyncConfig cfg = new DLASyncConfig(); - JCommander cmd = new JCommander(cfg, null, args); - if (cfg.help || args.length == 0) { - cmd.usage(); - System.exit(1); - } - Configuration hadoopConf = new Configuration(); - FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf); - new DLASyncTool(Utils.configToProperties(cfg), hadoopConf, fs).syncHoodieTable(); - } -} diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java deleted file mode 100644 index 10869eaf27b6..000000000000 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.dla; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hive.HiveSyncConfig; -import org.apache.hudi.hive.HoodieHiveSyncException; -import org.apache.hudi.hive.PartitionValueExtractor; -import org.apache.hudi.hive.SchemaDifference; -import org.apache.hudi.hive.util.HiveSchemaUtil; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.parquet.schema.MessageType; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class HoodieDLAClient extends AbstractSyncHoodieClient { - private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class); - private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync"; - // Make sure we have the dla JDBC driver in classpath - private static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; - private static final String DLA_ESCAPE_CHARACTER = ""; - private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES"; - - static { - try { - Class.forName(DRIVER_NAME); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. 
", e); - } - } - - private Connection connection; - private DLASyncConfig dlaConfig; - private PartitionValueExtractor partitionValueExtractor; - - public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) { - super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, - false, fs); - this.dlaConfig = syncConfig; - try { - this.partitionValueExtractor = - (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance(); - } catch (Exception e) { - throw new HoodieException( - "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e); - } - createDLAConnection(); - } - - private void createDLAConnection() { - if (connection == null) { - try { - Class.forName(DRIVER_NAME); - } catch (ClassNotFoundException e) { - LOG.error("Unable to load DLA driver class", e); - return; - } - try { - this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass); - LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl); - } catch (SQLException e) { - throw new HoodieException("Cannot create dla connection ", e); - } - } - } - - @Override - public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, - String outputFormatClass, String serdeClass, - Map serdeProperties, Map tableProperties) { - try { - String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), - inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties); - LOG.info("Creating table with " + createSQLQuery); - updateDLASQL(createSQLQuery); - } catch (IOException e) { - throw new HoodieException("Failed to create table " + tableName, e); - } - } - - public Map getTableSchema(String tableName) { - if (!tableExists(tableName)) { - throw new IllegalArgumentException( - "Failed to get schema for table " + tableName + " does not exist"); - } - Map schema = new HashMap<>(); - ResultSet result = null; - try { - DatabaseMetaData databaseMetaData = connection.getMetaData(); - result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null); - while (result.next()) { - TYPE_CONVERTOR.doConvert(result, schema); - } - return schema; - } catch (SQLException e) { - throw new HoodieException("Failed to get table schema for " + tableName, e); - } finally { - closeQuietly(result, null); - } - } - - @Override - public void addPartitionsToTable(String tableName, List partitionsToAdd) { - if (partitionsToAdd.isEmpty()) { - LOG.info("No partitions to add for " + tableName); - return; - } - LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName); - String sql = constructAddPartitions(tableName, partitionsToAdd); - updateDLASQL(sql); - } - - public String constructAddPartitions(String tableName, List partitions) { - return constructDLAAddPartitions(tableName, partitions); - } - - String generateAbsolutePathStr(Path path) { - String absolutePathStr = path.toString(); - if (path.toUri().getScheme() == null) { - absolutePathStr = getDefaultFs() + absolutePathStr; - } - return absolutePathStr.endsWith("/") ? 
absolutePathStr : absolutePathStr + "/"; - } - - public List constructChangePartitions(String tableName, List partitions) { - List changePartitions = new ArrayList<>(); - String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER; - changePartitions.add(useDatabase); - String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER; - for (String partition : partitions) { - String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); - String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); - String changePartition = - alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'"; - changePartitions.add(changePartition); - } - return changePartitions; - } - - /** - * Generate Hive Partition from partition values. - * - * @param partition Partition path - * @return - */ - public String getPartitionClause(String partition) { - List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(), - "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues - + ". Check partition strategy. "); - List partBuilder = new ArrayList<>(); - for (int i = 0; i < dlaConfig.partitionFields.size(); i++) { - partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'"); - } - return partBuilder.stream().collect(Collectors.joining(",")); - } - - private String constructDLAAddPartitions(String tableName, List partitions) { - StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); - alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName) - .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER) - .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS "); - for (String partition : partitions) { - String partitionClause = getPartitionClause(partition); - Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); - String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); - alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr) - .append("' "); - } - return alterSQL.toString(); - } - - private void updateDLASQL(String sql) { - Statement stmt = null; - try { - stmt = connection.createStatement(); - LOG.info("Executing SQL " + sql); - stmt.execute(sql); - } catch (SQLException e) { - throw new HoodieException("Failed in executing SQL " + sql, e); - } finally { - closeQuietly(null, stmt); - } - } - - @Override - public boolean doesTableExist(String tableName) { - return tableExists(tableName); - } - - @Override - public boolean tableExists(String tableName) { - String sql = consutructShowCreateTableSQL(tableName); - Statement stmt = null; - ResultSet rs = null; - try { - stmt = connection.createStatement(); - rs = stmt.executeQuery(sql); - } catch (SQLException e) { - return false; - } finally { - closeQuietly(rs, stmt); - } - return true; - } - - @Override - public Option getLastCommitTimeSynced(String tableName) { - String sql = consutructShowCreateTableSQL(tableName); - Statement stmt = null; - ResultSet rs = null; - try { - stmt = connection.createStatement(); - rs = stmt.executeQuery(sql); - if (rs.next()) { - String table = rs.getString(2); - Map attr = new HashMap<>(); - int index = 
table.indexOf(TBL_PROPERTIES_STR); - if (index != -1) { - String sub = table.substring(index + TBL_PROPERTIES_STR.length()); - sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", ""); - String[] str = sub.split(","); - - for (int i = 0; i < str.length; i++) { - String key = str[i].split("=")[0].trim(); - String value = str[i].split("=")[1].trim(); - attr.put(key, value); - } - } - return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null)); - } - return Option.empty(); - } catch (Exception e) { - throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e); - } finally { - closeQuietly(rs, stmt); - } - } - - @Override - public void updateLastCommitTimeSynced(String tableName) { - // TODO : dla do not support update tblproperties, so do nothing. - } - - @Override - public Option getLastReplicatedTime(String tableName) { - // no op; unsupported - return Option.empty(); - } - - @Override - public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { - // no op; unsupported - } - - @Override - public void deleteLastReplicatedTimeStamp(String tableName) { - // no op; unsupported - } - - @Override - public void updatePartitionsToTable(String tableName, List changedPartitions) { - if (changedPartitions.isEmpty()) { - LOG.info("No partitions to change for " + tableName); - return; - } - LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName); - List sqls = constructChangePartitions(tableName, changedPartitions); - for (String sql : sqls) { - updateDLASQL(sql); - } - } - - @Override - public void dropPartitions(String tableName, List partitionsToDrop) { - throw new UnsupportedOperationException("Not support dropPartitions yet."); - } - - public Map, String> scanTablePartitions(String tableName) { - String sql = constructShowPartitionSQL(tableName); - Statement stmt = null; - ResultSet rs = null; - Map, String> partitions = new HashMap<>(); - try { - stmt = connection.createStatement(); - LOG.info("Executing SQL " + sql); - rs = stmt.executeQuery(sql); - while (rs.next()) { - if (rs.getMetaData().getColumnCount() > 0) { - String str = rs.getString(1); - if (!StringUtils.isNullOrEmpty(str)) { - List values = partitionValueExtractor.extractPartitionValuesInPath(str); - Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values)); - String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); - partitions.put(values, fullStoragePartitionPath); - } - } - } - return partitions; - } catch (SQLException e) { - throw new HoodieException("Failed in executing SQL " + sql, e); - } finally { - closeQuietly(rs, stmt); - } - } - - public List getPartitionEvents(Map, String> tablePartitions, List partitionStoragePartitions) { - Map paths = new HashMap<>(); - - for (Map.Entry, String> entry : tablePartitions.entrySet()) { - List partitionValues = entry.getKey(); - Collections.sort(partitionValues); - String fullTablePartitionPath = entry.getValue(); - paths.put(String.join(", ", partitionValues), fullTablePartitionPath); - } - List events = new ArrayList<>(); - for (String storagePartition : partitionStoragePartitions) { - Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition); - String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); - // Check if the partition values or if hdfs path is the same - List 
storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (dlaConfig.useDLASyncHiveStylePartitioning) { - String partition = String.join("/", storagePartitionValues); - storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); - fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); - } - Collections.sort(storagePartitionValues); - if (!storagePartitionValues.isEmpty()) { - String storageValue = String.join(", ", storagePartitionValues); - if (!paths.containsKey(storageValue)) { - events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); - } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { - events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); - } - } - } - return events; - } - - public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) { - ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().size() == 0, "not support delete columns"); - ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().size() == 0, "not support alter column type"); - Map columns = schemaDiff.getAddColumnTypes(); - for (Map.Entry entry : columns.entrySet()) { - String columnName = entry.getKey(); - String columnType = entry.getValue(); - StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER) - .append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".") - .append(DLA_ESCAPE_CHARACTER).append(tableName) - .append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(") - .append(columnName).append(" ").append(columnType).append(" )"); - LOG.info("Updating table definition with " + sqlBuilder); - updateDLASQL(sqlBuilder.toString()); - } - } - - @Override - public void close() { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException e) { - LOG.error("Could not close connection ", e); - } - } - - private String constructShowPartitionSQL(String tableName) { - String sql = "show partitions " + dlaConfig.databaseName + "." + tableName; - return sql; - } - - private String consutructShowCreateTableSQL(String tableName) { - String sql = "show create table " + dlaConfig.databaseName + "." + tableName; - return sql; - } - - private String getDefaultFs() { - return fs.getConf().get("fs.defaultFS"); - } - - private HiveSyncConfig toHiveSyncConfig() { - HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); - hiveSyncConfig.partitionFields = dlaConfig.partitionFields; - hiveSyncConfig.databaseName = dlaConfig.databaseName; - Path basePath = new Path(dlaConfig.basePath); - hiveSyncConfig.basePath = generateAbsolutePathStr(basePath); - return hiveSyncConfig; - } -} diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java deleted file mode 100644 index d1b0dd4e9d56..000000000000 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/util/Utils.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.dla.util; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.dla.DLASyncConfig; - -import java.util.ArrayList; -import java.util.Arrays; - -public class Utils { - public static String DLA_DATABASE_OPT_KEY = "hoodie.datasource.dla_sync.database"; - public static String DLA_TABLE_OPT_KEY = "hoodie.datasource.dla_sync.table"; - public static String DLA_USER_OPT_KEY = "hoodie.datasource.dla_sync.username"; - public static String DLA_PASS_OPT_KEY = "hoodie.datasource.dla_sync.password"; - public static String DLA_URL_OPT_KEY = "hoodie.datasource.dla_sync.jdbcurl"; - public static String BATH_PATH = "basePath"; - public static String DLA_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.dla_sync.partition_fields"; - public static String DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.dla_sync.partition_extractor_class"; - public static String DLA_ASSUME_DATE_PARTITIONING = "hoodie.datasource.dla_sync.assume_date_partitioning"; - public static String DLA_SKIP_RO_SUFFIX = "hoodie.datasource.dla_sync.skip_ro_suffix"; - public static String DLA_SKIP_RT_SYNC = "hoodie.datasource.dla_sync.skip_rt_sync"; - public static String DLA_SYNC_HIVE_STYLE_PARTITIONING = "hoodie.datasource.dla_sync.hive.style.partitioning"; - - public static TypedProperties configToProperties(DLASyncConfig cfg) { - TypedProperties properties = new TypedProperties(); - properties.put(DLA_DATABASE_OPT_KEY, cfg.databaseName); - properties.put(DLA_TABLE_OPT_KEY, cfg.tableName); - properties.put(DLA_USER_OPT_KEY, cfg.dlaUser); - properties.put(DLA_PASS_OPT_KEY, cfg.dlaPass); - properties.put(DLA_URL_OPT_KEY, cfg.jdbcUrl); - properties.put(BATH_PATH, cfg.basePath); - properties.put(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY, cfg.partitionValueExtractorClass); - properties.put(DLA_ASSUME_DATE_PARTITIONING, String.valueOf(cfg.assumeDatePartitioning)); - properties.put(DLA_SKIP_RO_SUFFIX, String.valueOf(cfg.skipROSuffix)); - properties.put(DLA_SYNC_HIVE_STYLE_PARTITIONING, String.valueOf(cfg.useDLASyncHiveStylePartitioning)); - return properties; - } - - public static DLASyncConfig propertiesToConfig(TypedProperties properties) { - DLASyncConfig config = new DLASyncConfig(); - config.databaseName = properties.getProperty(DLA_DATABASE_OPT_KEY); - config.tableName = properties.getProperty(DLA_TABLE_OPT_KEY); - config.dlaUser = properties.getProperty(DLA_USER_OPT_KEY); - config.dlaPass = properties.getProperty(DLA_PASS_OPT_KEY); - config.jdbcUrl = properties.getProperty(DLA_URL_OPT_KEY); - config.basePath = properties.getProperty(BATH_PATH); - if (StringUtils.isNullOrEmpty(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY))) { - config.partitionFields = new ArrayList<>(); - } else { - config.partitionFields = Arrays.asList(properties.getProperty(DLA_PARTITION_FIELDS_OPT_KEY).split(",")); - } - config.partitionValueExtractorClass = properties.getProperty(DLA_PARTITION_EXTRACTOR_CLASS_OPT_KEY); - config.assumeDatePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_ASSUME_DATE_PARTITIONING, "false")); - 
config.skipROSuffix = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RO_SUFFIX, "false")); - config.skipRTSync = Boolean.parseBoolean(properties.getProperty(DLA_SKIP_RT_SYNC, "false")); - config.useDLASyncHiveStylePartitioning = Boolean.parseBoolean(properties.getProperty(DLA_SYNC_HIVE_STYLE_PARTITIONING, "false")); - return config; - } -} diff --git a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java b/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java deleted file mode 100644 index 366d5a24efb0..000000000000 --- a/hudi-sync/hudi-dla-sync/src/test/java/org/apache/hudi/dla/TestDLASyncConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.dla; - -import org.junit.jupiter.api.Test; -import java.util.Arrays; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestDLASyncConfig { - @Test - public void testCopy() { - DLASyncConfig dlaSyncConfig = new DLASyncConfig(); - List partitions = Arrays.asList("a", "b"); - dlaSyncConfig.partitionFields = partitions; - dlaSyncConfig.basePath = "/tmp"; - dlaSyncConfig.assumeDatePartitioning = true; - dlaSyncConfig.databaseName = "test"; - dlaSyncConfig.tableName = "test"; - dlaSyncConfig.dlaUser = "dla"; - dlaSyncConfig.dlaPass = "dla"; - dlaSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306"; - dlaSyncConfig.skipROSuffix = false; - - DLASyncConfig copied = DLASyncConfig.copy(dlaSyncConfig); - - assertEquals(copied.partitionFields, dlaSyncConfig.partitionFields); - assertEquals(copied.basePath, dlaSyncConfig.basePath); - assertEquals(copied.assumeDatePartitioning, dlaSyncConfig.assumeDatePartitioning); - assertEquals(copied.databaseName, dlaSyncConfig.databaseName); - assertEquals(copied.tableName, dlaSyncConfig.tableName); - assertEquals(copied.dlaUser, dlaSyncConfig.dlaUser); - assertEquals(copied.dlaPass, dlaSyncConfig.dlaPass); - assertEquals(copied.basePath, dlaSyncConfig.basePath); - assertEquals(copied.jdbcUrl, dlaSyncConfig.jdbcUrl); - assertEquals(copied.skipROSuffix, dlaSyncConfig.skipROSuffix); - assertEquals(copied.supportTimestamp, dlaSyncConfig.supportTimestamp); - } -} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 939fc114c088..5e343b9a62a0 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -27,9 +27,8 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.sync.common.util.ConfigUtils; import org.apache.hudi.hive.util.HiveSchemaUtil; -import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.sync.common.AbstractSyncTool; @@ -43,20 +42,13 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.parquet.schema.OriginalType.UTF8; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; - /** * Tool to sync a hoodie HDFS table with a hive metastore table. Either use it as a api * HiveSyncTool.syncHoodieTable(HiveSyncConfig) or as a command line java -cp hoodie-hive-sync.jar HiveSyncTool [args] @@ -248,8 +240,9 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea Map tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties); Map serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties); if (hiveSyncConfig.syncAsSparkDataSourceTable) { - Map sparkTableProperties = getSparkTableProperties(hiveSyncConfig.sparkSchemaLengthThreshold, schema); - Map sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized); + Map sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields, + hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema); + Map sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath); tableProperties.putAll(sparkTableProperties); serdeProperties.putAll(sparkSerdeProperties); } @@ -309,75 +302,6 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea return schemaChanged; } - /** - * Get Spark Sql related table properties. This is used for spark datasource table. - * @param schema The schema to write to the table. - * @return A new parameters added the spark's table properties. - */ - private Map getSparkTableProperties(int schemaLengthThreshold, MessageType schema) { - // Convert the schema and partition info used by spark sql to hive table properties. - // The following code refers to the spark code in - // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala - GroupType originGroupType = schema.asGroupType(); - List partitionNames = hiveSyncConfig.partitionFields; - List partitionCols = new ArrayList<>(); - List dataCols = new ArrayList<>(); - Map column2Field = new HashMap<>(); - - for (Type field : originGroupType.getFields()) { - column2Field.put(field.getName(), field); - } - // Get partition columns and data columns. - for (String partitionName : partitionNames) { - // Default the unknown partition fields to be String. - // Keep the same logical with HiveSchemaUtil#getPartitionKeyType. 
- partitionCols.add(column2Field.getOrDefault(partitionName, - new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8))); - } - - for (Type field : originGroupType.getFields()) { - if (!partitionNames.contains(field.getName())) { - dataCols.add(field); - } - } - - List reOrderedFields = new ArrayList<>(); - reOrderedFields.addAll(dataCols); - reOrderedFields.addAll(partitionCols); - GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields); - - Map sparkProperties = new HashMap<>(); - sparkProperties.put("spark.sql.sources.provider", "hudi"); - if (!StringUtils.isNullOrEmpty(hiveSyncConfig.sparkVersion)) { - sparkProperties.put("spark.sql.create.version", hiveSyncConfig.sparkVersion); - } - // Split the schema string to multi-parts according the schemaLengthThreshold size. - String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType); - int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; - sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); - // Add each part of schema string to sparkProperties - for (int i = 0; i < numSchemaPart; i++) { - int start = i * schemaLengthThreshold; - int end = Math.min(start + schemaLengthThreshold, schemaString.length()); - sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end)); - } - // Add partition columns - if (!partitionNames.isEmpty()) { - sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size())); - for (int i = 0; i < partitionNames.size(); i++) { - sparkProperties.put("spark.sql.sources.schema.partCol." + i, partitionNames.get(i)); - } - } - return sparkProperties; - } - - private Map getSparkSerdeProperties(boolean readAsOptimized) { - Map sparkSerdeProperties = new HashMap<>(); - sparkSerdeProperties.put("path", hiveSyncConfig.basePath); - sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized)); - return sparkSerdeProperties; - } - /** * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). 
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 1c2d53ed96de..b801f4d7daa1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.hive.testutils.HiveTestUtil; -import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.sync.common.util.ConfigUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java index 3ca31b04395a..b6940629af3d 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java @@ -18,7 +18,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils; +import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils; import org.apache.spark.sql.execution.SparkSqlParser; import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter; import org.apache.spark.sql.internal.SQLConf; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java index 680b4a17ef5d..972ae1f96c51 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java @@ -18,12 +18,26 @@ package org.apache.hudi.sync.common; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.sync.common.util.ConfigUtils; +import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Properties; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; + /** * Base class to sync Hudi meta data with Metastores to make * Hudi table queryable through external systems. @@ -46,4 +60,72 @@ public AbstractSyncTool(Properties props, FileSystem fileSystem) { public abstract void syncHoodieTable(); + /** + * Get Spark Sql related table properties. This is used for spark datasource table. + * @param schema The schema to write to the table. + * @return A new parameters added the spark's table properties. 
+ */ + protected Map getSparkTableProperties(List partitionNames, String sparkVersion, + int schemaLengthThreshold, MessageType schema) { + // Convert the schema and partition info used by spark sql to hive table properties. + // The following code refers to the spark code in + // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala + GroupType originGroupType = schema.asGroupType(); + List partitionCols = new ArrayList<>(); + List dataCols = new ArrayList<>(); + Map column2Field = new HashMap<>(); + + for (Type field : originGroupType.getFields()) { + column2Field.put(field.getName(), field); + } + // Get partition columns and data columns. + for (String partitionName : partitionNames) { + // Default the unknown partition fields to be String. + // Keep the same logical with HiveSchemaUtil#getPartitionKeyType. + partitionCols.add(column2Field.getOrDefault(partitionName, + new PrimitiveType(Type.Repetition.REQUIRED, BINARY, partitionName, UTF8))); + } + + for (Type field : originGroupType.getFields()) { + if (!partitionNames.contains(field.getName())) { + dataCols.add(field); + } + } + + List reOrderedFields = new ArrayList<>(); + reOrderedFields.addAll(dataCols); + reOrderedFields.addAll(partitionCols); + GroupType reOrderedType = new GroupType(originGroupType.getRepetition(), originGroupType.getName(), reOrderedFields); + + Map sparkProperties = new HashMap<>(); + sparkProperties.put("spark.sql.sources.provider", "hudi"); + if (!StringUtils.isNullOrEmpty(sparkVersion)) { + sparkProperties.put("spark.sql.create.version", sparkVersion); + } + // Split the schema string to multi-parts according the schemaLengthThreshold size. + String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType); + int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold; + sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart)); + // Add each part of schema string to sparkProperties + for (int i = 0; i < numSchemaPart; i++) { + int start = i * schemaLengthThreshold; + int end = Math.min(start + schemaLengthThreshold, schemaString.length()); + sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end)); + } + // Add partition columns + if (!partitionNames.isEmpty()) { + sparkProperties.put("spark.sql.sources.schema.numPartCols", String.valueOf(partitionNames.size())); + for (int i = 0; i < partitionNames.size(); i++) { + sparkProperties.put("spark.sql.sources.schema.partCol." 
+ i, partitionNames.get(i)); + } + } + return sparkProperties; + } + + protected Map getSparkSerdeProperties(boolean readAsOptimized, String basePath) { + Map sparkSerdeProperties = new HashMap<>(); + sparkSerdeProperties.put("path", basePath); + sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized)); + return sparkSerdeProperties; + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java similarity index 98% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java index 94ebdaadd8ff..ca5224aef469 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ConfigUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.hive.util; +package org.apache.hudi.sync.common.util; import java.util.HashMap; import java.util.Map; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java similarity index 99% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java index debc262b5518..c5b98c17eb4a 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/Parquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/Parquet2SparkSchemaUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.hive.util; +package org.apache.hudi.sync.common.util; import org.apache.hudi.common.util.ValidationUtils; import org.apache.parquet.schema.GroupType; diff --git a/hudi-sync/pom.xml b/hudi-sync/pom.xml index 0ee145418f5e..ffcbac8a652e 100644 --- a/hudi-sync/pom.xml +++ b/hudi-sync/pom.xml @@ -32,7 +32,7 @@ hudi-datahub-sync - hudi-dla-sync + hudi-adb-sync hudi-hive-sync hudi-sync-common
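
Note (illustrative, not part of the patch itself): the heart of this refactor is that the Spark datasource table properties are now built in AbstractSyncTool, so both HiveSyncTool and the new AdbSyncTool can register a Hudi table in a way Spark reads back as a datasource table. The schema JSON is stored split across numbered spark.sql.sources.schema.part.<i> properties, each at most schemaLengthThreshold characters, plus a spark.sql.sources.schema.numParts count. Below is a minimal self-contained sketch of just that splitting arithmetic; the class and variable names are made up for illustration.

import java.util.HashMap;
import java.util.Map;

public class SchemaPartSplitSketch {
  // Mirrors the part-splitting in AbstractSyncTool#getSparkTableProperties: the Spark schema JSON
  // is cut into ceil(length / threshold) chunks so each metastore table property stays small,
  // and Spark reassembles the schema by concatenating part.0, part.1, ... in order.
  static Map<String, String> splitSchemaString(String schemaString, int threshold) {
    Map<String, String> props = new HashMap<>();
    int numParts = (schemaString.length() + threshold - 1) / threshold; // ceiling division
    props.put("spark.sql.sources.schema.numParts", String.valueOf(numParts));
    for (int i = 0; i < numParts; i++) {
      int start = i * threshold;
      int end = Math.min(start + threshold, schemaString.length());
      props.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
    }
    return props;
  }

  public static void main(String[] args) {
    // A 10-character "schema" with a threshold of 4 yields 3 parts: "abcd", "efgh", "ij".
    splitSchemaString("abcdefghij", 4).forEach((k, v) -> System.out.println(k + " = " + v));
  }
}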
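
With getSparkTableProperties and getSparkSerdeProperties promoted to protected methods on AbstractSyncTool (and ConfigUtils / Parquet2SparkSchemaUtils moved into hudi-sync-common), a metastore-specific sync tool only needs to call the shared helpers when it creates the table. The following is a hedged sketch of such a subclass; ExampleSyncTool, readLatestCommitSchema and the literal arguments are invented for illustration and are not part of this patch.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.parquet.schema.MessageType;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class ExampleSyncTool extends AbstractSyncTool {

  public ExampleSyncTool(Properties props, FileSystem fs) {
    super(props, fs);
  }

  @Override
  public void syncHoodieTable() {
    // A real tool reads the schema from the table's latest commit; stubbed out here.
    MessageType schema = readLatestCommitSchema();
    // Build the Spark datasource table properties from the helpers this patch moves into AbstractSyncTool.
    Map<String, String> tableProps = getSparkTableProperties(
        Arrays.asList("dt"), // partition fields
        "3.2.1",             // value written to spark.sql.create.version
        4000,                // schema length threshold per table property
        schema);
    Map<String, String> serdeProps = getSparkSerdeProperties(false, "/tmp/hoodie/example_table");
    // ... pass tableProps / serdeProps to the metastore-specific createTable call
  }

  private MessageType readLatestCommitSchema() {
    throw new UnsupportedOperationException("illustrative stub");
  }
}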
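
Partition sync in the removed HoodieDLAClient (and, by the same pattern, in the new HoodieAdbJdbcClient) turns each newly written storage partition into an ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement, pairing the configured partition fields with the values a PartitionValueExtractor pulls out of the partition path. Below is a small standalone sketch of the clause construction, using made-up field and value lists.

import java.util.Arrays;
import java.util.List;

public class PartitionClauseSketch {

  // Joins field/value pairs into the clause used inside ADD PARTITION (...), as in
  // HoodieDLAClient#getPartitionClause; counts must match, mirroring the original validation.
  static String partitionClause(List<String> fields, List<String> values) {
    if (fields.size() != values.size()) {
      throw new IllegalArgumentException("Partition key parts do not match partition values");
    }
    StringBuilder clause = new StringBuilder();
    for (int i = 0; i < fields.size(); i++) {
      if (i > 0) {
        clause.append(",");
      }
      clause.append(fields.get(i)).append("='").append(values.get(i)).append("'");
    }
    return clause.toString();
  }

  public static void main(String[] args) {
    String clause = partitionClause(Arrays.asList("year", "month", "day"), Arrays.asList("2022", "05", "20"));
    // Prints: ALTER TABLE test_db.test_tbl ADD IF NOT EXISTS PARTITION (year='2022',month='05',day='20') LOCATION '/tmp/test_tbl/2022/05/20/'
    System.out.println("ALTER TABLE test_db.test_tbl ADD IF NOT EXISTS PARTITION ("
        + clause + ") LOCATION '/tmp/test_tbl/2022/05/20/'");
  }
}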