diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java
index c37cfb98e8d4..96d8f69101d4 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveClientConfig.java
@@ -148,6 +148,8 @@ public class HiveClientConfig
     private boolean s3SelectPushdownEnabled;
     private int s3SelectPushdownMaxConnections = 500;

+    private boolean isTemporaryStagingDirectoryEnabled = true;
+
     public int getMaxInitialSplits()
     {
         return maxInitialSplits;
@@ -1190,4 +1192,17 @@ public HiveClientConfig setS3SelectPushdownMaxConnections(int s3SelectPushdownMa
         this.s3SelectPushdownMaxConnections = s3SelectPushdownMaxConnections;
         return this;
     }
+
+    @Config("hive.temporary-staging-directory-enabled")
+    @ConfigDescription("Should use (if possible) temporary staging directory for write operations")
+    public HiveClientConfig setTemporaryStagingDirectoryEnabled(boolean temporaryStagingDirectoryEnabled)
+    {
+        this.isTemporaryStagingDirectoryEnabled = temporaryStagingDirectoryEnabled;
+        return this;
+    }
+
+    public boolean isTemporaryStagingDirectoryEnabled()
+    {
+        return isTemporaryStagingDirectoryEnabled;
+    }
 }
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java
index 57ff66cc2b66..3d5a41f35563 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java
@@ -27,6 +27,7 @@
 import java.util.Optional;

 import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
+import static io.prestosql.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
 import static io.prestosql.plugin.hive.HiveWriteUtils.createTemporaryPath;
 import static io.prestosql.plugin.hive.HiveWriteUtils.getTableDefaultLocation;
 import static io.prestosql.plugin.hive.HiveWriteUtils.isS3FileSystem;
@@ -59,7 +60,7 @@ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, Conn
             throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
         }

-        if (shouldUseTemporaryDirectory(context, targetPath)) {
+        if (shouldUseTemporaryDirectory(session, context, targetPath)) {
             Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
             return new LocationHandle(targetPath, writePath, false, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
@@ -74,7 +75,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
         HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
         Path targetPath = new Path(table.getStorage().getLocation());

-        if (shouldUseTemporaryDirectory(context, targetPath)) {
+        if (shouldUseTemporaryDirectory(session, context, targetPath)) {
             Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
             return new LocationHandle(targetPath, writePath, true, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
         }
@@ -83,10 +84,11 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
         }
     }

-    private boolean shouldUseTemporaryDirectory(HdfsContext context, Path path)
+    private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path)
     {
-        // skip using temporary directory for S3
-        return !isS3FileSystem(context, hdfsEnvironment, path);
+        return isTemporaryStagingDirectoryEnabled(session)
+                // skip using temporary directory for S3
+                && !isS3FileSystem(context, hdfsEnvironment, path);
     }

     @Override
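Note: with this change, shouldUseTemporaryDirectory() short-circuits on the new flag before the S3 check, so disabling hive.temporary-staging-directory-enabled makes forNewTable()/forExistingTable() take the direct-write branches even for HDFS targets, while the default of true preserves the existing staged-rename behavior. The following is a minimal illustrative sketch of the new config knob, not part of the patch; the class name is hypothetical and it assumes HiveClientConfig and TestNG are on the classpath:

import io.prestosql.plugin.hive.HiveClientConfig;
import org.testng.annotations.Test;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

// Illustrative only: shows the default and the effect of the new setter added above.
public class TestTemporaryStagingDirectoryFlagSketch
{
    @Test
    public void testFlagDefaultsToEnabled()
    {
        // Staging stays on by default, preserving the pre-patch behavior for non-S3 targets
        assertTrue(new HiveClientConfig().isTemporaryStagingDirectoryEnabled());
    }

    @Test
    public void testFlagCanBeDisabled()
    {
        // hive.temporary-staging-directory-enabled=false is bound to this setter via @Config
        HiveClientConfig config = new HiveClientConfig()
                .setTemporaryStagingDirectoryEnabled(false);
        assertFalse(config.isTemporaryStagingDirectoryEnabled());
    }
}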
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java
index afc646035f30..fff94e01d1bf 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java
@@ -76,6 +76,7 @@ public final class HiveSessionProperties
     private static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
     private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
     private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
+    private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";

     private final List<PropertyMetadata<?>> sessionProperties;

@@ -296,6 +297,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
                         S3_SELECT_PUSHDOWN_ENABLED,
                         "S3 Select pushdown enabled",
                         hiveClientConfig.isS3SelectPushdownEnabled(),
+                        false),
+                booleanProperty(
+                        TEMPORARY_STAGING_DIRECTORY_ENABLED,
+                        "Should use temporary staging directory for write operations",
+                        hiveClientConfig.isTemporaryStagingDirectoryEnabled(),
                         false));
     }

@@ -495,6 +501,11 @@ public static boolean isOptimizedMismatchedBucketCount(ConnectorSession session)
         return session.getProperty(OPTIMIZE_MISMATCHED_BUCKET_COUNT, Boolean.class);
     }

+    public static boolean isTemporaryStagingDirectoryEnabled(ConnectorSession session)
+    {
+        return session.getProperty(TEMPORARY_STAGING_DIRECTORY_ENABLED, Boolean.class);
+    }
+
     public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
     {
         return new PropertyMetadata<>(
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/LocationHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/LocationHandle.java
index 5c004cc26bfb..a1cce7ba207a 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/LocationHandle.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/LocationHandle.java
@@ -107,15 +107,15 @@ public WriteMode getJsonSerializableWriteMode()
     public enum WriteMode
     {
         /**
-         * common mode for new table or existing table (both new and existing partition)
+         * common mode for new table or existing table (both new and existing partition) and when staging directory is enabled
          */
         STAGE_AND_MOVE_TO_TARGET_DIRECTORY(false),
         /**
-         * for new table in S3
+         * for new table in S3 or when staging directory is disabled
          */
         DIRECT_TO_TARGET_NEW_DIRECTORY(true),
         /**
-         * for existing table in S3 (both new and existing partition)
+         * for existing table in S3 (both new and existing partition) or when staging directory is disabled
          */
         DIRECT_TO_TARGET_EXISTING_DIRECTORY(true),
         /**/;
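Note: the session property registered above can be toggled per query. Below is a hypothetical sketch of doing so from test code, not part of the patch; the catalog name "hive", the table test_table, and the surrounding QueryRunner setup are assumptions of this sketch:

import io.prestosql.Session;
import io.prestosql.testing.QueryRunner;

public final class DisableStagingDirectoryExample
{
    private DisableStagingDirectoryExample() {}

    // Derives a session that turns the staging directory off for a single query.
    public static Session withoutStagingDirectory(Session baseSession)
    {
        return Session.builder(baseSession)
                .setCatalogSessionProperty("hive", "temporary_staging_directory_enabled", "false")
                .build();
    }

    // With the property disabled, writers create files directly under the table's target directory.
    public static void insertWithoutStaging(QueryRunner queryRunner, Session baseSession)
    {
        queryRunner.execute(withoutStagingDirectory(baseSession), "INSERT INTO test_table VALUES (1)");
    }
}

When the property is off, a LocationHandle's writePath equals its targetPath, which is exactly what the new integration test further below asserts.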
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java
index d898253a1993..0b0b53b3f57a 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveClientConfig.java
@@ -115,7 +115,8 @@ public void testDefaults()
                 .setCollectColumnStatisticsOnWrite(false)
                 .setCollectColumnStatisticsOnWrite(false)
                 .setS3SelectPushdownEnabled(false)
-                .setS3SelectPushdownMaxConnections(500));
+                .setS3SelectPushdownMaxConnections(500)
+                .setTemporaryStagingDirectoryEnabled(true));
     }

     @Test
@@ -199,6 +200,7 @@ public void testExplicitPropertyMappings()
                 .put("hive.collect-column-statistics-on-write", "true")
                 .put("hive.s3select-pushdown.enabled", "true")
                 .put("hive.s3select-pushdown.max-connections", "1234")
+                .put("hive.temporary-staging-directory-enabled", "false")
                 .build();

         HiveClientConfig expected = new HiveClientConfig()
@@ -279,7 +281,8 @@ public void testExplicitPropertyMappings()
                 .setCollectColumnStatisticsOnWrite(true)
                 .setCollectColumnStatisticsOnWrite(true)
                 .setS3SelectPushdownEnabled(true)
-                .setS3SelectPushdownMaxConnections(1234);
+                .setS3SelectPushdownMaxConnections(1234)
+                .setTemporaryStagingDirectoryEnabled(false);

         ConfigAssertions.assertFullMapping(properties, expected);
     }
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java
index d4f3167955f1..4b1137f2cfb4 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java
@@ -20,6 +20,7 @@
 import io.prestosql.Session;
 import io.prestosql.connector.ConnectorId;
 import io.prestosql.cost.StatsAndCosts;
+import io.prestosql.metadata.InsertTableHandle;
 import io.prestosql.metadata.Metadata;
 import io.prestosql.metadata.QualifiedObjectName;
 import io.prestosql.metadata.TableHandle;
@@ -3550,6 +3551,37 @@ public void testPrunePartitionFailure()
         assertUpdate("DROP TABLE test_prune_failure");
     }

+    @Test
+    public void testTemporaryStagingDirectorySessionProperty()
+    {
+        String tableName = "test_temporary_staging_directory_session_property";
+        assertUpdate(format("CREATE TABLE %s(i int)", tableName));
+
+        Session session = Session.builder(getSession())
+                .setCatalogSessionProperty("hive", "temporary_staging_directory_enabled", "false")
+                .build();
+
+        HiveInsertTableHandle hiveInsertTableHandle = getHiveInsertTableHandle(session, tableName);
+        assertEquals(hiveInsertTableHandle.getLocationHandle().getWritePath(), hiveInsertTableHandle.getLocationHandle().getTargetPath());
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
+    private HiveInsertTableHandle getHiveInsertTableHandle(Session session, String tableName)
+    {
+        Metadata metadata = ((DistributedQueryRunner) getQueryRunner()).getCoordinator().getMetadata();
+        return transaction(getQueryRunner().getTransactionManager(), getQueryRunner().getAccessControl())
+                .execute(session, transactionSession -> {
+                    QualifiedObjectName objectName = new QualifiedObjectName(catalog, TPCH_SCHEMA, tableName);
+                    Optional<TableHandle> handle = metadata.getTableHandle(transactionSession, objectName);
+                    InsertTableHandle insertTableHandle = metadata.beginInsert(transactionSession, handle.get());
+                    HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) insertTableHandle.getConnectorHandle();
+
+                    metadata.finishInsert(transactionSession, insertTableHandle, ImmutableList.of(), ImmutableList.of());
+                    return hiveInsertTableHandle;
+                });
+    }
+
     private Session getParallelWriteSession()
     {
         return Session.builder(getSession())