Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: Added ability to add uuid suffix to the table location in Hive catalog #2850

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -54,17 +55,22 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

public static final String APPEND_UUID_SUFFIX_TO_TABLE_LOCATION = "append-uuid-suffix-to-table-location";
public static final boolean APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT = false;

private String name;
private Configuration conf;
private FileIO fileIO;
private ClientPool<HiveMetaStoreClient, TException> clients;
private boolean useUniqueTableName;

public HiveCatalog() {
}
Expand Down Expand Up @@ -107,6 +113,10 @@ public void initialize(String inputName, Map<String, String> properties) {
this.fileIO = fileIOImpl == null ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

this.clients = new CachedClientPool(conf, properties);
if (properties.containsKey(APPEND_UUID_SUFFIX_TO_TABLE_LOCATION)) {
this.useUniqueTableName = PropertyUtil.propertyAsBoolean(properties,
APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, APPEND_UUID_SUFFIX_TO_TABLE_LOCATION_DEFAULT);
}
}

@Override
Expand Down Expand Up @@ -438,7 +448,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0]));
if (databaseData.getLocationUri() != null) {
// If the database location is set use it as a base.
return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
return String.format("%s/%s", databaseData.getLocationUri(), getTableName(tableIdentifier));
}

} catch (TException e) {
Expand All @@ -455,7 +465,7 @@ protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
"%s/%s.db/%s",
warehouseLocation,
tableIdentifier.namespace().levels()[0],
tableIdentifier.name());
getTableName(tableIdentifier));
}

private String getWarehouseLocation() {
Expand All @@ -464,6 +474,14 @@ private String getWarehouseLocation() {
return warehouseLocation;
}

// Returns the directory name to use for a table when composing its default
// warehouse location. With append-uuid-suffix-to-table-location enabled at
// catalog initialization, a random UUID is appended so every created table
// gets a unique directory.
// NOTE(review): UUID.randomUUID() produces a fresh value on every call, so
// invoking this twice for the same identifier yields two different names —
// confirm callers resolve the location exactly once per table creation.
private String getTableName(TableIdentifier tableIdentifier) {
  String baseName = tableIdentifier.name();
  return useUniqueTableName
      ? String.format("%s-%s", baseName, UUID.randomUUID())
      : baseName;
}

private Map<String, String> convertToMetadata(Database database) {

Map<String, String> meta = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -71,6 +72,12 @@ public static Object[][] parameters() {
"default-namespace", "default",
"parquet-enabled", "true",
"cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
) },
{ "spark_catalog_with_unique_location", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hive",
"default-namespace", "default",
HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION, "true"
) }
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
Expand All @@ -40,8 +41,14 @@
import org.junit.Test;

public class TestCreateTable extends SparkCatalogTestBase {
private boolean shouldHaveUniqueTableLocation;

// Remembers whether the configured catalog appends a UUID suffix to table
// locations, so individual tests can pick the matching location assertion.
public TestCreateTable(String catalogName, String implementation, Map<String, String> config) {
  super(catalogName, implementation, config);

  this.shouldHaveUniqueTableLocation =
      "true".equals(config.get(HiveCatalog.APPEND_UUID_SUFFIX_TO_TABLE_LOCATION));
}

@After
Expand Down Expand Up @@ -279,4 +286,24 @@ public void testDowngradeTableToFormatV1ThroughTablePropertyFails() {
"Cannot downgrade v2 table to v1",
() -> sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='1')", tableName));
}

@Test
public void testCreateTableWithUniqueLocation() {
  Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
  sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName);

  Table table = validationCatalog.loadTable(tableIdent);
  Assert.assertNotNull("Should load the new table", table);

  String tableLocation = table.location();
  if (!shouldHaveUniqueTableLocation) {
    // Without the catalog option, the location simply ends with the table name.
    Assert.assertTrue("Should end with table name ",
        tableLocation.endsWith(tableIdent.name()));
  } else {
    // With the option enabled, the location must end in a standard
    // 8-4-4-4-12 lowercase-hex UUID appended to the table name.
    String tableNameWithUuidSuffix = ".*[0-9a-f]{8}\\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\\b[0-9a-f]{12}";
    Assert.assertTrue(
        "Should have uuid suffix in table name",
        tableLocation.matches(tableNameWithUuidSuffix));
  }
}
}