Skip to content

Commit

Permalink
Address new review comments (trinodb#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored May 22, 2023
1 parent 916e85a commit 419ba5f
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 472 deletions.
15 changes: 8 additions & 7 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ To use Iceberg, you need:

* Network access from the Trino coordinator and workers to the distributed
object storage.
* Access to a Hive metastore service (HMS) / AWS Glue or a `Nessie server <https://projectnessie.org/>`_.
* Access to a Hive metastore service (HMS), AWS Glue or a `Nessie server <https://projectnessie.org/>`_.
* Network access from the Trino coordinator to the HMS. Hive
metastore access with the Thrift protocol defaults to using port 9083.

Expand Down Expand Up @@ -225,6 +225,7 @@ properties:
REST catalog does not support :doc:`views</sql/create-view>` or
:doc:`materialized views</sql/create-materialized-view>`.

.. _iceberg-nessie-catalog:

Nessie catalog
^^^^^^^^^^^^^^
Expand All @@ -236,12 +237,12 @@ properties:
==================================================== ============================================================
Property Name Description
==================================================== ============================================================
``iceberg.nessie.ref`` The branch/tag to use for Nessie, defaults to ``main``.

``iceberg.nessie.uri`` Nessie API endpoint URI (required).
``iceberg.nessie-catalog.uri`` Nessie API endpoint URI (required).
Example: ``https://localhost:19120/api/v1``

``iceberg.nessie.default-warehouse-dir`` Default warehouse directory for schemas created without an
``iceberg.nessie-catalog.ref`` The branch/tag to use for Nessie, defaults to ``main``.

``iceberg.nessie-catalog.default-warehouse-dir`` Default warehouse directory for schemas created without an
explicit ``location`` property.
Example: ``/tmp``
==================================================== ============================================================
Expand All @@ -250,8 +251,8 @@ Property Name Description

connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie.uri=https://localhost:19120/api/v1
iceberg.nessie.default-warehouse-dir=/tmp
iceberg.nessie-catalog.uri=https://localhost:19120/api/v1
iceberg.nessie-catalog.default-warehouse-dir=/tmp


.. _iceberg-jdbc-catalog:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,41 @@
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

public class NessieConfig
import java.net.URI;

public class IcebergNessieCatalogConfig
{
private String defaultReferenceName = "main";
private String serverUri;
private String defaultWarehouseDir;
private URI serverUri;

@NotNull
public String getDefaultReferenceName()
{
return defaultReferenceName;
}

@Config("iceberg.nessie.ref")
@Config("iceberg.nessie-catalog.ref")
@ConfigDescription("The default Nessie reference to work on")
public NessieConfig setDefaultReferenceName(String defaultReferenceName)
public IcebergNessieCatalogConfig setDefaultReferenceName(String defaultReferenceName)
{
this.defaultReferenceName = defaultReferenceName;
return this;
}

@NotNull
public String getServerUri()
public URI getServerUri()
{
return serverUri;
}

@Config("iceberg.nessie.uri")
@Config("iceberg.nessie-catalog.uri")
@ConfigDescription("The URI to connect to the Nessie server")
public NessieConfig setServerUri(String serverUri)
public IcebergNessieCatalogConfig setServerUri(String serverUri)
{
this.serverUri = serverUri;
if (serverUri != null) {
this.serverUri = URI.create(serverUri);
}
return this;
}

Expand All @@ -59,9 +63,9 @@ public String getDefaultWarehouseDir()
return defaultWarehouseDir;
}

@Config("iceberg.nessie.default-warehouse-dir")
@Config("iceberg.nessie-catalog.default-warehouse-dir")
@ConfigDescription("The default warehouse to use for Nessie")
public NessieConfig setDefaultWarehouseDir(String defaultWarehouseDir)
public IcebergNessieCatalogConfig setDefaultWarehouseDir(String defaultWarehouseDir)
{
this.defaultWarehouseDir = defaultWarehouseDir;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ public class IcebergNessieCatalogModule
@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(NessieConfig.class);
binder.bind(IcebergTableOperationsProvider.class).to(NessieIcebergTableOperationsProvider.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(IcebergNessieCatalogConfig.class);
binder.bind(IcebergTableOperationsProvider.class).to(IcebergNessieTableOperationsProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
binder.bind(TrinoCatalogFactory.class).to(TrinoNessieCatalogFactory.class).in(Scopes.SINGLETON);
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
}

@Provides
@Singleton
public static NessieIcebergClient createNessieIcebergClient(NessieConfig nessieConfig)
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
{
return new NessieIcebergClient(
HttpClientBuilder.builder()
.withUri(nessieConfig.getServerUri())
.withUri(icebergNessieCatalogConfig.getServerUri())
.build(NessieApiV1.class),
nessieConfig.getDefaultReferenceName(),
icebergNessieCatalogConfig.getDefaultReferenceName(),
null,
ImmutableMap.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Namespace;

import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.catalog.nessie.NessieIcebergUtil.toIdentifier;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class NessieIcebergTableOperations
public class IcebergNessieTableOperations
extends AbstractIcebergTableOperations
{
private final NessieIcebergClient nessieClient;
private IcebergTable table;

protected NessieIcebergTableOperations(
protected IcebergNessieTableOperations(
NessieIcebergClient nessieClient,
FileIO fileIo,
ConnectorSession session,
Expand Down Expand Up @@ -95,7 +97,7 @@ protected void commitNewTable(TableMetadata metadata)
{
verify(version.isEmpty(), "commitNewTable called on a table which already exists");
try {
nessieClient.commitTable(null, metadata, writeNewMetadata(metadata, 0), table, NessieIcebergUtil.toKey(new SchemaTableName(database, this.tableName)));
nessieClient.commitTable(null, metadata, writeNewMetadata(metadata, 0), table, toKey(new SchemaTableName(database, this.tableName)));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
Expand All @@ -112,7 +114,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
verify(version.orElseThrow() >= 0, "commitToExistingTable called on a new table");
try {
nessieClient.commitTable(base, metadata, writeNewMetadata(metadata, version.getAsInt() + 1), table, NessieIcebergUtil.toKey(new SchemaTableName(database, this.tableName)));
nessieClient.commitTable(base, metadata, writeNewMetadata(metadata, version.getAsInt() + 1), table, toKey(new SchemaTableName(database, this.tableName)));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
Expand All @@ -123,4 +125,9 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
}
shouldRefresh = true;
}

private static ContentKey toKey(SchemaTableName tableName)
{
return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

import static java.util.Objects.requireNonNull;

public class NessieIcebergTableOperationsProvider
public class IcebergNessieTableOperationsProvider
implements IcebergTableOperationsProvider
{
private final TrinoFileSystemFactory fileSystemFactory;
private final NessieIcebergClient nessieClient;

@Inject
public NessieIcebergTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, NessieIcebergClient nessieClient)
public IcebergNessieTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, NessieIcebergClient nessieClient)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
Expand All @@ -49,7 +49,7 @@ public IcebergTableOperations createTableOperations(
Optional<String> owner,
Optional<String> location)
{
return new NessieIcebergTableOperations(
return new IcebergNessieTableOperations(
nessieClient,
new ForwardingFileIo(fileSystemFactory.create(session)),
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@

import io.trino.spi.connector.SchemaTableName;
import org.apache.iceberg.catalog.TableIdentifier;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.Namespace;

final class NessieIcebergUtil
final class IcebergNessieUtil
{
private NessieIcebergUtil() {}

static ContentKey toKey(SchemaTableName tableName)
{
return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName());
}
private IcebergNessieUtil() {}

static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.NessieIcebergUtil.toIdentifier;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -84,7 +84,7 @@ public TrinoNessieCatalog(
public boolean namespaceExists(ConnectorSession session, String namespace)
{
try {
return null != nessieClient.loadNamespaceMetadata(Namespace.of(namespace));
return nessieClient.loadNamespaceMetadata(Namespace.of(namespace)) != null;
}
catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ public TrinoNessieCatalogFactory(
TrinoFileSystemFactory fileSystemFactory,
IcebergTableOperationsProvider tableOperationsProvider,
NessieIcebergClient nessieClient,
NessieConfig nessieConfig,
IcebergNessieCatalogConfig icebergNessieCatalogConfig,
IcebergConfig icebergConfig)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
this.warehouseLocation = requireNonNull(nessieConfig, "nessieConfig is null").getDefaultWarehouseDir();
this.warehouseLocation = requireNonNull(icebergNessieCatalogConfig, "nessieConfig is null").getDefaultWarehouseDir();
requireNonNull(icebergConfig, "icebergConfig is null");
this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6561,33 +6561,6 @@ protected void verifySchemaNameLengthFailurePermissible(Throwable e)
assertThat(e).hasMessageMatching("Schema name must be shorter than or equal to '128' characters but got '129'");
}

@Override
public void testRenameTableToLongTableName()
{
// Override because the max name length is different from CREATE TABLE case
String sourceTableName = "test_rename_source_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + sourceTableName + " AS SELECT 123 x", 1);

String baseTableName = "test_rename_target_" + randomNameSuffix();

String validTargetTableName = baseTableName + "z".repeat(maxTableRenameLength().getAsInt() - baseTableName.length());
assertUpdate("ALTER TABLE " + sourceTableName + " RENAME TO " + validTargetTableName);
assertTrue(getQueryRunner().tableExists(getSession(), validTargetTableName));
assertQuery("SELECT x FROM " + validTargetTableName, "VALUES 123");
assertUpdate("DROP TABLE " + validTargetTableName);

assertUpdate("CREATE TABLE " + sourceTableName + " AS SELECT 123 x", 1);
String invalidTargetTableName = validTargetTableName + "z";
assertThatThrownBy(() -> assertUpdate("ALTER TABLE " + sourceTableName + " RENAME TO " + invalidTargetTableName))
.satisfies(this::verifyTableNameLengthFailurePermissible);
verifyInvalidTargetTableDoesNotExist(invalidTargetTableName);
}

protected void verifyInvalidTargetTableDoesNotExist(String invalidTargetTableName)
{
assertFalse(getQueryRunner().tableExists(getSession(), invalidTargetTableName));
}

@Test
public void testSnapshotSummariesHaveTrinoQueryIdFormatV1()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,23 @@ public void testJdbcCatalog()
.shutdown();
}

private static ConnectorFactory getConnectorFactory()
{
return getOnlyElement(new IcebergPlugin().getConnectorFactories());
}

@Test
public void testNessieMetastore()
public void testNessieCatalog()
{
ConnectorFactory factory = getConnectorFactory();

factory.create(
"test",
Map.of(
"iceberg.catalog.type", "nessie",
"iceberg.nessie.default-warehouse-dir", "/tmp",
"iceberg.nessie.uri", "http://foo:1234"),
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.uri", "http://foo:1234"),
new TestingConnectorContext())
.shutdown();
}

private static ConnectorFactory getConnectorFactory()
{
return getOnlyElement(new IcebergPlugin().getConnectorFactories());
}
}
Loading

0 comments on commit 419ba5f

Please sign in to comment.