From ce42133e1f8fc30b2c0ff467921887425263b91c Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 27 Jul 2022 17:37:51 +0900 Subject: [PATCH] Add support for migrate procedure in Iceberg --- docs/src/main/sphinx/connector/iceberg.rst | 14 +- .../io/trino/plugin/hive/HiveMetadata.java | 8 +- plugin/trino-iceberg/pom.xml | 11 +- .../AbstractIcebergTableOperations.java | 13 +- .../IcebergFileMetastoreCatalogModule.java | 6 + .../glue/IcebergGlueCatalogModule.java | 23 +- .../IcebergHiveMetastoreCatalogModule.java | 6 + .../iceberg/procedure/MigrateProcedure.java | 386 ++++++++++++++++++ .../iceberg/TestIcebergMigrateProcedure.java | 309 ++++++++++++++ .../iceberg/TestIcebergProcedureCalls.java | 113 +++++ 10 files changed, 876 insertions(+), 13 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index 1e4e73215d7a..6a095a721509 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -933,8 +933,18 @@ Migrating existing tables ------------------------- The connector can read from or write to Hive tables that have been migrated to Iceberg. -There is no Trino support for migrating Hive tables to Iceberg, so you need to either use -the Iceberg API or Apache Spark. +The ``system.migrate`` procedure allows the caller to replace +a Hive table with an Iceberg table that is loaded with the source table's data files. +The table schema, partitioning, properties, and location are copied from the source table. +The migration fails if any table partition uses an unsupported file format:: + + CALL iceberg.system.migrate(schema_name => 'testdb', table_name => 'customer_orders') + +In addition, you can provide a ``recursive_directory`` argument to control how nested directories under the table location are handled. +The possible values are ``true``, ``false``, and ``fail``. The default value is ``fail``, which throws an exception +if a nested directory exists:: + + CALL iceberg.system.migrate(schema_name => 'testdb', table_name => 'customer_orders', recursive_directory => 'true')
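+ +For example, after migrating the ``testdb.customer_orders`` table from the examples above, the table can be read through the Iceberg catalog, and its partition layout can be inspected with the ``$partitions`` metadata table:: + + SELECT * FROM iceberg.testdb.customer_orders + SELECT * FROM iceberg.testdb."customer_orders$partitions" ..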
_iceberg-table-properties: diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index bb2b193592d7..88e1ef2e19fa 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -346,7 +346,7 @@ public class HiveMetadata public static final String BUCKETING_VERSION = "bucketing_version"; public static final String TABLE_COMMENT = "comment"; public static final String STORAGE_TABLE = "storage_table"; - private static final String TRANSACTIONAL = "transactional"; + public static final String TRANSACTIONAL = "transactional"; public static final String PRESTO_VIEW_COMMENT = "Presto View"; public static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = "/* Presto View */"; @@ -3463,7 +3463,11 @@ public List listTablePrivileges(ConnectorSession session, SchemaTable private static HiveStorageFormat extractHiveStorageFormat(Table table) { - StorageFormat storageFormat = table.getStorage().getStorageFormat(); + return extractHiveStorageFormat(table.getStorage().getStorageFormat()); + } + + public static HiveStorageFormat extractHiveStorageFormat(StorageFormat storageFormat) + { String outputFormat = storageFormat.getOutputFormat(); String serde = storageFormat.getSerde(); diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index a9e765727d73..108be1711811 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -211,6 +211,11 @@ iceberg-core + + org.apache.iceberg + iceberg-orc + + org.apache.iceberg iceberg-parquet @@ -290,12 +295,6 @@ runtime - - org.apache.iceberg - iceberg-orc - runtime - - org.jetbrains annotations diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index ea217d6ad8b9..b35b623318ef 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -47,10 +47,13 @@ import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; +import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_KEY; +import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_VALUE; import static java.lang.String.format; import static java.time.temporal.ChronoUnit.MILLIS; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; import static org.apache.iceberg.TableMetadataParser.getFileExtension; import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION; import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT; @@ -145,7 +148,15 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) } if (base == null) { - commitNewTable(metadata); + if (PROVIDER_PROPERTY_VALUE.equals(metadata.properties().get(PROVIDER_PROPERTY_KEY))) { + // Assume this is a table executing migrate procedure + version = OptionalInt.of(0); + currentMetadataLocation = 
metadata.properties().get(METADATA_LOCATION_PROP); + commitToExistingTable(base, metadata); + } + else { + commitNewTable(metadata); + } } else { commitToExistingTable(base, metadata); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java index bd865e4190fa..a8dd62765d7c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java @@ -16,6 +16,7 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.units.Duration; import io.trino.plugin.hive.HideDeltaLakeTables; @@ -26,9 +27,12 @@ import io.trino.plugin.iceberg.catalog.MetastoreValidator; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; +import io.trino.plugin.iceberg.procedure.MigrateProcedure; +import io.trino.spi.procedure.Procedure; import java.util.concurrent.TimeUnit; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule.HIDE_DELTA_LAKE_TABLES_IN_ICEBERG; @@ -49,5 +53,7 @@ protected void setup(Binder binder) // ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg config.setStatsCacheTtl(new Duration(0, TimeUnit.SECONDS)); }); + Multibinder procedures = newSetBinder(binder, Procedure.class); + procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java index bde325210df1..38e9b4d0edd9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java @@ -14,16 +14,28 @@ package io.trino.plugin.iceberg.catalog.glue; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.Table; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.Multibinder; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.HideDeltaLakeTables; +import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore; import io.trino.plugin.hive.metastore.glue.GlueCredentialsProvider; import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreModule; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.procedure.MigrateProcedure; +import io.trino.spi.procedure.Procedure; +import 
java.util.function.Predicate; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static org.weakref.jmx.guice.ExportBinder.newExporter; @@ -36,11 +48,18 @@ protected void setup(Binder binder) configBinder(binder).bindConfig(GlueHiveMetastoreConfig.class); configBinder(binder).bindConfig(IcebergGlueCatalogConfig.class); binder.bind(GlueMetastoreStats.class).in(Scopes.SINGLETON); - binder.bind(AWSGlueAsync.class).toProvider(GlueClientProvider.class).in(Scopes.SINGLETON); newExporter(binder).export(GlueMetastoreStats.class).withGeneratedName(); binder.bind(AWSCredentialsProvider.class).toProvider(GlueCredentialsProvider.class).in(Scopes.SINGLETON); binder.bind(IcebergTableOperationsProvider.class).to(GlueIcebergTableOperationsProvider.class).in(Scopes.SINGLETON); binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON); newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName(); + + // Required to inject HiveMetastoreFactory for migrate procedure + binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false); + newOptionalBinder(binder, Key.get(new TypeLiteral>() {}, ForGlueHiveMetastore.class)) + .setBinding().toInstance(table -> true); + install(new GlueMetastoreModule()); + Multibinder procedures = newSetBinder(binder, Procedure.class); + procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java index 3dced4066341..c40dfa4e6c60 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -16,6 +16,7 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.units.Duration; import io.trino.plugin.hive.HideDeltaLakeTables; @@ -26,9 +27,12 @@ import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.plugin.iceberg.catalog.MetastoreValidator; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.procedure.MigrateProcedure; +import io.trino.spi.procedure.Procedure; import java.util.concurrent.TimeUnit; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; public class IcebergHiveMetastoreCatalogModule @@ -51,5 +55,7 @@ protected void setup(Binder binder) // ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg config.setStatsCacheTtl(new Duration(0, TimeUnit.SECONDS)); }); + Multibinder procedures = newSetBinder(binder, Procedure.class); + procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java new file mode 100644 index 
000000000000..ea0d167afcdd --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java @@ -0,0 +1,386 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.base.Enums; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.hive.HiveStorageFormat; +import io.trino.plugin.hive.metastore.Column; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.Partition; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory; +import io.trino.plugin.hive.metastore.Storage; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergSecurityConfig; +import io.trino.plugin.iceberg.PartitionData; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.fileio.ForwardingInputFile; +import io.trino.spi.TrinoException; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.procedure.Procedure; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.orc.OrcMetrics; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Streams.concat; +import static io.airlift.slice.Slices.utf8Slice; +import static 
io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL; +import static io.trino.plugin.hive.HiveMetadata.extractHiveStorageFormat; +import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; +import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; +import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable; +import static io.trino.plugin.hive.util.HiveUtil.isHudiTable; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; +import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM; +import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; +import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn; +import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.Boolean.parseBoolean; +import static java.lang.invoke.MethodHandles.lookup; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.hive.metastore.TableType.MANAGED_TABLE; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.iceberg.SortOrder.unsorted; +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.mapping.NameMappingParser.toJson; + +public class MigrateProcedure + implements Provider +{ + private static final Logger log = Logger.get(MigrateProcedure.class); + + public static final String PROVIDER_PROPERTY_KEY = "provider"; + public static final String PROVIDER_PROPERTY_VALUE = "iceberg"; + private static final MetricsConfig METRICS_CONFIG = MetricsConfig.getDefault(); + + private final TrinoCatalogFactory catalogFactory; + private final HiveMetastoreFactory metastoreFactory; + private final TrinoFileSystemFactory fileSystemFactory; + private final TypeManager typeManager; + private final int formatVersion; + private final boolean isUsingSystemSecurity; + + private enum RecursiveDirectory + { + TRUE, + FALSE, + FAIL, + /**/ + } + + private static final MethodHandle MIGRATE; + + static { + try { + MIGRATE = lookup().unreflect(MigrateProcedure.class.getMethod("migrate", ConnectorSession.class, String.class, String.class, String.class)); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + @Inject + public MigrateProcedure( + TrinoCatalogFactory catalogFactory, + @RawHiveMetastoreFactory HiveMetastoreFactory metastoreFactory, + TrinoFileSystemFactory fileSystemFactory, + TypeManager typeManager, + IcebergConfig icebergConfig, + IcebergSecurityConfig securityConfig) + { + this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.formatVersion = icebergConfig.getFormatVersion(); + this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM; + } + + 
@Override + public Procedure get() + { + return new Procedure( + "system", + "migrate", + ImmutableList.of( + new Procedure.Argument("SCHEMA_NAME", VARCHAR), + new Procedure.Argument("TABLE_NAME", VARCHAR), + new Procedure.Argument("RECURSIVE_DIRECTORY", VARCHAR, false, utf8Slice("fail"))), + MIGRATE.bindTo(this)); + } + + public void migrate(ConnectorSession session, String schemaName, String tableName, String recursiveDirectory) + { + // this line guarantees that the classLoader we stored in the field will be used inside try/catch + // as we captured a reference to the PluginClassLoader during initialization of this class + // we can use it now to correctly execute the procedure + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) { + doMigrate(session, schemaName, tableName, recursiveDirectory); + } + } + + public void doMigrate(ConnectorSession session, String schemaName, String tableName, String recursiveDirectory) + { + SchemaTableName sourceTableName = new SchemaTableName(schemaName, tableName); + TrinoCatalog catalog = catalogFactory.create(session.getIdentity()); + HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity())); + RecursiveDirectory recursive = Enums.getIfPresent(RecursiveDirectory.class, recursiveDirectory.toUpperCase(ENGLISH)).toJavaUtil() + .orElseThrow(() -> new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Invalid recursive_directory: " + recursiveDirectory)); + + io.trino.plugin.hive.metastore.Table hiveTable = metastore.getTable(schemaName, tableName).orElseThrow(() -> new TableNotFoundException(sourceTableName)); + String transactionalProperty = hiveTable.getParameters().get(TRANSACTIONAL); + if (parseBoolean(transactionalProperty)) { + throw new TrinoException(NOT_SUPPORTED, "Migrating transactional tables is unsupported"); + } + if (!hiveTable.getTableType().equalsIgnoreCase(MANAGED_TABLE.name())) { + throw new TrinoException(NOT_SUPPORTED, "The procedure supports migrating only managed tables: " + hiveTable.getTableType()); + } + if (isDeltaLakeTable(hiveTable)) { + throw new TrinoException(NOT_SUPPORTED, "The procedure doesn't support migrating Delta Lake tables"); + } + if (isHudiTable(hiveTable)) { + throw new TrinoException(NOT_SUPPORTED, "The procedure doesn't support migrating Hudi tables"); + } + if (isIcebergTable(hiveTable)) { + throw new TrinoException(NOT_SUPPORTED, "The table is already an Iceberg table"); + } + + Schema schema = toIcebergSchema(concat(hiveTable.getDataColumns().stream(), hiveTable.getPartitionColumns().stream()).toList()); + NameMapping nameMapping = MappingUtil.create(schema); + HiveStorageFormat storageFormat = extractHiveStorageFormat(hiveTable.getStorage().getStorageFormat()); + String location = hiveTable.getStorage().getLocation(); + + Map<String, String> properties = icebergTableProperties(location, hiveTable.getParameters(), nameMapping); + PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitionColumnNames(hiveTable)); + try { + ImmutableList.Builder<DataFile> dataFilesBuilder = ImmutableList.builder(); + if (hiveTable.getPartitionColumns().isEmpty()) { + log.debug("Building data files from %s", location); + dataFilesBuilder.addAll(buildDataFiles(session, recursive, storageFormat, location, partitionSpec, new PartitionData(new Object[]{}), nameMapping)); + } + else { + Map<String, Optional<Partition>> partitions = listAllPartitions(metastore, hiveTable); + int fileCount = 1; + for (Map.Entry<String, Optional<Partition>> partition : partitions.entrySet()) { + Storage storage = 
partition.getValue().orElseThrow().getStorage(); + log.debug("Building data files from '%s' for partition %d of %d", storage.getLocation(), fileCount++, partitions.size()); + HiveStorageFormat partitionStorageFormat = extractHiveStorageFormat(storage.getStorageFormat()); + StructLike partitionData = DataFiles.data(partitionSpec, partition.getKey()); + dataFilesBuilder.addAll(buildDataFiles(session, recursive, partitionStorageFormat, storage.getLocation(), partitionSpec, partitionData, nameMapping)); + } + } + + log.debug("Start new transaction"); + Transaction transaction = catalog.newCreateTableTransaction( + session, + sourceTableName, + schema, + parsePartitionFields(schema, toPartitionFields(hiveTable)), + unsorted(), + location, + properties); + + List<DataFile> dataFiles = dataFilesBuilder.build(); + log.debug("Append %d data files", dataFiles.size()); + Table table = transaction.table(); + AppendFiles append = table.newAppend(); + dataFiles.forEach(append::appendFile); + append.commit(); + + log.debug("Set preparatory table properties in the metastore for migration"); + PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser()); + io.trino.plugin.hive.metastore.Table newTable = io.trino.plugin.hive.metastore.Table.builder(hiveTable) + .setParameter(METADATA_LOCATION_PROP, location) + .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH)) + .build(); + metastore.replaceTable(schemaName, tableName, newTable, principalPrivileges); + + transaction.commitTransaction(); + log.debug("Successfully migrated %s table to Iceberg format", sourceTableName); + } + catch (Exception e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to migrate table", e); + } + } + + private Map<String, String> icebergTableProperties(String location, Map<String, String> hiveTableProperties, NameMapping nameMapping) + { + Map<String, String> icebergTableProperties = new HashMap<>(); + + // Copy all Hive table properties so that we can undo the migration easily. This is the same as the Spark implementation.
+ icebergTableProperties.putAll(hiveTableProperties); + icebergTableProperties.remove("path"); + icebergTableProperties.remove("transient_lastDdlTime"); + icebergTableProperties.remove("serialization.format"); + + icebergTableProperties.put("migrated", "true"); + icebergTableProperties.putIfAbsent("location", location); + icebergTableProperties.put(PROVIDER_PROPERTY_KEY, PROVIDER_PROPERTY_VALUE); + icebergTableProperties.put(METADATA_LOCATION_PROP, location); + icebergTableProperties.put(DEFAULT_NAME_MAPPING, toJson(nameMapping)); + icebergTableProperties.put(FORMAT_VERSION, String.valueOf(formatVersion)); + + return ImmutableMap.copyOf(icebergTableProperties); + } + + private Schema toIcebergSchema(List<Column> columns) + { + AtomicInteger nextFieldId = new AtomicInteger(1); + List<Types.NestedField> icebergColumns = new ArrayList<>(); + for (Column column : columns) { + int index = icebergColumns.size(); + org.apache.iceberg.types.Type type = toIcebergTypeForNewColumn(typeManager.getType(column.getType().getTypeSignature()), nextFieldId); + Types.NestedField field = Types.NestedField.of(index, false, column.getName(), type, column.getComment().orElse(null)); + icebergColumns.add(field); + } + org.apache.iceberg.types.Type icebergSchema = Types.StructType.of(icebergColumns); + icebergSchema = TypeUtil.assignFreshIds(icebergSchema, nextFieldId::getAndIncrement); + return new Schema(icebergSchema.asStructType().fields()); + } + + public Map<String, Optional<Partition>> listAllPartitions(HiveMetastore metastore, io.trino.plugin.hive.metastore.Table table) + { + List<String> partitionNames = table.getPartitionColumns().stream().map(Column::getName).collect(toImmutableList()); + Optional<List<String>> partitions = metastore.getPartitionNamesByFilter(table.getDatabaseName(), table.getTableName(), partitionNames, TupleDomain.all()); + if (partitions.isEmpty()) { + return ImmutableMap.of(); + } + return metastore.getPartitionsByNames(table, partitions.get()); + } + + private List<DataFile> buildDataFiles(ConnectorSession session, RecursiveDirectory recursive, HiveStorageFormat format, String location, PartitionSpec partitionSpec, StructLike partition, NameMapping nameMapping) + throws IOException + { + // TODO: Introduce parallelism + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + FileIterator files = fileSystem.listFiles(location); + ImmutableList.Builder<DataFile> dataFilesBuilder = ImmutableList.builder(); + while (files.hasNext()) { + FileEntry file = files.next(); + if (file.location().contains("/_") || file.location().contains("/.")) { + continue; + } + if (recursive == RecursiveDirectory.FALSE && isRecursive(location, file.location())) { + continue; + } + else if (recursive == RecursiveDirectory.FAIL && isRecursive(location, file.location())) { + throw new TrinoException(NOT_SUPPORTED, "Recursive directory must not exist when recursive_directory argument is 'fail': " + file.location()); + } + + Metrics metrics = loadMetrics(fileSystem, format, file.location(), nameMapping); + DataFile dataFile = buildDataFile(file, partition, partitionSpec, format.name(), metrics); + dataFilesBuilder.add(dataFile); + } + List<DataFile> dataFiles = dataFilesBuilder.build(); + log.debug("Found %d files in '%s'", dataFiles.size(), location); + return dataFiles; + } + + private static boolean isRecursive(String baseLocation, String location) + { + verify(location.startsWith(baseLocation), "%s should start with %s", location, baseLocation); + String suffix = location.substring(baseLocation.length() + 1).replaceFirst("^/+", ""); + return suffix.contains("/"); + } + + private Metrics 
loadMetrics(TrinoFileSystem fileSystem, HiveStorageFormat storageFormat, String path, NameMapping nameMapping) + { + InputFile inputFile = new ForwardingInputFile(fileSystem.newInputFile(path)); + return switch (storageFormat) { + case ORC -> OrcMetrics.fromInputFile(inputFile, METRICS_CONFIG, nameMapping); + case PARQUET -> ParquetUtil.fileMetrics(inputFile, METRICS_CONFIG, nameMapping); + case AVRO -> new Metrics(Avro.rowCount(inputFile), null, null, null, null); + default -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format: " + storageFormat); + }; + } + + private static List<String> toPartitionFields(io.trino.plugin.hive.metastore.Table table) + { + ImmutableList.Builder<String> fields = ImmutableList.builder(); + fields.addAll(getPartitionColumnNames(table)); + table.getStorage().getBucketProperty() + .ifPresent(bucket -> { + throw new TrinoException(NOT_SUPPORTED, "Cannot migrate bucketed table: " + bucket.getBucketedBy()); + }); + return fields.build(); + } + + private static List<String> getPartitionColumnNames(io.trino.plugin.hive.metastore.Table table) + { + return table.getPartitionColumns().stream() + .map(Column::getName) + .collect(toImmutableList()); + } + + private static DataFile buildDataFile(FileEntry file, StructLike partition, PartitionSpec spec, String format, Metrics metrics) + { + return DataFiles.builder(spec) + .withPath(file.location()) + .withFormat(format) + .withFileSizeInBytes(file.length()) + .withMetrics(metrics) + .withPartition(partition) + .build(); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java new file mode 100644 index 000000000000..6dd83af21e3e --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java @@ -0,0 +1,309 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.TestingHivePlugin; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestIcebergMigrateProcedure + extends AbstractTestQueryFramework +{ + private Path dataDirectory; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = IcebergQueryRunner.builder().build(); + dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"); + queryRunner.installPlugin(new TestingHivePlugin()); + queryRunner.createCatalog("hive", "hive", ImmutableMap.builder() + .put("hive.metastore", "file") + .put("hive.metastore.catalog.dir", dataDirectory.toString()) + .put("hive.security", "allow-all") + .buildOrThrow()); + return queryRunner; + } + + @Test(dataProvider = "fileFormats") + public void testMigrateTable(IcebergFileFormat fileFormat) + { + String tableName = "test_migrate_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " WITH (format='" + fileFormat + "') AS SELECT 1 x", 1); + assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*"); + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES 1"); + assertQuery("SELECT count(*) FROM " + icebergTableName, "VALUES 1"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES (2)", 1); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1), (2)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @DataProvider + public static Object[][] fileFormats() + { + return Stream.of(IcebergFileFormat.values()) + .map(fileFormat -> new Object[] {fileFormat}) + .toArray(Object[][]::new); + } + + @Test + public void testMigratePartitionedTable() + { + String tableName = "test_migrate_partitioned_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." 
+ tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part_col']) AS SELECT 1 id, 'part1' part_col", 1); + assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*"); + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'part1')"); + + // Make sure partition column is preserved + assertThat(query("SELECT partition FROM iceberg.tpch.\"" + tableName + "$partitions\"")) + .skippingTypesCheck() + .matches("SELECT CAST(row('part1') AS row(part_col varchar))"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES (2, 'part2')", 1); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1, 'part1'), (2, 'part2')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMigrateTableWithRecursiveDirectory() + throws Exception + { + String tableName = "test_migrate_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT 1 x", 1); + + // Copy a file to nested directory + Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName)); + Path nestedDirectory = tableLocation.resolve("nested"); + try (Stream files = Files.list(tableLocation)) { + Path file = files.filter(path -> !path.getFileName().toString().startsWith(".")).collect(onlyElement()); + Files.createDirectory(nestedDirectory); + Files.copy(file, nestedDirectory.resolve(file.getFileName())); + } + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'true')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1), (1)"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES (2)", 1); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1), (1), (2)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMigrateTableWithoutRecursiveDirectory() + throws Exception + { + String tableName = "test_migrate_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT 1 x", 1); + + // Copy a file to nested directory + Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName)); + Path nestedDirectory = tableLocation.resolve("nested"); + try (Stream files = Files.list(tableLocation)) { + Path file = files.filter(path -> !path.getFileName().toString().startsWith(".")).collect(onlyElement()); + Files.createDirectory(nestedDirectory); + Files.copy(file, nestedDirectory.resolve(file.getFileName())); + } + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'false')"); + + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1)"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES (2)", 1); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (1), (2)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testMigrateTableFailRecursiveDirectory() + throws Exception + { + String tableName = "test_migrate_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." 
+ tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT 1 x", 1); + + // Copy a file to nested directory + Path tableLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableName)); + Path nestedDirectory = tableLocation.resolve("nested"); + try (Stream files = Files.list(tableLocation)) { + Path file = files.filter(path -> !path.getFileName().toString().startsWith(".")).collect(onlyElement()); + Files.createDirectory(nestedDirectory); + Files.copy(file, nestedDirectory.resolve(file.getFileName())); + } + + // The default and explicit 'fail' mode should throw an exception when nested directory exists + assertQueryFails("CALL iceberg.system.migrate('tpch', '" + tableName + "')", "Failed to migrate table"); + assertQueryFails("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'fail')", "Failed to migrate table"); + + assertQuery("SELECT * FROM " + hiveTableName, "VALUES (1)"); + + assertUpdate("DROP TABLE " + hiveTableName); + } + + @Test + public void testMigrateTablePreserveComments() + { + String tableName = "test_migrate_comments_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + "(col int COMMENT 'column comment') COMMENT 'table comment'"); + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertEquals(getTableComment(tableName), "table comment"); + assertEquals(getColumnComment(tableName, "col"), "column comment"); + + assertUpdate("DROP TABLE " + tableName); + } + + private String getTableComment(String tableName) + { + return (String) computeScalar("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'iceberg' AND schema_name = 'tpch' AND table_name = '" + tableName + "'"); + } + + private String getColumnComment(String tableName, String columnName) + { + return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_catalog = 'iceberg' AND table_schema = 'tpch' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + } + + @Test + public void testMigrateUnsupportedColumnType() + { + String tableName = "test_migrate_unsupported_column_type_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT timestamp '2021-01-01 00:00:00.000' x", 1); + + assertQueryFails( + "CALL iceberg.system.migrate('tpch', '" + tableName + "')", + "\\QTimestamp precision (3) not supported for Iceberg. Use \"timestamp(6)\" instead."); + + assertQuery("SELECT * FROM " + hiveTableName, "VALUES timestamp '2021-01-01 00:00:00.000'"); + assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*"); + + assertUpdate("DROP TABLE " + hiveTableName); + } + + @Test + public void testMigrateUnsupportedTableFormat() + { + String tableName = "test_migrate_unsupported_table_format_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." 
+ tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " WITH (format = 'RCBINARY') AS SELECT 1 x", 1); + + assertThatThrownBy(() -> query("CALL iceberg.system.migrate('tpch', '" + tableName + "')")) + .hasStackTraceContaining("Unsupported storage format: RCBINARY"); + + assertQuery("SELECT * FROM " + hiveTableName, "VALUES 1"); + assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*"); + + assertUpdate("DROP TABLE " + hiveTableName); + } + + @Test + public void testMigrateUnsupportedBucketedTable() + { + String tableName = "test_migrate_unsupported_bucketed_table_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part'], bucketed_by = ARRAY['bucket'], bucket_count = 10) AS SELECT 1 bucket, 'test' part", 1); + + assertThatThrownBy(() -> query("CALL iceberg.system.migrate('tpch', '" + tableName + "')")) + .hasStackTraceContaining("Cannot migrate bucketed table: [bucket]"); + + assertQuery("SELECT * FROM " + hiveTableName, "VALUES (1, 'test')"); + assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*"); + + assertUpdate("DROP TABLE " + hiveTableName); + } + + @Test + public void testMigrateUnsupportedTableType() + { + String viewName = "test_migrate_unsupported_table_type_" + randomNameSuffix(); + String trinoViewInHive = "hive.tpch." + viewName; + String trinoViewInIceberg = "iceberg.tpch." + viewName; + + assertUpdate("CREATE VIEW " + trinoViewInHive + " AS SELECT 1 x"); + + assertQueryFails( + "CALL iceberg.system.migrate('tpch', '" + viewName + "')", + "The procedure supports migrating only managed tables: .*"); + + assertQuery("SELECT * FROM " + trinoViewInHive, "VALUES 1"); + assertQuery("SELECT * FROM " + trinoViewInIceberg, "VALUES 1"); + + assertUpdate("DROP VIEW " + trinoViewInHive); + } + + @Test + public void testMigrateEmptyTable() + { + String tableName = "test_migrate_empty_" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." 
+ tableName; + + assertUpdate("CREATE TABLE " + hiveTableName + " (col int)"); + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertQuery("DESCRIBE " + icebergTableName, "VALUES ('col', 'integer', '', '')"); + assertQueryReturnsEmptyResult("SELECT * FROM " + icebergTableName); + + assertUpdate("DROP TABLE " + tableName); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java index c25bf836e89f..bd5bbf88b0d3 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java @@ -14,6 +14,7 @@ package io.trino.tests.product.iceberg; import io.trino.tempto.ProductTest; +import org.assertj.core.api.Assertions; import org.testng.annotations.Test; import static io.trino.tempto.assertions.QueryAssert.Row.row; @@ -22,12 +23,124 @@ import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.ICEBERG; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.utils.QueryExecutors.onSpark; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestIcebergProcedureCalls extends ProductTest { + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testMigrateHiveTable() + { + String tableName = "test_migrate_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + String sparkTableName = "iceberg_test.default." + tableName; + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + onTrino().executeQuery("CREATE TABLE " + hiveTableName + " AS SELECT 1 x"); + + onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')"); + + assertThat(onTrino().executeQuery("SELECT * FROM " + icebergTableName)) + .containsOnly(row(1)); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)) + .containsOnly(row(1)); + + onTrino().executeQuery("DROP TABLE IF EXISTS " + icebergTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testMigrateHivePartitionedTable() + { + String tableName = "test_migrate_partitioned_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + String sparkTableName = "iceberg_test.default." 
+ tableName; + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + onTrino().executeQuery("CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part']) AS SELECT 1 x, 'test' part"); + + onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')"); + + assertThat(onTrino().executeQuery("SELECT * FROM " + icebergTableName)) + .containsOnly(row(1, "test")); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)) + .containsOnly(row(1, "test")); + + Assertions.assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE " + icebergTableName).getOnlyValue()) + .contains("partitioning = ARRAY['part']"); + + onTrino().executeQuery("DROP TABLE IF EXISTS " + icebergTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testMigrateHiveBucketedTable() + { + String tableName = "test_migrate_bucketed_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + onTrino().executeQuery("" + + "CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part'], bucketed_by = ARRAY['bucket'], bucket_count = 10)" + + "AS SELECT 1 bucket, 'test' part"); + + assertThatThrownBy(() -> onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')")) + .hasStackTraceContaining("Cannot migrate bucketed table"); + + assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM " + icebergTableName)) + .hasMessageContaining("Not an Iceberg table: default." + tableName); + assertThat(onTrino().executeQuery("SELECT * FROM " + hiveTableName)) + .containsOnly(row(1, "test")); + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + } + + @Test + public void testMigrateHiveBucketedOnMultipleColumns() + { + String tableName = "test_migrate_bucketed_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + onTrino().executeQuery("" + + "CREATE TABLE " + hiveTableName + " WITH (partitioned_by = ARRAY['part'], bucketed_by = ARRAY['bucket', 'another_bucket'], bucket_count = 10)" + + "AS SELECT 1 bucket, 'a' another_bucket, 'test' part"); + + assertThatThrownBy(() -> onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')")) + .hasStackTraceContaining("Cannot migrate bucketed table"); + + assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM " + icebergTableName)) + .hasMessageContaining("Not an Iceberg table: default." + tableName); + assertThat(onTrino().executeQuery("SELECT * FROM " + hiveTableName)) + .containsOnly(row(1, "a", "test")); + + onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testMigrateUnsupportedTransactionalTable() + { + String tableName = "test_migrate_unsupported_transactional_table_" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + onTrino().executeQuery("CREATE TABLE " + hiveTableName + " WITH (transactional = true) AS SELECT 1 x"); + + assertThatThrownBy(() -> onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')")) + .hasMessageContaining("Migrating transactional tables is unsupported"); + + assertThat(onTrino().executeQuery("SELECT * FROM " + hiveTableName)).containsOnly(row(1)); + assertThatThrownBy(() -> onTrino().executeQuery("SELECT * FROM " + icebergTableName)) + .hasMessageContaining("Not an Iceberg table"); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) public void testRollbackToSnapshot() throws InterruptedException