Add Hive OPTIMIZE table procedure
Add support for compacting small files for non-transactional,
non-bucketed Hive tables.

ALTER TABLE xxxxx EXECUTE OPTIMIZE WITH(file_size_threshold = ...)
losipiuk committed Oct 25, 2021
1 parent 2b9cdfe commit 57e48d2
Showing 20 changed files with 791 additions and 35 deletions.
@@ -164,6 +164,7 @@ public class BackgroundHiveSplitLoader
private final ConcurrentLazyQueue<HivePartitionMetadata> partitions;
private final Deque<Iterator<InternalHiveSplit>> fileIterators = new ConcurrentLinkedDeque<>();
private final Optional<ValidWriteIdList> validWriteIds;
private final Optional<Long> maxSplitFileSize;

// Purpose of this lock:
// * Write lock: when you need a consistent view across partitions, fileIterators, and hiveSplitSource.
@@ -204,7 +205,8 @@ public BackgroundHiveSplitLoader(
boolean recursiveDirWalkerEnabled,
boolean ignoreAbsentPartitions,
boolean optimizeSymlinkListing,
Optional<ValidWriteIdList> validWriteIds)
Optional<ValidWriteIdList> validWriteIds,
Optional<Long> maxSplitFileSize)
{
this.table = table;
this.transaction = requireNonNull(transaction, "transaction is null");
@@ -226,6 +228,7 @@ public BackgroundHiveSplitLoader(
this.partitions = new ConcurrentLazyQueue<>(partitions);
this.hdfsContext = new HdfsContext(session);
this.validWriteIds = requireNonNull(validWriteIds, "validWriteIds is null");
this.maxSplitFileSize = requireNonNull(maxSplitFileSize, "maxSplitFileSize is null");
}

@Override
@@ -465,7 +468,8 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
getMaxInitialSplitSize(session),
isForceLocalScheduling(session),
s3SelectPushdownEnabled,
transaction);
transaction,
maxSplitFileSize);

// To support custom input formats, we want to call getSplits()
// on the input format to obtain file splits.
@@ -653,7 +657,8 @@ private ListenableFuture<Void> createHiveSymlinkSplits(
getMaxInitialSplitSize(session),
isForceLocalScheduling(session),
s3SelectPushdownEnabled,
transaction);
transaction,
maxSplitFileSize);
lastResult = addSplitsToSource(targetSplits, splitFactory);
if (stopped) {
return COMPLETED_FUTURE;
@@ -709,7 +714,8 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(
getMaxInitialSplitSize(session),
isForceLocalScheduling(session),
s3SelectPushdownEnabled,
transaction);
transaction,
maxSplitFileSize);
return Optional.of(locatedFileStatuses.stream()
.map(locatedFileStatus -> splitFactory.createInternalHiveSplit(locatedFileStatus, OptionalInt.empty(), splittable, Optional.empty()))
.filter(Optional::isPresent)
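The new maxSplitFileSize parameter threads the OPTIMIZE file_size_threshold from the metadata layer down into split generation: in the hunks above it is handed to the split factory alongside transaction, which suggests the per-file check happens when individual splits are created. A minimal sketch of that rule, assuming files at or above the threshold are simply skipped so OPTIMIZE only rewrites small files; the class and method names below are illustrative, not part of this commit.

import java.util.Optional;

// Illustrative helper: decides whether a file qualifies for compaction given an
// optional size threshold. An empty threshold means every file qualifies.
final class OptimizeFileFilter
{
    private OptimizeFileFilter() {}

    static boolean shouldCompact(long fileSizeInBytes, Optional<Long> maxSplitFileSize)
    {
        // Files at or above the threshold are presumed already "large enough"
        // and are left untouched by OPTIMIZE.
        return maxSplitFileSize
                .map(threshold -> fileSizeInBytes < threshold)
                .orElse(true);
    }
}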
@@ -29,6 +29,7 @@
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
@@ -55,6 +56,7 @@ public class HiveConnector
private final ConnectorNodePartitioningProvider nodePartitioningProvider;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Set<EventListener> eventListeners;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> schemaProperties;
@@ -77,6 +79,7 @@ public HiveConnector(
ConnectorNodePartitioningProvider nodePartitioningProvider,
Set<SystemTable> systemTables,
Set<Procedure> procedures,
Set<TableProcedureMetadata> tableProcedures,
Set<EventListener> eventListeners,
Set<SessionPropertiesProvider> sessionPropertiesProviders,
List<PropertyMetadata<?>> schemaProperties,
@@ -95,6 +98,7 @@ public HiveConnector(
this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableProcedures = ImmutableSet.copyOf(requireNonNull(tableProcedures, "tableProcedures is null"));
this.eventListeners = ImmutableSet.copyOf(requireNonNull(eventListeners, "eventListeners is null"));
this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
@@ -241,4 +245,10 @@ public final void shutdown()
{
lifeCycleManager.stop();
}

@Override
public Set<TableProcedureMetadata> getTableProcedures()
{
return tableProcedures;
}
}
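HiveConnector now implements getTableProcedures(), the SPI hook through which ALTER TABLE ... EXECUTE resolves a procedure by name. A hedged sketch of what the OPTIMIZE entry in that set might look like, assuming the TableProcedureMetadata constructor and the PropertyMetadata.longProperty factory used below, and modelling file_size_threshold as a plain byte count (the real procedure may use a data-size type with units).

import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.TableProcedureMetadata;

import static io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning;
import static io.trino.spi.session.PropertyMetadata.longProperty;

// Hypothetical construction of the OPTIMIZE table procedure metadata.
final class OptimizeProcedureSketch
{
    private OptimizeProcedureSketch() {}

    static TableProcedureMetadata optimizeProcedure()
    {
        return new TableProcedureMetadata(
                "OPTIMIZE",
                // Distributed execution: splits are filtered (small files only)
                // and the surviving rows are rewritten on worker nodes.
                distributedWithFilteringAndRepartitioning(),
                ImmutableList.of(
                        longProperty(
                                "file_size_threshold",
                                "Rewrite only files smaller than this many bytes",
                                128L * 1024 * 1024,
                                false)));
    }
}

Whatever its exact shape, the procedure metadata returned from getTableProcedures() is what the engine matches against the procedure name in the ALTER TABLE ... EXECUTE statement.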
@@ -19,6 +19,7 @@
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;

@@ -55,6 +56,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return HiveInsertTableHandle.class;
}

@Override
public Class<? extends ConnectorTableExecuteHandle> getTableExecuteHandleClass()
{
return HiveTableExecuteHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
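The resolver change above registers HiveTableExecuteHandle as this connector's ConnectorTableExecuteHandle, the handle that carries procedure state from planning to execution. That class is not shown in these hunks; a stripped-down, hypothetical shape of such a handle, assuming ConnectorTableExecuteHandle is a plain marker interface and omitting JSON serialization, could be:

import io.trino.spi.connector.ConnectorTableExecuteHandle;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

// Hypothetical minimal table execute handle: just enough state to identify the
// procedure, the target table, and the OPTIMIZE threshold. The real
// HiveTableExecuteHandle likely carries more (location, formats, write info).
final class SketchTableExecuteHandle
        implements ConnectorTableExecuteHandle
{
    private final String procedureName;
    private final String schemaTableName;
    private final Optional<Long> maxScannedFileSize;

    SketchTableExecuteHandle(String procedureName, String schemaTableName, Optional<Long> maxScannedFileSize)
    {
        this.procedureName = requireNonNull(procedureName, "procedureName is null");
        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
        this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
    }

    String getProcedureName()
    {
        return procedureName;
    }

    String getSchemaTableName()
    {
        return schemaTableName;
    }

    Optional<Long> getMaxScannedFileSize()
    {
        return maxScannedFileSize;
    }
}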
@@ -88,6 +88,14 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
}
}

@Override
public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table)
{
// For OPTIMIZE write result files directly to table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute
Path targetPath = new Path(table.getStorage().getLocation());
return new LocationHandle(targetPath, targetPath, true, DIRECT_TO_TARGET_EXISTING_DIRECTORY);
}

private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path, Optional<Path> externalLocation)
{
return isTemporaryStagingDirectoryEnabled(session)
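forOptimize above places result files directly in the table directory instead of a staging directory. As the inline comment notes, the commit logic in HiveMetadata#finishTableExecute depends on this: once the new, compacted files already sit in their final location, committing can reduce to deleting the small source files that were rewritten, and rolling back to deleting only the files this run produced. A local-filesystem sketch of that contract, purely illustrative rather than the actual Hive implementation.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

// Illustrative commit/rollback contract for OPTIMIZE output written directly
// to its final location. Not the actual HiveMetadata implementation.
final class DirectWriteOptimizeCommit
{
    private DirectWriteOptimizeCommit() {}

    // On success: the newly written, compacted files already sit in the table
    // directory, so only the rewritten source files need to be removed.
    static void commit(Set<Path> compactedSourceFiles)
    {
        compactedSourceFiles.forEach(DirectWriteOptimizeCommit::delete);
    }

    // On failure: the original data was never touched, so cleanup means
    // removing just the files this OPTIMIZE run produced.
    static void rollback(Set<Path> newlyWrittenFiles)
    {
        newlyWrittenFiles.forEach(DirectWriteOptimizeCommit::delete);
    }

    private static void delete(Path file)
    {
        try {
            Files.deleteIfExists(file);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

The cost of skipping a staging directory is that failure cleanup has to be explicit, which is presumably why the commit logic needs to know exactly which files the procedure wrote.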