Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanup for Iceberg connector #11074

Merged
merged 5 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
Expand All @@ -55,7 +54,6 @@ public class IcebergConnector
private final ConnectorPageSourceProvider pageSourceProvider;
private final ConnectorPageSinkProvider pageSinkProvider;
private final ConnectorNodePartitioningProvider nodePartitioningProvider;
private final Set<SystemTable> systemTables;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> schemaProperties;
private final List<PropertyMetadata<?>> tableProperties;
Expand All @@ -70,7 +68,6 @@ public IcebergConnector(
ConnectorPageSourceProvider pageSourceProvider,
ConnectorPageSinkProvider pageSinkProvider,
ConnectorNodePartitioningProvider nodePartitioningProvider,
Set<SystemTable> systemTables,
Set<SessionPropertiesProvider> sessionPropertiesProviders,
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
Expand All @@ -84,7 +81,6 @@ public IcebergConnector(
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
.collect(toImmutableList());
Expand Down Expand Up @@ -132,12 +128,6 @@ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
return nodePartitioningProvider;
}

@Override
public Set<SystemTable> getSystemTables()
{
return systemTables;
}

@Override
public Set<Procedure> getProcedures()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
Expand Down Expand Up @@ -122,7 +121,6 @@ public static Connector createConnector(
new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
ImmutableSet.of(),
sessionPropertiesProviders,
IcebergSchemaProperties.SCHEMA_PROPERTIES,
icebergTableProperties.getTableProperties(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface TrinoCatalog
{
List<String> listNamespaces(ConnectorSession session);

boolean dropNamespace(ConnectorSession session, String namespace);
void dropNamespace(ConnectorSession session, String namespace);

Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace);

Expand All @@ -68,7 +68,7 @@ Transaction newCreateTableTransaction(
String location,
Map<String, String> properties);

boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData);
void dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData);

void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void createNamespace(ConnectorSession session, String namespace, Map<Stri
}

@Override
public boolean dropNamespace(ConnectorSession session, String namespace)
public void dropNamespace(ConnectorSession session, String namespace)
{
// basic sanity check to provide a better error message
if (!listTables(session, Optional.of(namespace)).isEmpty() ||
Expand Down Expand Up @@ -239,7 +239,6 @@ public boolean dropNamespace(ConnectorSession session, String namespace)
}).orElse(deleteSchemaLocationsFallback);

metastore.dropDatabase(namespace, deleteData);
return true;
}

@Override
Expand Down Expand Up @@ -296,7 +295,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
}

@Override
public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData)
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData)
{
// TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
Table table = loadTable(session, schemaTableName);
Expand All @@ -306,7 +305,6 @@ public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableNa
throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino");
}
metastore.dropTable(schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData);
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
Expand All @@ -74,8 +75,6 @@ public abstract class AbstractMetastoreTableOperations
{
private static final Logger log = Logger.get(AbstractMetastoreTableOperations.class);

public static final String METADATA_LOCATION = "metadata_location";
public static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location";
protected static final String METADATA_FOLDER_NAME = "metadata";

protected static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
Expand Down Expand Up @@ -151,9 +150,9 @@ public TableMetadata refresh()
throw new UnknownTableTypeException(getSchemaTableName());
}

String metadataLocation = table.getParameters().get(METADATA_LOCATION);
String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
if (metadataLocation == null) {
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName()));
throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION_PROP, getSchemaTableName()));
}

refreshFromMetadataLocation(metadataLocation);
Expand Down Expand Up @@ -190,34 +189,22 @@ protected void commitNewTable(TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

Table table;
try {
Table.Builder builder = Table.builder()
.setDatabaseName(database)
.setTableName(tableName)
.setOwner(owner)
.setTableType(TableType.EXTERNAL_TABLE.name())
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
.setParameter("EXTERNAL", "TRUE")
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.setParameter(METADATA_LOCATION, newMetadataLocation);
String tableComment = metadata.properties().get(TABLE_COMMENT);
if (tableComment != null) {
builder.setParameter(TABLE_COMMENT, tableComment);
}
table = builder.build();
}
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
Table.Builder builder = Table.builder()
.setDatabaseName(database)
.setTableName(tableName)
.setOwner(owner)
.setTableType(TableType.EXTERNAL_TABLE.name())
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
.setParameter("EXTERNAL", "TRUE")
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.setParameter(METADATA_LOCATION_PROP, newMetadataLocation);
String tableComment = metadata.properties().get(TABLE_COMMENT);
if (tableComment != null) {
electrum marked this conversation as resolved.
Show resolved Hide resolved
builder.setParameter(TABLE_COMMENT, tableComment);
}
Table table = builder.build();

PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES);
metastore.createTable(table, privileges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.iceberg.catalog.file;

import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreUtil;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations;
Expand All @@ -27,8 +28,9 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;

@NotThreadSafe
public class FileMetastoreTableOperations
Expand All @@ -49,38 +51,26 @@ public FileMetastoreTableOperations(
@Override
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);
Table currentTable = getTable();

Table table;
try {
Table currentTable = getTable();
checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION_PROP);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
}
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
}
Table table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION_PROP, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation)
.build();

// todo privileges should not be replaced for an alter
PrincipalPrivileges privileges = owner.isEmpty() && table.getOwner().isPresent() ? NO_PRIVILEGES : buildInitialPrivilegeSet(table.getOwner().get());
PrincipalPrivileges privileges = table.getOwner().map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES);
metastore.replaceTable(database, tableName, table, privileges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreUtil;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
Expand All @@ -30,10 +31,11 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;

@NotThreadSafe
public class HiveMetastoreTableOperations
Expand Down Expand Up @@ -73,7 +75,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
.orElseThrow(() -> new TableNotFoundException(getSchemaTableName())));

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION_PROP);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
Expand All @@ -82,8 +84,8 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.setParameter(METADATA_LOCATION_PROP, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation)
.build();
}
catch (RuntimeException e) {
Expand All @@ -97,7 +99,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
}

// todo privileges should not be replaced for an alter
PrincipalPrivileges privileges = owner.isEmpty() && table.getOwner().isPresent() ? NO_PRIVILEGES : buildInitialPrivilegeSet(table.getOwner().get());
PrincipalPrivileges privileges = table.getOwner().map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES);
electrum marked this conversation as resolved.
Show resolved Hide resolved
metastore.replaceTable(database, tableName, table, privileges);
}
finally {
Expand Down