Create external table location for Hive #17920

Merged
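With this change, the Hive connector creates the directory for an external table when the external_location given in CREATE TABLE does not exist yet, provided writes to non-managed tables are enabled; if they are not, table creation keeps failing as before. A minimal SQL sketch of the user-facing effect, assuming hive.non-managed-table-writes-enabled is set on the catalog; the catalog, schema, table name, and path are illustrative, not taken from the change:

-- With hive.non-managed-table-writes-enabled=false (the default) and a missing directory,
-- this still fails with "External location must be a directory: <location>".
-- With hive.non-managed-table-writes-enabled=true, the connector now creates the missing
-- directory and the statement succeeds.
CREATE TABLE hive.default.example_external (
    id bigint,
    name varchar
)
WITH (
    external_location = 'hdfs://hadoop-master:9000/tmp/example_external',
    format = 'TEXTFILE'
)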
@@ -1041,7 +1041,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe

external = true;
targetPath = Optional.of(getValidatedExternalLocation(externalLocation));
-checkExternalPath(session, targetPath.get());
+checkExternalPathAndCreateIfNotExists(session, targetPath.get());
}
else {
external = false;
@@ -1319,18 +1319,33 @@ private static Location getValidatedExternalLocation(String location)
return validated;
}

-private void checkExternalPath(ConnectorSession session, Location location)
+private void checkExternalPathAndCreateIfNotExists(ConnectorSession session, Location location)
{
try {
if (!fileSystemFactory.create(session).directoryExists(location).orElse(true)) {
-throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
+if (writesToNonManagedTablesEnabled) {
+createDirectory(session, location);
+}
+else {
+throw new TrinoException(INVALID_TABLE_PROPERTY, "External location must be a directory: " + location);
+}
}
}
}
catch (IOException | IllegalArgumentException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e);
}
}

private void createDirectory(ConnectorSession session, Location location)
{
try {
fileSystemFactory.create(session).createDirectory(location);
}
catch (IOException e) {
throw new TrinoException(INVALID_TABLE_PROPERTY, e.getMessage());
}
}

private void checkPartitionTypesSupported(List<HiveColumnHandle> partitionColumns)
{
for (HiveColumnHandle partitionColumn : partitionColumns) {
@@ -53,6 +53,7 @@
import io.trino.plugin.hive.security.SqlStandardAccessControlMetadata;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
@@ -90,6 +91,7 @@
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -105,6 +107,7 @@
import static io.trino.plugin.hive.AbstractTestHive.filterNonHiddenColumnMetadata;
import static io.trino.plugin.hive.AbstractTestHive.getAllSplits;
import static io.trino.plugin.hive.AbstractTestHive.getSplits;
import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
@@ -122,6 +125,7 @@
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.MaterializedResult.materializeSourceDataStream;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingPageSinkId.TESTING_PAGE_SINK_ID;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -143,6 +147,7 @@ public abstract class AbstractTestHiveFileSystem
protected SchemaTableName tableWithHeader;
protected SchemaTableName tableWithHeaderAndFooter;
protected SchemaTableName temporaryCreateTable;
protected SchemaTableName temporaryCreateTableWithExternalLocation;

protected HdfsEnvironment hdfsEnvironment;
protected LocationService locationService;
@@ -190,8 +195,11 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec

String random = randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
temporaryCreateTable = new SchemaTableName(database, "tmp_trino_test_create_" + random);
temporaryCreateTableWithExternalLocation = new SchemaTableName(database, "tmp_trino_test_create_external" + random);

-config = new HiveConfig().setS3SelectPushdownEnabled(s3SelectPushdownEnabled);
+config = new HiveConfig()
+.setWritesToNonManagedTablesEnabled(true)
+.setS3SelectPushdownEnabled(s3SelectPushdownEnabled);

HivePartitionManager hivePartitionManager = new HivePartitionManager(config);

@@ -670,6 +678,24 @@ public void testTableCreation()
}
}

@Test
public void testTableCreationExternalLocation()
throws Exception
{
for (HiveStorageFormat storageFormat : HiveStorageFormat.values()) {
if (storageFormat == HiveStorageFormat.CSV) {
// CSV supports only unbounded VARCHAR type
continue;
}
if (storageFormat == HiveStorageFormat.REGEX) {
// REGEX format is read-only
continue;
}
createExternalTableOnNonExistingPath(temporaryCreateTableWithExternalLocation, storageFormat);
dropTable(temporaryCreateTableWithExternalLocation);
}
}

private void createTable(SchemaTableName tableName, HiveStorageFormat storageFormat)
throws Exception
{
@@ -734,6 +760,84 @@ private void createTable(SchemaTableName tableName, HiveStorageFormat storageFor
}
}

private void createExternalTableOnNonExistingPath(SchemaTableName tableName, HiveStorageFormat storageFormat)
throws Exception
{
List<ColumnMetadata> columns = ImmutableList.of(new ColumnMetadata("id", BIGINT));
String externalLocation = getBasePath() + "/external_" + randomNameSuffix();

MaterializedResult data = MaterializedResult.resultBuilder(newSession(), BIGINT)
.row(1L)
.row(3L)
.row(2L)
.build();

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

Map<String, Object> tableProperties = ImmutableMap.<String, Object>builder()
.putAll(createTableProperties(storageFormat))
.put(EXTERNAL_LOCATION_PROPERTY, externalLocation)
.buildOrThrow();

// begin creating the table
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, columns, tableProperties);
metadata.createTable(session, tableMetadata, true);

transaction.commit();

// Hack to work around the metastore not being configured for S3 or other file systems.
// The metastore tries to validate the location when creating the
// table, which fails without explicit file system configuration.
// We work around that by using a dummy location when creating the
// table and updating it here to the correct location.
Location location = locationService.getTableWriteInfo(new LocationHandle(externalLocation, externalLocation, LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY), false).targetPath();
metastoreClient.updateTableLocation(database, tableName.getTableName(), location.toString());
}

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

ConnectorTableHandle connectorTableHandle = getTableHandle(metadata, tableName);
ConnectorInsertTableHandle outputHandle = metadata.beginInsert(session, connectorTableHandle, ImmutableList.of(), NO_RETRIES);

ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, outputHandle, TESTING_PAGE_SINK_ID);
sink.appendPage(data.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());

metadata.finishInsert(session, outputHandle, fragments, ImmutableList.of());
transaction.commit();
}

try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();

// load the new table
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());

// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), columns);
assertEquals(tableMetadata.getProperties().get("external_location"), externalLocation);

// verify the data
metadata.beginQuery(session);
ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, tableHandle);
ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle, columnHandles, DynamicFilter.EMPTY)) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
assertEqualsIgnoreOrder(result.getMaterializedRows(), data.getMaterializedRows());
}

metadata.cleanupQuery(session);
}
}

private void dropTable(SchemaTableName table)
{
try (Transaction transaction = newTransaction()) {
@@ -21,17 +21,20 @@
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tpch.TpchTable.CUSTOMER;
import static io.trino.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.nio.file.Files.createTempDirectory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

public class TestHiveCreateExternalTable
extends AbstractTestQueryFramework
@@ -87,4 +90,58 @@ public void testCreateExternalTableAsWithExistingDirectory()

assertQueryFails(createTableSql, "Target directory for table '.*' already exists:.*");
}

@Test
public void testCreateExternalTableOnNonExistingPath()
throws Exception
{
java.nio.file.Path tempDir = createTempDirectory(null);
// delete the directory; Trino should recreate it
deleteRecursively(tempDir, ALLOW_INSECURE);
String tableName = "test_create_external_non_exists_" + randomNameSuffix();

@Language("SQL") String createTableSql = format("" +
"CREATE TABLE %s.%s.%s (\n" +
" col1 varchar,\n" +
" col2 varchar\n" +
")\n" +
"WITH (\n" +
" external_location = '%s',\n" +
" format = 'TEXTFILE'\n" +
")",
getSession().getCatalog().get(),
getSession().getSchema().get(),
tableName,
tempDir.toUri().toASCIIString());

assertUpdate(createTableSql);
String actual = (String) computeScalar("SHOW CREATE TABLE " + tableName);
assertEquals(actual, createTableSql);
assertUpdate("DROP TABLE " + tableName);
deleteRecursively(tempDir, ALLOW_INSECURE);
}

@Test
public void testCreateExternalTableOnExistingPathToFile()
throws Exception
{
File tempFile = File.createTempFile("temp", ".tmp");
tempFile.deleteOnExit();
String tableName = "test_create_external_on_file_" + randomNameSuffix();

@Language("SQL") String createTableSql = format("""
CREATE TABLE %s.%s.%s (
col1 varchar,
col2 varchar
)WITH (
external_location = '%s',
format = 'TEXTFILE')
""",
getSession().getCatalog().get(),
getSession().getSchema().get(),
tableName,
tempFile.toPath().toUri().toASCIIString());

assertQueryFails(createTableSql, ".*Destination exists and is not a directory.*");
}
}
@@ -143,6 +143,32 @@ public void testBasicOperationsWithProvidedTableLocation(boolean partitioned, Lo
}
}

@Test(dataProvider = "locationPatternsDataProvider")
public void testBasicOperationsWithProvidedTableLocationNonCTAS(boolean partitioned, LocationPattern locationPattern)
{
// this test is needed because the execution paths for CTAS and plain CREATE TABLE are different
String tableName = "test_basic_operations_" + randomNameSuffix();
String location = locationPattern.locationForTable(bucketName, schemaName, tableName);
String partitionQueryPart = (partitioned ? ",partitioned_by = ARRAY['col_int']" : "");

String create = "CREATE TABLE " + tableName + "(col_str varchar, col_int integer) WITH (external_location = '" + location + "' " + partitionQueryPart + ")";
if (locationPattern == DOUBLE_SLASH || locationPattern == TRIPLE_SLASH || locationPattern == TWO_TRAILING_SLASHES) {
assertQueryFails(create, "\\QUnsupported location that cannot be internally represented: " + location);
return;
}
assertUpdate(create);
try (UncheckedCloseable ignored = onClose("DROP TABLE " + tableName)) {
String actualTableLocation = getTableLocation(tableName);
assertThat(actualTableLocation).isEqualTo(location);

assertUpdate("INSERT INTO " + tableName + " VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)", 4);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");

assertThat(getTableFiles(actualTableLocation)).isNotEmpty();
validateDataFiles(partitioned ? "col_int" : "", tableName, actualTableLocation);
}
}

@Override // Row-level modifications are not supported for Hive tables
@Test(dataProvider = "locationPatternsDataProvider")
public void testBasicOperationsWithProvidedSchemaLocation(boolean partitioned, LocationPattern locationPattern)
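The comment in testBasicOperationsWithProvidedTableLocationNonCTAS above points out that CTAS and plain CREATE TABLE take different execution paths in the connector. For reference, a hedged SQL sketch of the two statement shapes; the table names, columns, and location are placeholders, and the existing testBasicOperationsWithProvidedTableLocation test is assumed to cover the CTAS variant:

-- CTAS path: table and data are created in one statement at the provided external location
CREATE TABLE test_table_ctas
WITH (external_location = 's3://test-bucket/test_schema/test_table_ctas')
AS SELECT 'str1' AS col_str, 1 AS col_int;

-- Plain CREATE TABLE path (exercised by the new test): the table is created empty first,
-- then populated with INSERT
CREATE TABLE test_table (col_str varchar, col_int integer)
WITH (external_location = 's3://test-bucket/test_schema/test_table');
INSERT INTO test_table VALUES ('str1', 1);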
@@ -36,6 +36,7 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
return ImmutableList.of(
testOnEnvironment(EnvMultinode.class)
.withGroups("configured_features", "hdfs_no_impersonation")
.withExcludedTests("io.trino.tests.product.TestImpersonation.testExternalLocationTableCreationSuccess")
Member: Why this exclusion?

Contributor Author: To be honest, I don't remember the specific reason; I need to run and check this test.

Member: Understood. Please recover this info and capture it as a code comment.

Contributor Author (@vlad-lyutenko, Mar 5, 2024): Got it. This test requires hive.non-managed-table-writes-enabled, and I didn't want to enable it for the whole EnvMultinode environment and risk breaking other tests.
.build(),
testOnEnvironment(EnvSinglenodeKerberosHdfsNoImpersonation.class)
.withGroups("configured_features", "storage_formats", "hdfs_no_impersonation", "hive_kerberos")
@@ -14,3 +14,4 @@ hive.hdfs.impersonation.enabled=true
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.max-partitions-for-eager-load=100
hive.non-managed-table-writes-enabled=true
@@ -16,3 +16,4 @@ hive.fs.new-directory-permissions=0700
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.max-partitions-for-eager-load=100
hive.non-managed-table-writes-enabled=true
@@ -29,3 +29,4 @@ hive.hdfs.trino.keytab=/etc/hadoop/conf/hdfs-other.keytab
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.max-partitions-for-eager-load=100
hive.non-managed-table-writes-enabled=true
@@ -20,3 +20,4 @@ hive.security=sql-standard
hive.hive-views.enabled=true

hive.hdfs.wire-encryption.enabled=true
hive.non-managed-table-writes-enabled=true
@@ -20,3 +20,4 @@ hive.max-partitions-for-eager-load=100
hive.security=sql-standard
#required for testAccessControlSetHiveViewAuthorization() product test
hive.hive-views.enabled=true
hive.non-managed-table-writes-enabled=true
@@ -21,3 +21,4 @@ hive.hdfs.trino.keytab=/etc/hadoop/conf/hdfs.keytab
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.max-partitions-for-eager-load=100
hive.non-managed-table-writes-enabled=true
@@ -24,3 +24,4 @@ hive.max-partitions-for-eager-load=100
hive.security=sql-standard
#required for testAccessControlSetHiveViewAuthorization() product test
hive.hive-views.enabled=true
hive.non-managed-table-writes-enabled=true
@@ -24,3 +24,4 @@ hive.max-partitions-for-eager-load=100
hive.security=sql-standard
#required for testAccessControlSetHiveViewAuthorization() product test
hive.hive-views.enabled=true
hive.non-managed-table-writes-enabled=true
@@ -21,3 +21,4 @@ hive.hdfs.trino.credential-cache.location=/etc/trino/conf/hdfs-krbcc
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.max-partitions-for-eager-load=100
hive.non-managed-table-writes-enabled=true