From 57e48d2e78f35da01b6e1bd5a657dc79c2672ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 12 Oct 2021 22:04:27 +0200 Subject: [PATCH] Add Hive OPTIMIZE table procedure Add support for compacting small files for non-transactional, non-bucketed Hive tables. ALTER TABLE xxxxx EXECUTE OPTIMIZE WITH(file_size_threshold = ...) --- .../hive/BackgroundHiveSplitLoader.java | 14 +- .../io/trino/plugin/hive/HiveConnector.java | 10 + .../trino/plugin/hive/HiveHandleResolver.java | 7 + .../plugin/hive/HiveLocationService.java | 8 + .../io/trino/plugin/hive/HiveMetadata.java | 238 +++++++++++++++++- .../plugin/hive/HivePageSinkProvider.java | 8 + .../plugin/hive/HivePartitionManager.java | 4 +- .../trino/plugin/hive/HiveSplitManager.java | 12 +- .../io/trino/plugin/hive/HiveSplitSource.java | 33 ++- .../plugin/hive/HiveTableExecuteHandle.java | 110 ++++++++ .../io/trino/plugin/hive/HiveTableHandle.java | 92 ++++++- .../hive/InternalHiveConnectorFactory.java | 3 + .../io/trino/plugin/hive/LocationService.java | 2 + .../SemiTransactionalHiveMetastore.java | 27 +- .../hive/procedure/HiveProcedureModule.java | 4 + .../procedure/OptimizeTableProcedure.java | 43 ++++ .../hive/util/InternalHiveSplitFactory.java | 9 +- .../hive/TestBackgroundHiveSplitLoader.java | 9 +- .../plugin/hive/TestHiveConnectorTest.java | 172 +++++++++++++ .../plugin/hive/TestHiveSplitSource.java | 21 +- 20 files changed, 791 insertions(+), 35 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/OptimizeTableProcedure.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java index fecc6d4bb132..597707953b2f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java @@ -164,6 +164,7 @@ public class BackgroundHiveSplitLoader private final ConcurrentLazyQueue partitions; private final Deque> fileIterators = new ConcurrentLinkedDeque<>(); private final Optional validWriteIds; + private final Optional maxSplitFileSize; // Purpose of this lock: // * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource. @@ -204,7 +205,8 @@ public BackgroundHiveSplitLoader( boolean recursiveDirWalkerEnabled, boolean ignoreAbsentPartitions, boolean optimizeSymlinkListing, - Optional validWriteIds) + Optional validWriteIds, + Optional maxSplitFileSize) { this.table = table; this.transaction = requireNonNull(transaction, "transaction is null"); @@ -226,6 +228,7 @@ public BackgroundHiveSplitLoader( this.partitions = new ConcurrentLazyQueue<>(partitions); this.hdfsContext = new HdfsContext(session); this.validWriteIds = requireNonNull(validWriteIds, "validWriteIds is null"); + this.maxSplitFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null"); } @Override @@ -465,7 +468,8 @@ private ListenableFuture loadPartition(HivePartitionMetadata partition) getMaxInitialSplitSize(session), isForceLocalScheduling(session), s3SelectPushdownEnabled, - transaction); + transaction, + maxSplitFileSize); // To support custom input formats, we want to call getSplits() // on the input format to obtain file splits. @@ -653,7 +657,8 @@ private ListenableFuture createHiveSymlinkSplits( getMaxInitialSplitSize(session), isForceLocalScheduling(session), s3SelectPushdownEnabled, - transaction); + transaction, + maxSplitFileSize); lastResult = addSplitsToSource(targetSplits, splitFactory); if (stopped) { return COMPLETED_FUTURE; @@ -709,7 +714,8 @@ Optional> buildManifestFileIterator( getMaxInitialSplitSize(session), isForceLocalScheduling(session), s3SelectPushdownEnabled, - transaction); + transaction, + maxSplitFileSize); return Optional.of(locatedFileStatuses.stream() .map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), splittable, Optional.empty())) .filter(Optional::isPresent) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java index c11dce46aeff..8c1f2f883dad 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnector.java @@ -29,6 +29,7 @@ import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.procedure.Procedure; import io.trino.spi.session.PropertyMetadata; @@ -55,6 +56,7 @@ public class HiveConnector private final ConnectorNodePartitioningProvider nodePartitioningProvider; private final Set systemTables; private final Set procedures; + private final Set tableProcedures; private final Set eventListeners; private final List> sessionProperties; private final List> schemaProperties; @@ -77,6 +79,7 @@ public HiveConnector( ConnectorNodePartitioningProvider nodePartitioningProvider, Set systemTables, Set procedures, + Set tableProcedures, Set eventListeners, Set sessionPropertiesProviders, List> schemaProperties, @@ -95,6 +98,7 @@ public HiveConnector( this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null")); this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null")); + this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null")); this.eventListeners = ImmutableSet.copyOf(requireNonNull(eventListeners, "eventListeners is null")); this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream() .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream()) @@ -241,4 +245,10 @@ public final void shutdown() { lifeCycleManager.stop(); } + + @Override + public Set getTableProcedures() + { + return tableProcedures; + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveHandleResolver.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveHandleResolver.java index f02a499af864..f7a889e524d0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveHandleResolver.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveHandleResolver.java @@ -19,6 +19,7 @@ import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -55,6 +56,12 @@ public Class getInsertTableHandleClass() return HiveInsertTableHandle.class; } + @Override + public Class getTableExecuteHandleClass() + { + return HiveTableExecuteHandle.class; + } + @Override public Class getTransactionHandleClass() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java index 3e7f03f1bfe6..78d84c4f2106 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java @@ -88,6 +88,14 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, } } + @Override + public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table) + { + // For OPTIMIZE write result files directly to table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute + Path targetPath = new Path(table.getStorage().getLocation()); + return new LocationHandle(targetPath, targetPath, true, DIRECT_TO_TARGET_EXISTING_DIRECTORY); + } + private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional externalLocation) { return isTemporaryStagingDirectoryEnabled(session) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 7481228d17c7..703cd3ba24d9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -25,7 +25,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; import io.airlift.slice.Slice; +import io.airlift.units.DataSize; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation; @@ -45,6 +47,7 @@ import io.trino.plugin.hive.metastore.SortingColumn; import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.procedure.OptimizeTableProcedure; import io.trino.plugin.hive.security.AccessControlMetadata; import io.trino.plugin.hive.statistics.HiveStatisticsProvider; import io.trino.plugin.hive.util.HiveBucketing; @@ -55,6 +58,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.BeginTableExecuteResult; import io.trino.spi.connector.CatalogSchemaName; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -65,6 +69,7 @@ import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTablePartitioning; @@ -124,6 +129,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -222,6 +228,7 @@ import static io.trino.plugin.hive.HiveTableProperties.getPartitionedBy; import static io.trino.plugin.hive.HiveTableProperties.getSingleCharacterProperty; import static io.trino.plugin.hive.HiveTableProperties.isTransactional; +import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS; import static io.trino.plugin.hive.HiveType.HIVE_STRING; import static io.trino.plugin.hive.HiveType.toHiveType; import static io.trino.plugin.hive.HiveWriterFactory.computeBucketedFileName; @@ -260,6 +267,7 @@ import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery; import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem; import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType; +import static io.trino.plugin.hive.util.RetryDriver.retry; import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD; import static io.trino.plugin.hive.util.Statistics.createComputedStatisticsToPartitionMap; import static io.trino.plugin.hive.util.Statistics.createEmptyPartitionStatistics; @@ -292,6 +300,8 @@ public class HiveMetadata implements TransactionalMetadata { + private static final Logger log = Logger.get(HiveMetadata.class); + public static final String PRESTO_VERSION_NAME = "presto_version"; public static final String TRINO_CREATED_BY = "trino_created_by"; public static final String PRESTO_QUERY_ID_NAME = "presto_query_id"; @@ -1916,6 +1926,230 @@ private static Map getColumnStatistics(Map getTableHandleForExecute(ConnectorSession session, ConnectorTableHandle tableHandle, String procedureName, Map executeProperties) + { + if (procedureName.equals(OptimizeTableProcedure.NAME)) { + return getTableHandleForOptimize(session, tableHandle, executeProperties); + } + throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'"); + } + + private Optional getTableHandleForOptimize(ConnectorSession session, ConnectorTableHandle tableHandle, Map executeProperties) + { + // TODO lots of that is copied from beginInsert; rafactoring opportunity + + HiveIdentity identity = new HiveIdentity(session); + HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle; + SchemaTableName tableName = hiveTableHandle.getSchemaTableName(); + + Table table = metastore.getTable(identity, tableName.getSchemaName(), tableName.getTableName()) + .orElseThrow(() -> new TableNotFoundException(tableName)); + + checkTableIsWritable(table, writesToNonManagedTablesEnabled); + + for (Column column : table.getDataColumns()) { + if (!isWritableType(column.getType())) { + throw new TrinoException(NOT_SUPPORTED, format("Optimizing Hive table %s with column type %s not supported", tableName, column.getType())); + } + } + + if (isTransactionalTable(table.getParameters())) { + throw new TrinoException(NOT_SUPPORTED, format("Optimizing transactional Hive table %s is not supported", tableName)); + } + + if (table.getStorage().getBucketProperty().isPresent()) { + throw new TrinoException(NOT_SUPPORTED, format("Optimizing bucketed Hive table %s is not supported", tableName)); + } + + // TODO forcing NANOSECONDS precision here so we do not loose data. In future we may be smarter; options: + // - respect timestamp_precision but recognize situation when rounding occurs, and fail query + // - detect data's precision and maintain it + List columns = hiveColumnHandles(table, typeManager, NANOSECONDS).stream() + .filter(columnHandle -> !columnHandle.isHidden()) + .collect(toImmutableList()); + + HiveStorageFormat tableStorageFormat = extractHiveStorageFormat(table); + Optional.ofNullable(table.getParameters().get(SKIP_HEADER_COUNT_KEY)).map(Integer::parseInt).ifPresent(headerSkipCount -> { + if (headerSkipCount > 1) { + throw new TrinoException(NOT_SUPPORTED, format("Optimizing Hive table %s with value of %s property greater than 1 is not supported", tableName, SKIP_HEADER_COUNT_KEY)); + } + }); + + if (table.getParameters().containsKey(SKIP_FOOTER_COUNT_KEY)) { + throw new TrinoException(NOT_SUPPORTED, format("Optimizing Hive table %s with %s property not supported", tableName, SKIP_FOOTER_COUNT_KEY)); + } + LocationHandle locationHandle = locationService.forOptimize(metastore, session, table); + + DataSize fileSizeThreshold = (DataSize) executeProperties.get("file_size_threshold"); + + return Optional.of(new HiveTableExecuteHandle( + OptimizeTableProcedure.NAME, + Optional.empty(), + Optional.of(fileSizeThreshold.toBytes()), + tableName.getSchemaName(), + tableName.getTableName(), + columns, + metastore.generatePageSinkMetadata(identity, tableName), + locationHandle, + table.getStorage().getBucketProperty(), + tableStorageFormat, + // TODO: test with multiple partitions using different storage format + tableStorageFormat, + NO_ACID_TRANSACTION)); + } + + @Override + public BeginTableExecuteResult beginTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorTableHandle updatedSourceTableHandle) + { + String procedureName = ((HiveTableExecuteHandle) tableExecuteHandle).getProcedureName(); + + if (procedureName.equals(OptimizeTableProcedure.NAME)) { + return beginOptimize(session, tableExecuteHandle, updatedSourceTableHandle); + } + throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'"); + } + + private BeginTableExecuteResult beginOptimize(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorTableHandle sourceTableHandle) + { + HiveTableExecuteHandle hiveExecuteHandle = (HiveTableExecuteHandle) tableExecuteHandle; + HiveTableHandle hiveSourceTableHandle = (HiveTableHandle) sourceTableHandle; + + WriteInfo writeInfo = locationService.getQueryWriteInfo(hiveExecuteHandle.getLocationHandle()); + String writeDeclarationId = metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), hiveExecuteHandle.getSchemaTableName()); + + return new BeginTableExecuteResult<>( + hiveExecuteHandle + .withWriteDeclarationId(writeDeclarationId), + hiveSourceTableHandle + .withMaxScannedFileSize(hiveExecuteHandle.getMaxScannedFileSize()) + .withRecordScannedFiles(true)); + } + + @Override + public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) + { + String procedureName = ((HiveTableExecuteHandle) tableExecuteHandle).getProcedureName(); + + if (procedureName.equals(OptimizeTableProcedure.NAME)) { + finishOptimize(session, tableExecuteHandle, fragments, splitSourceInfo); + return; + } + throw new IllegalArgumentException("Unknown procedure '" + procedureName + "'"); + } + + private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List splitSourceInfo) + { + // TODO lots of that is copied from finishInsert; rafactoring opportunity + + HiveTableExecuteHandle handle = (HiveTableExecuteHandle) tableExecuteHandle; + checkArgument(handle.getWriteDeclarationId().isPresent(), "no write declaration id present in tableExecuteHandle"); + + List partitionUpdates = fragments.stream() + .map(Slice::getBytes) + .map(partitionUpdateCodec::fromJson) + .collect(toImmutableList()); + + HiveStorageFormat tableStorageFormat = handle.getTableStorageFormat(); + partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); + + Table table = metastore.getTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName()) + .orElseThrow(() -> new TableNotFoundException(handle.getSchemaTableName())); + if (!table.getStorage().getStorageFormat().getInputFormat().equals(tableStorageFormat.getInputFormat()) && isRespectTableFormat(session)) { + throw new TrinoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Table format changed during optimize"); + } + + // Support for bucketed tables disabled mostly so we do not need to think about grouped execution in an initial version. Possibly no change apart from testing required. + verify(handle.getBucketProperty().isEmpty(), "bucketed table not supported"); + + for (PartitionUpdate partitionUpdate : partitionUpdates) { + verify(partitionUpdate.getUpdateMode() == APPEND, "Expected partionUpdate mode to be APPEND but got %s", partitionUpdate.getUpdateMode()); // sanity check + + if (partitionUpdate.getName().isEmpty()) { + // operating on an unpartitioned table + if (!table.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) { + throw new TrinoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Table format changed during optimize"); + } + + metastore.finishInsertIntoExistingTable( + session, + handle.getSchemaName(), + handle.getTableName(), + partitionUpdate.getWritePath(), + partitionUpdate.getFileNames(), + PartitionStatistics.empty()); + } + else { + // operating on a partition + List partitionValues = toPartitionValues(partitionUpdate.getName()); + metastore.finishInsertIntoExistingPartition( + session, + handle.getSchemaName(), + handle.getTableName(), + partitionValues, + partitionUpdate.getWritePath(), + partitionUpdate.getFileNames(), + PartitionStatistics.empty()); + } + } + + // get filesystem + FileSystem fs; + try { + fs = hdfsEnvironment.getFileSystem(new HdfsContext(session), new Path(table.getStorage().getLocation())); + } + catch (IOException e) { + throw new TrinoException(HIVE_FILESYSTEM_ERROR, e); + } + + // path to be deleted + Set scannedPaths = splitSourceInfo.stream() + .map(file -> new Path((String) file)) + .collect(toImmutableSet()); + // track remaining files to be delted for error reporting + Set remainingFilesToDelete = new HashSet<>(scannedPaths); + + // delete loop + boolean someDeleted = false; + Optional firstScannedPath = Optional.empty(); + try { + for (Path scannedPath : scannedPaths) { + if (firstScannedPath.isEmpty()) { + firstScannedPath = Optional.of(scannedPath); + } + retry().run("delete " + scannedPath, () -> fs.delete(scannedPath, false)); + remainingFilesToDelete.remove(scannedPath); + } + } + catch (Exception e) { + if (!someDeleted && (firstScannedPath.isEmpty() || exists(fs, firstScannedPath.get()))) { + // we are good - we did not delete any source files so we can just throw error and allow rollback to happend + // if someDeleted flag is false we do extra checkig if first file we tried to delete is still there. There is a chance that + // fs.delete above could throw exception but file was actually deleted. + throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting original files", e); + } + + // If we already deleted some original files we disable rollback routine so written files are not deleted. + // The reported exception message and log entry lists files which need to be cleaned up by user manually. + // Until table is cleaned up there will duplicate rows present. + metastore.dropDeclaredIntentionToWrite(handle.getWriteDeclarationId().get()); + String errorMessage = "Error while deleting data files in FINISH phase of OPTIMIZE for table " + table.getTableName() + "; remaining files need to be deleted manually: " + remainingFilesToDelete; + log.error(e, errorMessage); + throw new TrinoException(HIVE_FILESYSTEM_ERROR, errorMessage, e); + } + } + + private boolean exists(FileSystem fs, Path path) + { + try { + return fs.exists(path); + } + catch (IOException e) { + // on failure pessimistically assume file does not exist + return false; + } + } + @Override public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace) { @@ -2520,7 +2754,9 @@ public ConnectorTableHandle makeCompatiblePartitioning(ConnectorSession session, hiveTable.getAnalyzeColumnNames(), Optional.empty(), Optional.empty(), // Projected columns is used only during optimization phase of planning - hiveTable.getTransaction()); + hiveTable.getTransaction(), + hiveTable.isRecordScannedFiles(), + hiveTable.getMaxScannedFileSize()); } @VisibleForTesting diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java index 2588be4b8898..d632c2a13c3a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java @@ -32,6 +32,7 @@ import io.trino.spi.connector.ConnectorPageSink; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.type.TypeManager; import org.joda.time.DateTimeZone; @@ -121,6 +122,13 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transaction, return createPageSink(handle, false, session, ImmutableMap.of() /* for insert properties are taken from metastore */); } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + HiveTableExecuteHandle handle = (HiveTableExecuteHandle) tableExecuteHandle; + return createPageSink(handle, false, session, ImmutableMap.of()); + } + private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean isCreateTable, ConnectorSession session, Map additionalTableParameters) { OptionalInt bucketCount = OptionalInt.empty(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java index 7f8fc751cb0a..beb34811f4ce 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePartitionManager.java @@ -193,7 +193,9 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio handle.getAnalyzeColumnNames(), Optionals.combine(handle.getConstraintColumns(), columns, Sets::union), handle.getProjectedColumns(), - handle.getTransaction()); + handle.getTransaction(), + handle.isRecordScannedFiles(), + handle.getMaxScannedFileSize()); } public List getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveIdentity identity, HiveTableHandle table) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index b26a670db96f..9b863a13976e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -207,6 +207,9 @@ public ConnectorSplitSource getSplits( // short circuit if we don't have any partitions if (partitions.isEmpty()) { + if (hiveTable.isRecordScannedFiles()) { + return new FixedSplitSource(ImmutableList.of(), ImmutableList.of()); + } return new FixedSplitSource(ImmutableList.of()); } @@ -245,7 +248,8 @@ public ConnectorSplitSource getSplits( !hiveTable.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session), isOptimizeSymlinkListing(session), metastore.getValidWriteIds(session, hiveTable) - .map(validTxnWriteIdList -> validTxnWriteIdList.getTableValidWriteIdList(table.getDatabaseName() + "." + table.getTableName()))); + .map(validTxnWriteIdList -> validTxnWriteIdList.getTableValidWriteIdList(table.getDatabaseName() + "." + table.getTableName())), + hiveTable.getMaxScannedFileSize()); HiveSplitSource splitSource; switch (splitSchedulingStrategy) { @@ -260,7 +264,8 @@ public ConnectorSplitSource getSplits( maxSplitsPerSecond, hiveSplitLoader, executor, - highMemorySplitSourceCounter); + highMemorySplitSourceCounter, + hiveTable.isRecordScannedFiles()); break; case GROUPED_SCHEDULING: splitSource = HiveSplitSource.bucketed( @@ -273,7 +278,8 @@ public ConnectorSplitSource getSplits( maxSplitsPerSecond, hiveSplitLoader, executor, - highMemorySplitSourceCounter); + highMemorySplitSourceCounter, + hiveTable.isRecordScannedFiles()); break; default: throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingStrategy); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java index 379bcdeb5931..4a5a458f4d1b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java @@ -34,6 +34,7 @@ import java.io.FileNotFoundException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -93,6 +94,9 @@ class HiveSplitSource private final AtomicBoolean loggedHighMemoryWarning = new AtomicBoolean(); private final HiveSplitWeightProvider splitWeightProvider; + private final boolean recordScannedFiles; + private final ImmutableList.Builder scannedFilePaths = ImmutableList.builder(); + private HiveSplitSource( ConnectorSession session, String databaseName, @@ -102,7 +106,8 @@ private HiveSplitSource( DataSize maxOutstandingSplitsSize, HiveSplitLoader splitLoader, AtomicReference stateReference, - CounterStat highMemorySplitSourceCounter) + CounterStat highMemorySplitSourceCounter, + boolean recordScannedFiles) { requireNonNull(session, "session is null"); this.queryId = session.getQueryId(); @@ -119,6 +124,7 @@ private HiveSplitSource( this.remainingInitialSplits = new AtomicInteger(maxInitialSplits); this.numberOfProcessedSplits = new AtomicLong(0); this.splitWeightProvider = isSizeBasedSplitWeightsEnabled(session) ? new SizeBasedSplitWeightProvider(getMinimumAssignedSplitWeight(session), maxSplitSize) : HiveSplitWeightProvider.uniformStandardWeightProvider(); + this.recordScannedFiles = recordScannedFiles; } public static HiveSplitSource allAtOnce( @@ -131,7 +137,8 @@ public static HiveSplitSource allAtOnce( int maxSplitsPerSecond, HiveSplitLoader splitLoader, Executor executor, - CounterStat highMemorySplitSourceCounter) + CounterStat highMemorySplitSourceCounter, + boolean recordScannedFiles) { AtomicReference stateReference = new AtomicReference<>(State.initial()); return new HiveSplitSource( @@ -173,7 +180,8 @@ public boolean isFinished(OptionalInt bucketNumber) maxOutstandingSplitsSize, splitLoader, stateReference, - highMemorySplitSourceCounter); + highMemorySplitSourceCounter, + recordScannedFiles); } public static HiveSplitSource bucketed( @@ -186,7 +194,8 @@ public static HiveSplitSource bucketed( int maxSplitsPerSecond, HiveSplitLoader splitLoader, Executor executor, - CounterStat highMemorySplitSourceCounter) + CounterStat highMemorySplitSourceCounter, + boolean recordScannedFiles) { AtomicReference stateReference = new AtomicReference<>(State.initial()); return new HiveSplitSource( @@ -248,7 +257,8 @@ public AsyncQueue queueFor(OptionalInt bucketNumber) maxOutstandingSplitsSize, splitLoader, stateReference, - highMemorySplitSourceCounter); + highMemorySplitSourceCounter, + recordScannedFiles); } /** @@ -412,6 +422,9 @@ else if (maxSplitBytes * 2 >= remainingBlockBytes) { ListenableFuture transform = Futures.transform(future, splits -> { requireNonNull(splits, "splits is null"); + if (recordScannedFiles) { + splits.forEach(split -> scannedFilePaths.add(((HiveSplit) split).getPath())); + } if (noMoreSplits) { // Checking splits.isEmpty() here is required for thread safety. // Let's say there are 10 splits left, and max number of splits per batch is 5. @@ -452,6 +465,16 @@ public boolean isFinished() } } + @Override + public Optional> getTableExecuteSplitsInfo() + { + checkState(isFinished(), "HiveSplitSource must be finished before TableExecuteSplitsInfo is read"); + if (!recordScannedFiles) { + return Optional.empty(); + } + return Optional.of(scannedFilePaths.build()); + } + @Override public void close() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java new file mode 100644 index 000000000000..543e10e770f8 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableExecuteHandle.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.plugin.hive.metastore.HivePageSinkMetadata; +import io.trino.spi.connector.ConnectorTableExecuteHandle; + +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class HiveTableExecuteHandle + extends HiveWritableTableHandle + implements ConnectorTableExecuteHandle +{ + private final String procedureName; + private final Optional writeDeclarationId; + private final Optional maxScannedFileSize; + + @JsonCreator + public HiveTableExecuteHandle( + @JsonProperty("procedureName") String procedureName, + @JsonProperty("writeDeclarationId") Optional writeDeclarationId, + @JsonProperty("maxScannedFileSize") Optional maxScannedFileSize, + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("inputColumns") List inputColumns, + @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata, + @JsonProperty("locationHandle") LocationHandle locationHandle, + @JsonProperty("bucketProperty") Optional bucketProperty, + @JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat, + @JsonProperty("partitionStorageFormat") HiveStorageFormat partitionStorageFormat, + @JsonProperty("transaction") AcidTransaction transaction) + { + super( + schemaName, + tableName, + inputColumns, + pageSinkMetadata, + locationHandle, + bucketProperty, + tableStorageFormat, + partitionStorageFormat, + transaction); + + // todo to be added soon + verify(bucketProperty.isEmpty(), "bucketed tables not supported yet"); + + this.procedureName = requireNonNull(procedureName, "procedureName is null"); + this.writeDeclarationId = requireNonNull(writeDeclarationId, "writeDeclarationId is null"); + this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); + } + + @JsonProperty + public String getProcedureName() + { + return procedureName; + } + + @JsonProperty + public Optional getWriteDeclarationId() + { + return writeDeclarationId; + } + + @JsonProperty + public Optional getMaxScannedFileSize() + { + return maxScannedFileSize; + } + + public HiveTableExecuteHandle withWriteDeclarationId(String writeDeclarationId) + { + return new HiveTableExecuteHandle( + procedureName, + Optional.of(writeDeclarationId), + maxScannedFileSize, + getSchemaName(), + getTableName(), + getInputColumns(), + getPageSinkMetadata(), + getLocationHandle(), + getBucketProperty(), + getTableStorageFormat(), + getPartitionStorageFormat(), + getTransaction()); + } + + @Override + public String toString() + { + return procedureName + "(" + getSchemaName() + "." + getTableName() + ")"; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java index a5185c57c726..84ef23315525 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveTableHandle.java @@ -56,6 +56,8 @@ public class HiveTableHandle private final Optional> constraintColumns; private final Optional> projectedColumns; private final AcidTransaction transaction; + private final boolean recordScannedFiles; + private final Optional maxScannedFileSize; @JsonCreator public HiveTableHandle( @@ -86,7 +88,9 @@ public HiveTableHandle( analyzeColumnNames, Optional.empty(), Optional.empty(), - transaction); + transaction, + false, + Optional.empty()); } public HiveTableHandle( @@ -112,7 +116,9 @@ public HiveTableHandle( Optional.empty(), Optional.empty(), Optional.empty(), - NO_ACID_TRANSACTION); + NO_ACID_TRANSACTION, + false, + Optional.empty()); } public HiveTableHandle( @@ -130,7 +136,9 @@ public HiveTableHandle( Optional> analyzeColumnNames, Optional> constraintColumns, Optional> projectedColumns, - AcidTransaction transaction) + AcidTransaction transaction, + boolean recordScannedFiles, + Optional maxSplitFileSize) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -147,6 +155,8 @@ public HiveTableHandle( this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null"); this.projectedColumns = requireNonNull(projectedColumns, "projectedColumns is null"); this.transaction = requireNonNull(transaction, "transaction is null"); + this.recordScannedFiles = recordScannedFiles; + this.maxScannedFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null"); } public HiveTableHandle withAnalyzePartitionValues(List> analyzePartitionValues) @@ -166,7 +176,9 @@ public HiveTableHandle withAnalyzePartitionValues(List> analyzePart analyzeColumnNames, constraintColumns, projectedColumns, - transaction); + transaction, + recordScannedFiles, + maxScannedFileSize); } public HiveTableHandle withAnalyzeColumnNames(Set analyzeColumnNames) @@ -186,7 +198,9 @@ public HiveTableHandle withAnalyzeColumnNames(Set analyzeColumnNames) Optional.of(analyzeColumnNames), constraintColumns, projectedColumns, - transaction); + transaction, + recordScannedFiles, + maxScannedFileSize); } public HiveTableHandle withTransaction(AcidTransaction transaction) @@ -206,7 +220,9 @@ public HiveTableHandle withTransaction(AcidTransaction transaction) analyzeColumnNames, constraintColumns, projectedColumns, - transaction); + transaction, + recordScannedFiles, + maxScannedFileSize); } public HiveTableHandle withUpdateProcessor(AcidTransaction transaction, HiveUpdateProcessor updateProcessor) @@ -227,7 +243,9 @@ public HiveTableHandle withUpdateProcessor(AcidTransaction transaction, HiveUpda analyzeColumnNames, constraintColumns, projectedColumns, - transaction); + transaction, + recordScannedFiles, + maxScannedFileSize); } public HiveTableHandle withProjectedColumns(Set projectedColumns) @@ -247,7 +265,53 @@ public HiveTableHandle withProjectedColumns(Set projectedColumns) analyzeColumnNames, constraintColumns, Optional.of(projectedColumns), - transaction); + transaction, + recordScannedFiles, + maxScannedFileSize); + } + + public HiveTableHandle withRecordScannedFiles(boolean recordScannedFiles) + { + return new HiveTableHandle( + schemaName, + tableName, + tableParameters, + partitionColumns, + dataColumns, + partitions, + compactEffectivePredicate, + enforcedConstraint, + bucketHandle, + bucketFilter, + analyzePartitionValues, + analyzeColumnNames, + constraintColumns, + projectedColumns, + transaction, + recordScannedFiles, + maxScannedFileSize); + } + + public HiveTableHandle withMaxScannedFileSize(Optional maxScannedFileSize) + { + return new HiveTableHandle( + schemaName, + tableName, + tableParameters, + partitionColumns, + dataColumns, + partitions, + compactEffectivePredicate, + enforcedConstraint, + bucketHandle, + bucketFilter, + analyzePartitionValues, + analyzeColumnNames, + constraintColumns, + projectedColumns, + transaction, + recordScannedFiles, + maxScannedFileSize); } @JsonProperty @@ -387,6 +451,18 @@ public long getWriteId() return transaction.getWriteId(); } + @JsonIgnore + public boolean isRecordScannedFiles() + { + return recordScannedFiles; + } + + @JsonIgnore + public Optional getMaxScannedFileSize() + { + return maxScannedFileSize; + } + @Override public boolean equals(Object o) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java index 0bfd5a8e880c..39efaeb050a4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/InternalHiveConnectorFactory.java @@ -58,6 +58,7 @@ import io.trino.spi.connector.ConnectorPageSourceProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.eventlistener.EventListener; import io.trino.spi.procedure.Procedure; import org.weakref.jmx.guice.MBeanModule; @@ -133,6 +134,7 @@ public static Connector createConnector(String catalogName, Map HiveMaterializedViewPropertiesProvider hiveMaterializedViewPropertiesProvider = injector.getInstance(HiveMaterializedViewPropertiesProvider.class); ConnectorAccessControl accessControl = new ClassLoaderSafeConnectorAccessControl(injector.getInstance(SystemTableAwareAccessControl.class), classLoader); Set procedures = injector.getInstance(Key.get(new TypeLiteral>() {})); + Set tableProcedures = injector.getInstance(Key.get(new TypeLiteral>() {})); Set systemTables = injector.getInstance(Key.get(new TypeLiteral>() {})); Set eventListeners = injector.getInstance(Key.get(new TypeLiteral>() {})) .stream() @@ -149,6 +151,7 @@ public static Connector createConnector(String catalogName, Map new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader), systemTables, procedures, + tableProcedures, eventListeners, sessionPropertiesProviders, HiveSchemaProperties.SCHEMA_PROPERTIES, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java index 74e4e6a7f2f8..2dab8e14dfed 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java @@ -29,6 +29,8 @@ public interface LocationService LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table); + LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table); + /** * targetPath and writePath will be root directory of all partition and table paths * that may be returned by {@link #getTableWriteInfo(LocationHandle, boolean)} and {@link #getPartitionWriteInfo(LocationHandle, Optional, String)} method. diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index c57ea719b7ff..3fd17b67ba4f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -142,6 +142,8 @@ public class SemiTransactionalHiveMetastore @GuardedBy("this") private final Map, Action>> partitionActions = new HashMap<>(); @GuardedBy("this") + private long declaredIntentionsToWriteCounter; + @GuardedBy("this") private final List declaredIntentionsToWrite = new ArrayList<>(); @GuardedBy("this") private ExclusiveOperation bufferedExclusiveOperation; @@ -1138,7 +1140,7 @@ public synchronized void revokeTablePrivileges(HiveIdentity identity, String dat setExclusive((delegate, hdfsEnvironment) -> delegate.revokeTablePrivileges(databaseName, tableName, getTableOwner(identity, databaseName, tableName), grantee, privileges)); } - public synchronized void declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Path stagingPathRoot, SchemaTableName schemaTableName) + public synchronized String declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Path stagingPathRoot, SchemaTableName schemaTableName) { setShared(); if (writeMode == WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY) { @@ -1149,7 +1151,19 @@ public synchronized void declareIntentionToWrite(ConnectorSession session, Write } HdfsContext hdfsContext = new HdfsContext(session); HiveIdentity identity = new HiveIdentity(session); - declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(writeMode, hdfsContext, identity, session.getQueryId(), stagingPathRoot, schemaTableName)); + String queryId = session.getQueryId(); + String declarationId = queryId + "_" + declaredIntentionsToWriteCounter; + declaredIntentionsToWriteCounter++; + declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(declarationId, writeMode, hdfsContext, identity, queryId, stagingPathRoot, schemaTableName)); + return declarationId; + } + + public synchronized void dropDeclaredIntentionToWrite(String declarationId) + { + boolean removed = declaredIntentionsToWrite.removeIf(intention -> intention.getDeclarationId().equals(declarationId)); + if (!removed) { + throw new IllegalArgumentException("Declaration with id " + declarationId + " not found"); + } } public boolean isFinished() @@ -2865,6 +2879,7 @@ public String toString() private static class DeclaredIntentionToWrite { + private final String declarationId; private final WriteMode mode; private final HdfsContext hdfsContext; private final HiveIdentity identity; @@ -2872,8 +2887,9 @@ private static class DeclaredIntentionToWrite private final Path rootPath; private final SchemaTableName schemaTableName; - public DeclaredIntentionToWrite(WriteMode mode, HdfsContext hdfsContext, HiveIdentity identity, String queryId, Path stagingPathRoot, SchemaTableName schemaTableName) + public DeclaredIntentionToWrite(String declarationId, WriteMode mode, HdfsContext hdfsContext, HiveIdentity identity, String queryId, Path stagingPathRoot, SchemaTableName schemaTableName) { + this.declarationId = requireNonNull(declarationId, "declarationId is null"); this.mode = requireNonNull(mode, "mode is null"); this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null"); this.identity = requireNonNull(identity, "identity is null"); @@ -2882,6 +2898,11 @@ public DeclaredIntentionToWrite(WriteMode mode, HdfsContext hdfsContext, HiveIde this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); } + public String getDeclarationId() + { + return declarationId; + } + public WriteMode getMode() { return mode; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/HiveProcedureModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/HiveProcedureModule.java index 7e1aa2090eb9..465d4d70ebe0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/HiveProcedureModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/HiveProcedureModule.java @@ -17,6 +17,7 @@ import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.multibindings.Multibinder; +import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.procedure.Procedure; import static com.google.inject.multibindings.Multibinder.newSetBinder; @@ -33,5 +34,8 @@ public void configure(Binder binder) procedures.addBinding().toProvider(UnregisterPartitionProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(SyncPartitionMetadataProcedure.class).in(Scopes.SINGLETON); procedures.addBinding().toProvider(DropStatsProcedure.class).in(Scopes.SINGLETON); + + Multibinder tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); + tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/OptimizeTableProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/OptimizeTableProcedure.java new file mode 100644 index 000000000000..56229bface78 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/OptimizeTableProcedure.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.procedure; + +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import io.trino.spi.connector.TableProcedureMetadata; + +import javax.inject.Provider; + +import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; +import static io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning; + +public class OptimizeTableProcedure + implements Provider +{ + public static final String NAME = "OPTIMIZE"; + + @Override + public TableProcedureMetadata get() + { + return new TableProcedureMetadata( + NAME, + distributedWithFilteringAndRepartitioning(), + ImmutableList.of( + dataSizeProperty( + "file_size_threshold", + "Only compact files smaller than given threshold in bytes", + DataSize.of(100, DataSize.Unit.MEGABYTE), + false))); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java index d3e69503609a..2c006f54fd38 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java @@ -69,6 +69,7 @@ public class InternalHiveSplitFactory private final Optional bucketConversion; private final Optional bucketValidation; private final long minimumTargetSplitSizeInBytes; + private final Optional maxSplitFileSize; private final boolean forceLocalScheduling; private final boolean s3SelectPushdownEnabled; private final Map bucketStatementCounters = new ConcurrentHashMap<>(); @@ -87,7 +88,8 @@ public InternalHiveSplitFactory( DataSize minimumTargetSplitSize, boolean forceLocalScheduling, boolean s3SelectPushdownEnabled, - AcidTransaction transaction) + AcidTransaction transaction, + Optional maxSplitFileSize) { this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.partitionName = requireNonNull(partitionName, "partitionName is null"); @@ -102,6 +104,7 @@ public InternalHiveSplitFactory( this.forceLocalScheduling = forceLocalScheduling; this.s3SelectPushdownEnabled = s3SelectPushdownEnabled; this.minimumTargetSplitSizeInBytes = requireNonNull(minimumTargetSplitSize, "minimumTargetSplitSize is null").toBytes(); + this.maxSplitFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null"); checkArgument(minimumTargetSplitSizeInBytes > 0, "minimumTargetSplitSize must be > 0, found: %s", minimumTargetSplitSize); } @@ -166,6 +169,10 @@ private Optional createInternalHiveSplit( return Optional.empty(); } + if (maxSplitFileSize.isPresent() && estimatedFileSize > maxSplitFileSize.get()) { + return Optional.empty(); + } + boolean forceLocalScheduling = this.forceLocalScheduling; // For empty files, some filesystem (e.g. LocalFileSystem) produce one empty block diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java index c038b8a668e0..fe1e76f9b919 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java @@ -553,6 +553,7 @@ public HivePartitionMetadata next() false, false, true, + Optional.empty(), Optional.empty()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -1085,7 +1086,8 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( false, false, true, - validWriteIds); + validWriteIds, + Optional.empty()); } private BackgroundHiveSplitLoader backgroundHiveSplitLoader(List files, DirectoryLister directoryLister) @@ -1117,6 +1119,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(List initialFiles = getTableFiles(tableName); + assertThat(initialFiles).hasSize(10); + + assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"); + assertNationNTimes(tableName, 10); + + Set compactedFiles = getTableFiles(tableName); + // we expect at most 3 files due to write parallelism + assertThat(compactedFiles).hasSizeLessThanOrEqualTo(3); + assertThat(intersection(initialFiles, compactedFiles)).isEmpty(); + + // compact with low threshold; nothing should change + assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')"); + + assertThat(getTableFiles(tableName)).hasSameElementsAs(compactedFiles); + } + + @Test + public void testOptimizeWithWriterScaling() + { + String tableName = "test_optimize_witer_scaling" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.nation WITH NO DATA", 0); + + insertNationNTimes(tableName, 4); + assertNationNTimes(tableName, 4); + + Set initialFiles = getTableFiles(tableName); + assertThat(initialFiles).hasSize(4); + + Session writerScalingSession = Session.builder(getSession()) + .setSystemProperty("scale_writers", "true") + .setSystemProperty("writer_min_size", "100GB") + .build(); + + assertUpdate(writerScalingSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"); + assertNationNTimes(tableName, 4); + + Set compactedFiles = getTableFiles(tableName); + assertThat(compactedFiles).hasSize(1); + assertThat(intersection(initialFiles, compactedFiles)).isEmpty(); + } + + @Test + public void testOptimizeWithPartitioning() + { + int insertCount = 4; + int partitionsCount = 5; + + String tableName = "test_optimize_with_partitioning_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(" + + " nationkey BIGINT, " + + " name VARCHAR, " + + " comment VARCHAR, " + + " regionkey BIGINT" + + ")" + + "WITH (partitioned_by = ARRAY['regionkey'])"); + + insertNationNTimes(tableName, insertCount); + assertNationNTimes(tableName, insertCount); + + Set initialFiles = getTableFiles(tableName); + assertThat(initialFiles).hasSize(insertCount * partitionsCount); + + Session writerScalingSession = Session.builder(getSession()) + .setSystemProperty("scale_writers", "true") + .setSystemProperty("writer_min_size", "100GB") + .build(); + + // optimize with unsupported WHERE + assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1")) + .hasMessageContaining("Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression"); + assertNationNTimes(tableName, insertCount); + assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles); + + // optimize using predicate on on partition key but not matching any partitions + assertUpdate(writerScalingSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE regionkey > 5"); + assertNationNTimes(tableName, insertCount); + assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles); + + // optimize two partitions; also use positional argument + assertUpdate(writerScalingSession, "ALTER TABLE " + tableName + " EXECUTE optimize('10kB') WHERE regionkey IN (1,2)"); + assertNationNTimes(tableName, insertCount); + assertThat(getTableFiles(tableName)).hasSize(2 + 3 * insertCount); + + // optimize one more partition; default file_size_threshold + assertUpdate(writerScalingSession, "ALTER TABLE " + tableName + " EXECUTE optimize WHERE regionkey > 3"); + assertNationNTimes(tableName, insertCount); + assertThat(getTableFiles(tableName)).hasSize(3 + 2 * insertCount); + + // optimize remaining partitions + assertUpdate(writerScalingSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"); + assertNationNTimes(tableName, insertCount); + + Set compactedFiles = getTableFiles(tableName); + assertThat(compactedFiles).hasSize(partitionsCount); + assertThat(intersection(initialFiles, compactedFiles)).isEmpty(); + } + + @Test + public void testOptimizeWithBucketing() + { + int insertCount = 4; + String tableName = "test_optimize_with_bucketing_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(" + + " nationkey BIGINT, " + + " name VARCHAR, " + + " comment VARCHAR, " + + " regionkey BIGINT" + + ")" + + "WITH (bucketed_by = ARRAY['regionkey'], bucket_count = 4)"); + + insertNationNTimes(tableName, insertCount); + assertNationNTimes(tableName, insertCount); + Set initialFiles = getTableFiles(tableName); + + assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')")) + .hasMessageMatching("Optimizing bucketed Hive table .* is not supported"); + + assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles); + assertNationNTimes(tableName, insertCount); + } + + @Test + public void testOptimizeHiveInformationSchema() + { + assertThatThrownBy(() -> computeActual("ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')")) + .hasMessage("This connector does not support table procedures"); + } + + @Test + public void testOptimizeHiveSystemTable() + { + String tableName = "test_optimize_system_table_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + "(a bigint, b bigint) WITH (partitioned_by = ARRAY['b'])"); + + assertQuery("SELECT count(*) FROM " + tableName, "SELECT 0"); + + assertThatThrownBy(() -> computeActual(format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName))) + .hasMessage("This connector does not support table procedures"); + } + + private void insertNationNTimes(String tableName, int times) + { + assertUpdate("INSERT INTO " + tableName + "(nationkey, name, regionkey, comment) " + join(" UNION ALL ", nCopies(times, "SELECT * FROM tpch.sf1.nation")), times * 25); + } + + private void assertNationNTimes(String tableName, int times) + { + String verifyQuery = join(" UNION ALL ", nCopies(times, "SELECT * FROM nation")); + assertQuery("SELECT nationkey, name, regionkey, comment FROM " + tableName, verifyQuery); + } + + private Set getTableFiles(String tableName) + { + return computeActual("SELECT DISTINCT \"$path\" FROM " + tableName).getOnlyColumn() + .map(String.class::cast) + .collect(toSet()); + } + @Test public void testTimestampPrecisionInsert() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java index 7b2b615abcfd..fddbe34a73d6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java @@ -57,7 +57,8 @@ public void testOutstandingSplitCount() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); // add 10 splits for (int i = 0; i < 10; i++) { @@ -91,7 +92,8 @@ public void testCorrectlyGeneratingInitialRowId() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); // add 10 splits for (int i = 0; i < 10; i++) { @@ -119,7 +121,8 @@ public void testEvenlySizedSplitRemainder() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newSingleThreadExecutor(), - new CounterStat()); + new CounterStat(), + false); // One byte larger than the initial split max size DataSize fileSize = DataSize.ofBytes(initialSplitSize.toBytes() + 1); @@ -146,7 +149,8 @@ public void testFail() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); // add some splits for (int i = 0; i < 5; i++) { @@ -196,7 +200,8 @@ public void testReaderWaitsForSplits() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); SettableFuture splits = SettableFuture.create(); @@ -250,7 +255,8 @@ public void testOutstandingSplitSize() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); int testSplitSizeInBytes = new TestSplit(0).getEstimatedSizeInBytes(); int maxSplitCount = toIntExact(maxOutstandingSplitsSize.toBytes()) / testSplitSizeInBytes; @@ -283,7 +289,8 @@ public void testEmptyBucket() Integer.MAX_VALUE, new TestingHiveSplitLoader(), Executors.newFixedThreadPool(5), - new CounterStat()); + new CounterStat(), + false); hiveSplitSource.addToQueue(new TestSplit(0, OptionalInt.of(2))); hiveSplitSource.noMoreSplits(); assertEquals(getSplits(hiveSplitSource, OptionalInt.of(0), 10).size(), 0);