Skip to content

Commit

Permalink
Hive: Set the Table owner on table creation (#5763)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborkaszab authored Oct 21, 2022
1 parent 6e0a952 commit b215c48
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 56 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,6 @@ private TableProperties() {}

public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

// Reserved property consumed at table-creation time: its value is used as the owner of the
// backing Hive Metastore table (falling back to the "user.name" system property when absent),
// and it is filtered out of the HMS table parameters instead of being stored there.
public static final String HMS_TABLE_OWNER = "hms_table_owner";
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -281,7 +282,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateHiveTable = true;
LOG.debug("Committing existing table: {}", fullName);
} else {
tbl = newHmsTable();
tbl = newHmsTable(metadata);
LOG.debug("Committing new table: {}", fullName);
}

Expand Down Expand Up @@ -417,14 +418,15 @@ Table loadHmsTable() throws TException, InterruptedException {
}
}

private Table newHmsTable() {
private Table newHmsTable(TableMetadata metadata) {
Preconditions.checkNotNull(metadata, "'metadata' parameter can't be null");
final long currentTimeMillis = System.currentTimeMillis();

Table newTable =
new Table(
tableName,
database,
System.getProperty("user.name"),
metadata.property(TableProperties.HMS_TABLE_OWNER, System.getProperty("user.name")),
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
Expand Down Expand Up @@ -452,13 +454,14 @@ private void setHmsTableParameters(
Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);

// push all Iceberg table properties into HMS
metadata
.properties()
metadata.properties().entrySet().stream()
.filter(entry -> !entry.getKey().equalsIgnoreCase(TableProperties.HMS_TABLE_OWNER))
.forEach(
(key, value) -> {
entry -> {
String key = entry.getKey();
// translate key names between Iceberg and HMS where needed
String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
parameters.put(hmsKey, value);
parameters.put(hmsKey, entry.getValue());
});
if (metadata.uuid() != null) {
parameters.put(TableProperties.UUID, metadata.uuid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ public class TestHiveCatalog extends HiveMetastoreTest {

@Rule public TemporaryFolder temp = new TemporaryFolder();

private Schema getTestSchema() {
  // Common two-column schema shared by the catalog tests:
  // a required int "id" (documented as the unique ID) plus a required string "data" column.
  Types.NestedField idColumn = required(1, "id", Types.IntegerType.get(), "unique ID");
  Types.NestedField dataColumn = required(2, "data", Types.StringType.get());
  return new Schema(idColumn, dataColumn);
}

@Test
public void testCreateTableBuilder() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();
Expand All @@ -120,10 +123,7 @@ public void testCreateTableBuilder() throws Exception {

@Test
public void testCreateTableWithCaching() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();
Expand Down Expand Up @@ -188,10 +188,7 @@ public void testInitializeCatalogWithProperties() {

@Test
public void testCreateTableTxnBuilder() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();

Expand All @@ -211,10 +208,7 @@ public void testCreateTableTxnBuilder() throws Exception {

@Test
public void testReplaceTxnBuilder() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();
Expand Down Expand Up @@ -259,12 +253,31 @@ public void testReplaceTxnBuilder() throws Exception {
}
}

@Test
public void testCreateTableWithOwner() throws Exception {
  // Creating a table with the HMS_TABLE_OWNER property set should make that value the owner
  // of the backing HMS table, and the property itself must not leak into the HMS parameters.
  Schema schema = getTestSchema();
  PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
  TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");
  String tableLocation = temp.newFolder("tbl").toString();
  String expectedOwner = "some_owner";
  Map<String, String> tableProps = ImmutableMap.of(TableProperties.HMS_TABLE_OWNER, expectedOwner);

  try {
    catalog.createTable(identifier, schema, partitionSpec, tableLocation, tableProps);

    org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, "tbl");
    Assert.assertEquals(expectedOwner, hmsTable.getOwner());
    Assert.assertFalse(hmsTable.getParameters().containsKey(TableProperties.HMS_TABLE_OWNER));
  } finally {
    catalog.dropTable(identifier);
  }
}

@Test
public void testCreateTableDefaultSortOrder() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

Expand All @@ -283,10 +296,7 @@ public void testCreateTableDefaultSortOrder() throws Exception {

@Test
public void testCreateTableCustomSortOrder() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
SortOrder order = SortOrder.builderFor(schema).asc("id", NULLS_FIRST).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");
Expand Down Expand Up @@ -442,8 +452,7 @@ public void testRemoveNamespaceProperties() throws TException {
public void testDropNamespace() throws TException {
Namespace namespace = Namespace.of("dbname_drop");
TableIdentifier identifier = TableIdentifier.of(namespace, "table");
Schema schema =
new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
Schema schema = getTestSchema();

catalog.createNamespace(namespace, meta);
catalog.createTable(identifier, schema);
Expand Down Expand Up @@ -476,8 +485,7 @@ public void testDropNamespace() throws TException {
@Test
public void testDropTableWithoutMetadataFile() {
TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");
Schema tableSchema =
new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
Schema tableSchema = getTestSchema();
catalog.createTable(identifier, tableSchema);
String metadataFileLocation = catalog.newTableOps(identifier).current().metadataFileLocation();
TableOperations ops = catalog.newTableOps(identifier);
Expand All @@ -490,10 +498,7 @@ public void testDropTableWithoutMetadataFile() {

@Test
public void testTableName() {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

Expand All @@ -520,10 +525,7 @@ private String defaultUri(Namespace namespace) throws TException {

@Test
public void testUUIDinTableProperties() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();

Expand All @@ -538,10 +540,7 @@ public void testUUIDinTableProperties() throws Exception {

@Test
public void testSnapshotStatsTableProperties() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
TableIdentifier tableIdentifier = TableIdentifier.of(DB_NAME, "tbl");
String location = temp.newFolder("tbl").toString();

Expand Down Expand Up @@ -649,10 +648,7 @@ public void testNotExposeTableProperties() {

@Test
public void testSetDefaultPartitionSpec() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

try {
Expand All @@ -672,10 +668,7 @@ public void testSetDefaultPartitionSpec() throws Exception {

@Test
public void testSetCurrentSchema() throws Exception {
Schema schema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Schema schema = getTestSchema();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

try {
Expand Down Expand Up @@ -718,7 +711,7 @@ public void testConstructorWarehousePathWithEndSlash() {

@Test
public void testTablePropsDefinedAtCatalogLevel() {
Schema schema = new Schema(required(1, "id", Types.IntegerType.get(), "unique ID"));
Schema schema = getTestSchema();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl");

ImmutableMap<String, String> catalogProps =
Expand Down

0 comments on commit b215c48

Please sign in to comment.