From 329ddfacd46029da075cc8ce9ec85cfd3128276a Mon Sep 17 00:00:00 2001 From: James Petty Date: Thu, 20 Apr 2023 11:40:53 -0400 Subject: [PATCH] Use Page#getColumns in TableWriterOperator Avoids an additional unnecessary copy of the Block[] array for each input page in TableWriter by using the Page helper method instead of constructing the block array externally (which incurs an defensive copy when passed into the Page constructor). --- .../io/trino/operator/TableWriterOperator.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java index fbefc6f65f57..bf15e46339de 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableWriterOperator.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; @@ -154,7 +155,7 @@ private enum State private final OperatorContext operatorContext; private final LocalMemoryContext pageSinkMemoryContext; private final ConnectorPageSink pageSink; - private final List columnChannels; + private final int[] columnChannels; private final AtomicLong pageSinkPeakMemoryUsage = new AtomicLong(); private final Operator statisticAggregationOperator; private final List types; @@ -183,7 +184,7 @@ public TableWriterOperator( this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.pageSinkMemoryContext = operatorContext.newLocalUserMemoryContext(TableWriterOperator.class.getSimpleName()); this.pageSink = requireNonNull(pageSink, "pageSink is null"); - this.columnChannels = requireNonNull(columnChannels, "columnChannels is null"); + this.columnChannels = Ints.toArray(requireNonNull(columnChannels, "columnChannels is null")); this.statisticAggregationOperator = requireNonNull(statisticAggregationOperator, "statisticAggregationOperator is null"); this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled; @@ -244,18 +245,14 @@ public void addInput(Page page) requireNonNull(page, "page is null"); checkState(needsInput(), "Operator does not need input"); - Block[] blocks = new Block[columnChannels.size()]; - for (int outputChannel = 0; outputChannel < columnChannels.size(); outputChannel++) { - Block block = page.getBlock(columnChannels.get(outputChannel)); - blocks[outputChannel] = block; - } - OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled); statisticAggregationOperator.addInput(page); timer.end(statisticsTiming); + page = page.getColumns(columnChannels); + ListenableFuture blockedOnAggregation = statisticAggregationOperator.isBlocked(); - CompletableFuture future = pageSink.appendPage(new Page(blocks)); + CompletableFuture future = pageSink.appendPage(page); updateMemoryUsage(); ListenableFuture blockedOnWrite = toListenableFuture(future); blocked = asVoid(allAsList(blockedOnAggregation, blockedOnWrite));