Skip to content

Commit

Permalink
Core: Implement BaseMetastoreCatalog.registerTable() (#5037)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mehul2500 authored Jul 22, 2022
1 parent ae96bdf commit 3c0fd8f
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsIntegTestUtil;
Expand All @@ -42,6 +44,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -295,6 +298,37 @@ public void testDropNamespace() {
Assert.assertFalse("namespace must not exist", response.hasItem());
}

@Test
public void testRegisterTable() {
Namespace namespace = Namespace.of(genRandomName());
catalog.createNamespace(namespace);
TableIdentifier identifier = TableIdentifier.of(namespace, catalogTableName);
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
Assertions.assertThat(catalog.dropTable(identifier, false)).isTrue();
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((DynamoDbTableOperations) ops).currentMetadataLocation();
Assertions.assertThat(catalog.registerTable(identifier, metadataLocation)).isNotNull();
Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(catalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(catalog.dropNamespace(namespace)).isTrue();
}

@Test
public void testRegisterExistingTable() {
Namespace namespace = Namespace.of(genRandomName());
catalog.createNamespace(namespace);
TableIdentifier identifier = TableIdentifier.of(namespace, catalogTableName);
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((DynamoDbTableOperations) ops).currentMetadataLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class);
Assertions.assertThat(catalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(catalog.dropNamespace(namespace)).isTrue();
}

private static String genRandomName() {
return UUID.randomUUID().toString().replace("-", "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -42,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.awssdk.services.glue.model.Column;
Expand Down Expand Up @@ -433,4 +435,33 @@ public void testTablePropsDefinedAtCatalogLevel() {
"table-key5",
table.properties().get("key5"));
}

@Test
public void testRegisterTable() {
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
Table table = glueCatalog.loadTable(identifier);
String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThat(glueCatalog.dropTable(identifier, false)).isTrue();
Assertions.assertThat(glueCatalog.registerTable(identifier, metadataLocation)).isNotNull();
Assertions.assertThat(glueCatalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}

@Test
public void testRegisterTableAlreadyExists() {
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
Table table = glueCatalog.loadTable(identifier);
String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThatThrownBy(() -> glueCatalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class);
Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -65,6 +66,26 @@ public Table loadTable(TableIdentifier identifier) {
return result;
}

@Override
public Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(
identifier != null && isValidIdentifier(identifier), "Invalid identifier: %s", identifier);
Preconditions.checkArgument(metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Cannot register an empty metadata file location as a table");

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new BaseMetastoreCatalogTableBuilder(identifier, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -47,6 +50,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -589,4 +593,33 @@ public void testTablePropsDefinedAtCatalogLevel() throws IOException {
"table-key5",
table.properties().get("key5"));
}

@Test
public void testRegisterTable() throws IOException {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
TableIdentifier identifier2 = TableIdentifier.of("a", "t2");
HadoopCatalog catalog = hadoopCatalog();
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((HadoopTableOperations) ops).current().metadataFileLocation();
Assertions.assertThat(catalog.registerTable(identifier2, metadataLocation)).isNotNull();
Assertions.assertThat(catalog.loadTable(identifier2)).isNotNull();
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
Assertions.assertThat(catalog.dropTable(identifier2)).isTrue();
}

@Test
public void testRegisterExistingTable() throws IOException {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
HadoopCatalog catalog = hadoopCatalog();
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((HadoopTableOperations) ops).current().metadataFileLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}
}
28 changes: 28 additions & 0 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -58,6 +60,7 @@
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -639,4 +642,29 @@ public void testConversions() {
Assert.assertEquals(ns, JdbcUtil.stringToNamespace(nsString));
}

@Test
public void testRegisterTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
catalog.dropTable(identifier, false);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Assertions.assertThat(catalog.registerTable(identifier, metadataLocation)).isNotNull();
Assertions.assertThat(catalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testRegisterExistingTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
catalog.createTable(identifier, SCHEMA);
Table registeringTable = catalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((JdbcTableOperations) ops).currentMetadataLocation();
Assertions.assertThatThrownBy(() -> catalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(catalog.dropTable(identifier)).isTrue();
}
}
31 changes: 31 additions & 0 deletions dell/src/test/java/org/apache/iceberg/dell/ecs/TestEcsCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.dell.mock.ecs.EcsS3MockRule;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -35,6 +39,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -170,4 +175,30 @@ public void testRenameTable() {
Assert.assertFalse("Old table does not exist", ecsCatalog.tableExists(TableIdentifier.of("a", "t1")));
Assert.assertTrue("New table exists", ecsCatalog.tableExists(TableIdentifier.of("b", "t2")));
}

@Test
public void testRegisterTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
ecsCatalog.createTable(identifier, SCHEMA);
Table registeringTable = ecsCatalog.loadTable(identifier);
ecsCatalog.dropTable(identifier, false);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((EcsTableOperations) ops).currentMetadataLocation();
Assertions.assertThat(ecsCatalog.registerTable(identifier, metadataLocation)).isNotNull();
Assertions.assertThat(ecsCatalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(ecsCatalog.dropTable(identifier, true)).isTrue();
}

@Test
public void testRegisterExistingTable() {
TableIdentifier identifier = TableIdentifier.of("a", "t1");
ecsCatalog.createTable(identifier, SCHEMA);
Table registeringTable = ecsCatalog.loadTable(identifier);
TableOperations ops = ((HasTableOperations) registeringTable).operations();
String metadataLocation = ((EcsTableOperations) ops).currentMetadataLocation();
Assertions.assertThatThrownBy(() -> ecsCatalog.registerTable(identifier, metadataLocation))
.isInstanceOf(AlreadyExistsException.class)
.hasMessage("Table already exists: a.t1");
Assertions.assertThat(ecsCatalog.dropTable(identifier, true)).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -51,7 +49,6 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -234,23 +231,6 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
}
}

@Override
public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

// Throw an exception if this table already exists in the catalog.
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
ops.commit(null, metadata);

return new BaseTable(ops, identifier.toString());
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> meta) {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
String newMetadataLocation = (base == null) && (metadata.metadataFileLocation() != null) ?
metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);

boolean delete = true;
try {
Expand Down
Loading

0 comments on commit 3c0fd8f

Please sign in to comment.