Add support for migrate procedure in Iceberg
ebyhr committed Mar 14, 2023
1 parent 32ef8a9 commit ce42133
Showing 10 changed files with 876 additions and 13 deletions.
14 changes: 12 additions & 2 deletions docs/src/main/sphinx/connector/iceberg.rst
@@ -933,8 +933,18 @@ Migrating existing tables
-------------------------

The connector can read from or write to Hive tables that have been migrated to Iceberg.
The ``system.migrate`` procedure replaces a Hive table with an Iceberg table
loaded with the source table's data files. The table schema, partitioning,
properties, and location are copied from the source table. The migration fails
if any table partition uses an unsupported file format::

CALL iceberg.system.migrate(schema_name => 'testdb', table_name => 'customer_orders')

In addition, you can provide a ``recursive_directory`` argument to migrate a table whose
data files are stored in nested directories. The possible values are ``true`` (migrate
files in nested directories), ``false`` (ignore nested directories), and ``fail``.
The default value is ``fail``, which throws an exception if a nested directory exists::

CALL iceberg.system.migrate(schema_name => 'testdb', table_name => 'customer_orders', recursive_directory => 'true')

.. _iceberg-table-properties:

@@ -346,7 +346,7 @@ public class HiveMetadata
public static final String BUCKETING_VERSION = "bucketing_version";
public static final String TABLE_COMMENT = "comment";
public static final String STORAGE_TABLE = "storage_table";
private static final String TRANSACTIONAL = "transactional";
public static final String TRANSACTIONAL = "transactional";
public static final String PRESTO_VIEW_COMMENT = "Presto View";
public static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = "/* Presto View */";

@@ -3463,7 +3463,11 @@ public List<GrantInfo> listTablePrivileges(ConnectorSession session, SchemaTable

private static HiveStorageFormat extractHiveStorageFormat(Table table)
{
StorageFormat storageFormat = table.getStorage().getStorageFormat();
return extractHiveStorageFormat(table.getStorage().getStorageFormat());
}

public static HiveStorageFormat extractHiveStorageFormat(StorageFormat storageFormat)
{
String outputFormat = storageFormat.getOutputFormat();
String serde = storageFormat.getSerde();

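The refactoring above extracts a public extractHiveStorageFormat(StorageFormat) overload from the previously private extractHiveStorageFormat(Table), letting code outside HiveMetadata, such as the new migrate procedure, resolve a HiveStorageFormat from a raw metastore StorageFormat. Here is a minimal sketch of how a caller might use the overload to vet a source table before migration; StorageFormatCheck, validateSourceFormat, and the accepted-format set are illustrative assumptions, not code from this commit:

import io.trino.plugin.hive.HiveMetadata;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.TrinoException;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;

public final class StorageFormatCheck
{
    private StorageFormatCheck() {}

    // Hypothetical helper: resolve and validate the storage format of a
    // source Hive table before migrating it. The set of accepted formats
    // below is an assumption for illustration.
    public static HiveStorageFormat validateSourceFormat(Table sourceTable)
    {
        StorageFormat storageFormat = sourceTable.getStorage().getStorageFormat();
        // Throws TrinoException if the format is not a recognized Hive format
        HiveStorageFormat format = HiveMetadata.extractHiveStorageFormat(storageFormat);
        switch (format) {
            case ORC:
            case PARQUET:
            case AVRO:
                return format; // formats Iceberg can adopt in place
            default:
                throw new TrinoException(NOT_SUPPORTED, "Unsupported storage format: " + format);
        }
    }
}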
11 changes: 5 additions & 6 deletions plugin/trino-iceberg/pom.xml
@@ -211,6 +211,11 @@
<artifactId>iceberg-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
@@ -290,12 +295,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
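Taken together, the two pom.xml hunks promote iceberg-orc from runtime scope to the default compile scope, presumably so the new migrate code can reference Iceberg's ORC classes directly at compile time.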
@@ -47,10 +47,13 @@
import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
import static io.trino.plugin.iceberg.IcebergUtil.parseVersion;
import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_KEY;
import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_VALUE;
import static java.lang.String.format;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT;
@@ -145,7 +148,15 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
}

if (base == null) {
commitNewTable(metadata);
if (PROVIDER_PROPERTY_VALUE.equals(metadata.properties().get(PROVIDER_PROPERTY_KEY))) {
// Assume the migrate procedure created this table; commit to the existing metastore entry instead of creating a new table
version = OptionalInt.of(0);
currentMetadataLocation = metadata.properties().get(METADATA_LOCATION_PROP);
commitToExistingTable(base, metadata);
}
else {
commitNewTable(metadata);
}
}
else {
commitToExistingTable(base, metadata);
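The new branch above handles the first Iceberg commit of a migrated table: when base is null but the metadata carries the migrate procedure's provider marker, the operation resets the version to 0, reads the current metadata location from the table properties, and commits to the existing metastore entry instead of creating a new table. A rough sketch of the properties the migrate procedure would need to set for this path to trigger; the MigrateMarkers class and the newMetadataLocation parameter are illustrative assumptions, not code from this commit:

import com.google.common.collect.ImmutableMap;

import java.util.Map;

import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_KEY;
import static io.trino.plugin.iceberg.procedure.MigrateProcedure.PROVIDER_PROPERTY_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;

public final class MigrateMarkers
{
    private MigrateMarkers() {}

    // Hypothetical helper: table properties that make the commit path above
    // recognize a migrated table. newMetadataLocation stands in for the
    // location of the metadata JSON written during migration.
    public static Map<String, String> migrateMarkerProperties(String newMetadataLocation)
    {
        return ImmutableMap.<String, String>builder()
                .put(PROVIDER_PROPERTY_KEY, PROVIDER_PROPERTY_VALUE) // identifies the table as Iceberg
                .put(METADATA_LOCATION_PROP, newMetadataLocation) // read back as currentMetadataLocation above
                .buildOrThrow();
    }
}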
@@ -16,6 +16,7 @@
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HideDeltaLakeTables;
@@ -26,9 +27,12 @@
import io.trino.plugin.iceberg.catalog.MetastoreValidator;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory;
import io.trino.plugin.iceberg.procedure.MigrateProcedure;
import io.trino.spi.procedure.Procedure;

import java.util.concurrent.TimeUnit;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule.HIDE_DELTA_LAKE_TABLES_IN_ICEBERG;

@@ -49,5 +53,7 @@ protected void setup(Binder binder)
// ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg
config.setStatsCacheTtl(new Duration(0, TimeUnit.SECONDS));
});
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON);
}
}
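Each catalog module registers the procedure the same way: a Guice Multibinder collects every Procedure binding into a set that can be injected as a whole. Below is a self-contained sketch of the pattern; ExampleProcedureModule and MyProcedureProvider are illustrative names, with MigrateProcedure playing the provider role in this commit:

import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.trino.spi.procedure.Procedure;

import static com.google.inject.multibindings.Multibinder.newSetBinder;

public class ExampleProcedureModule
        extends AbstractModule
{
    @Override
    protected void configure()
    {
        // Every addBinding() contributes one element; Guice injects the
        // union of all contributions as a Set<Procedure>.
        Multibinder<Procedure> procedures = newSetBinder(binder(), Procedure.class);
        procedures.addBinding().toProvider(MyProcedureProvider.class).in(Scopes.SINGLETON);
    }

    // Hypothetical provider standing in for MigrateProcedure
    private static class MyProcedureProvider
            implements Provider<Procedure>
    {
        @Override
        public Procedure get()
        {
            // Build and return the Procedure (name, arguments, method handle)
            throw new UnsupportedOperationException("illustrative only");
        }
    }
}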
@@ -14,16 +14,28 @@
package io.trino.plugin.iceberg.catalog.glue;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.glue.AWSGlueAsync;
import com.amazonaws.services.glue.model.Table;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.HideDeltaLakeTables;
import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore;
import io.trino.plugin.hive.metastore.glue.GlueCredentialsProvider;
import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreModule;
import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.procedure.MigrateProcedure;
import io.trino.spi.procedure.Procedure;

import java.util.function.Predicate;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

@@ -36,11 +48,18 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(GlueHiveMetastoreConfig.class);
configBinder(binder).bindConfig(IcebergGlueCatalogConfig.class);
binder.bind(GlueMetastoreStats.class).in(Scopes.SINGLETON);
binder.bind(AWSGlueAsync.class).toProvider(GlueClientProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(GlueMetastoreStats.class).withGeneratedName();
binder.bind(AWSCredentialsProvider.class).toProvider(GlueCredentialsProvider.class).in(Scopes.SINGLETON);
binder.bind(IcebergTableOperationsProvider.class).to(GlueIcebergTableOperationsProvider.class).in(Scopes.SINGLETON);
binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON);
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();

// Required to inject HiveMetastoreFactory for migrate procedure
binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
newOptionalBinder(binder, Key.get(new TypeLiteral<Predicate<Table>>() {}, ForGlueHiveMetastore.class))
.setBinding().toInstance(table -> true);
install(new GlueMetastoreModule());
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON);
}
}
@@ -16,6 +16,7 @@
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HideDeltaLakeTables;
@@ -26,9 +27,12 @@
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.MetastoreValidator;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.procedure.MigrateProcedure;
import io.trino.spi.procedure.Procedure;

import java.util.concurrent.TimeUnit;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;

public class IcebergHiveMetastoreCatalogModule
@@ -51,5 +55,7 @@ protected void setup(Binder binder)
// ensure caching metastore wrapper isn't created, as it's not leveraged by Iceberg
config.setStatsCacheTtl(new Duration(0, TimeUnit.SECONDS));
});
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(MigrateProcedure.class).in(Scopes.SINGLETON);
}
}
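On the consuming side, which this diff does not show, the standard Trino pattern is for the connector to inject the multibound set and hand it to the engine; that is how CALL iceberg.system.migrate(...) resolves to the registered procedure. A sketch under that assumption, with ExampleConnector as an illustrative name:

import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.trino.spi.procedure.Procedure;

import java.util.Set;

import static java.util.Objects.requireNonNull;

// Illustrative connector fragment: the Set<Procedure> assembled by the
// module bindings above arrives here via injection.
public class ExampleConnector
{
    private final Set<Procedure> procedures;

    @Inject
    public ExampleConnector(Set<Procedure> procedures)
    {
        this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
    }

    public Set<Procedure> getProcedures()
    {
        return procedures;
    }
}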