From 73b542c7c5dad945183096a97ecaba1fd04a430b Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Mon, 6 Jan 2020 00:08:38 +0100 Subject: [PATCH] Add BasicWorkProcessorOperatorAdapter BasicWorkProcessorOperatorAdapter removes input page buffering responsibility from work processor operators. --- .../BasicWorkProcessorOperatorAdapter.java | 155 ++++++++++++++++++ .../operator/FilterAndProjectOperator.java | 52 ++---- .../operator/HashSemiJoinOperator.java | 41 +---- 3 files changed, 179 insertions(+), 69 deletions(-) create mode 100644 presto-main/src/main/java/io/prestosql/operator/BasicWorkProcessorOperatorAdapter.java diff --git a/presto-main/src/main/java/io/prestosql/operator/BasicWorkProcessorOperatorAdapter.java b/presto-main/src/main/java/io/prestosql/operator/BasicWorkProcessorOperatorAdapter.java new file mode 100644 index 000000000000..3eb71803fe9a --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/operator/BasicWorkProcessorOperatorAdapter.java @@ -0,0 +1,155 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.operator; + +import io.prestosql.execution.Lifespan; +import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator; +import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory; +import io.prestosql.spi.Page; +import io.prestosql.sql.planner.plan.PlanNodeId; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * This {@link WorkProcessorOperator} adapter allows to adapt {@link WorkProcessor} operators + * that do not require special input handling (e.g streaming operators). + */ +public class BasicWorkProcessorOperatorAdapter + implements AdapterWorkProcessorOperator +{ + public interface BasicAdapterWorkProcessorOperatorFactory + extends WorkProcessorOperatorFactory + { + default WorkProcessorOperator createAdapterOperator(ProcessorContext processorContext, WorkProcessor sourcePages) + { + return create(processorContext, sourcePages); + } + + BasicAdapterWorkProcessorOperatorFactory duplicate(); + } + + public static OperatorFactory createAdapterOperatorFactory(BasicAdapterWorkProcessorOperatorFactory operatorFactory) + { + return WorkProcessorOperatorAdapter.createAdapterOperatorFactory(new Factory(operatorFactory)); + } + + private static class Factory + implements AdapterWorkProcessorOperatorFactory + { + private final BasicAdapterWorkProcessorOperatorFactory operatorFactory; + + Factory(BasicAdapterWorkProcessorOperatorFactory operatorFactory) + { + this.operatorFactory = requireNonNull(operatorFactory, "operatorFactory is null"); + } + + @Override + public AdapterWorkProcessorOperatorFactory duplicate() + { + return new Factory(operatorFactory.duplicate()); + } + + @Override + public int getOperatorId() + { + return operatorFactory.getOperatorId(); + } + + @Override + public PlanNodeId getPlanNodeId() + { + return operatorFactory.getPlanNodeId(); + } + + @Override + public String getOperatorType() + { + return operatorFactory.getOperatorType(); + } + + @Override + public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor sourcePages) + { + return operatorFactory.create(processorContext, sourcePages); + } + + @Override + public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext) + { + return new BasicWorkProcessorOperatorAdapter(processorContext, operatorFactory); + } + + @Override + public void lifespanFinished(Lifespan lifespan) + { + operatorFactory.lifespanFinished(lifespan); + } + + @Override + public void close() + { + operatorFactory.close(); + } + } + + private final PageBuffer pageBuffer; + private final WorkProcessorOperator operator; + + private BasicWorkProcessorOperatorAdapter( + ProcessorContext processorContext, + BasicAdapterWorkProcessorOperatorFactory operatorFactory) + { + this.pageBuffer = new PageBuffer(); + this.operator = requireNonNull(operatorFactory, "operator is null").createAdapterOperator(processorContext, pageBuffer.pages()); + } + + @Override + public void finish() + { + pageBuffer.finish(); + } + + @Override + public boolean needsInput() + { + return pageBuffer.isEmpty() && !pageBuffer.isFinished(); + } + + @Override + public void addInput(Page page) + { + pageBuffer.add(page); + } + + @Override + public WorkProcessor getOutputPages() + { + return operator.getOutputPages(); + } + + @Override + public Optional getOperatorInfo() + { + return operator.getOperatorInfo(); + } + + @Override + public void close() + throws Exception + { + operator.close(); + } +} diff --git a/presto-main/src/main/java/io/prestosql/operator/FilterAndProjectOperator.java b/presto-main/src/main/java/io/prestosql/operator/FilterAndProjectOperator.java index 608e84d2bf49..9b20cbe12327 100644 --- a/presto-main/src/main/java/io/prestosql/operator/FilterAndProjectOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/FilterAndProjectOperator.java @@ -19,71 +19,51 @@ import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.memory.context.LocalMemoryContext; import io.prestosql.memory.context.MemoryTrackingContext; -import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator; -import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory; +import io.prestosql.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory; import io.prestosql.operator.project.PageProcessor; import io.prestosql.spi.Page; import io.prestosql.spi.type.Type; import io.prestosql.sql.planner.plan.PlanNodeId; import java.util.List; -import java.util.Optional; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkState; import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static io.prestosql.operator.WorkProcessorOperatorAdapter.createAdapterOperatorFactory; +import static io.prestosql.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory; import static io.prestosql.operator.project.MergePages.mergePages; import static java.util.Objects.requireNonNull; public class FilterAndProjectOperator - implements AdapterWorkProcessorOperator + implements WorkProcessorOperator { - private final PageBuffer pageBuffer = new PageBuffer(); private final WorkProcessor pages; private FilterAndProjectOperator( Session session, MemoryTrackingContext memoryTrackingContext, DriverYieldSignal yieldSignal, - Optional> sourcePages, + WorkProcessor sourcePages, PageProcessor pageProcessor, List types, DataSize minOutputPageSize, - int minOutputPageRowCount) + int minOutputPageRowCount, + boolean avoidPageMaterialization) { AggregatedMemoryContext localAggregatedMemoryContext = newSimpleAggregatedMemoryContext(); LocalMemoryContext outputMemoryContext = localAggregatedMemoryContext.newLocalMemoryContext(FilterAndProjectOperator.class.getSimpleName()); - this.pages = sourcePages.orElse(pageBuffer.pages()) + this.pages = sourcePages .flatMap(page -> pageProcessor.createWorkProcessor( session.toConnectorSession(), yieldSignal, outputMemoryContext, page, - sourcePages.isPresent())) + avoidPageMaterialization)) .transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext)) .withProcessStateMonitor(state -> memoryTrackingContext.localSystemMemoryContext().setBytes(localAggregatedMemoryContext.getBytes())); } - @Override - public final void finish() - { - pageBuffer.finish(); - } - - @Override - public boolean needsInput() - { - return pageBuffer.isEmpty() && !pageBuffer.isFinished(); - } - - @Override - public final void addInput(Page page) - { - pageBuffer.add(page); - } - @Override public WorkProcessor getOutputPages() { @@ -108,7 +88,7 @@ public static OperatorFactory createOperatorFactory( } private static class Factory - implements AdapterWorkProcessorOperatorFactory + implements BasicAdapterWorkProcessorOperatorFactory { private final int operatorId; private final PlanNodeId planNodeId; @@ -142,26 +122,28 @@ public WorkProcessorOperator create(ProcessorContext processorContext, WorkProce processorContext.getSession(), processorContext.getMemoryTrackingContext(), processorContext.getDriverYieldSignal(), - Optional.of(sourcePages), + sourcePages, processor.get(), types, minOutputPageSize, - minOutputPageRowCount); + minOutputPageRowCount, + true); } @Override - public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext) + public WorkProcessorOperator createAdapterOperator(ProcessorContext processorContext, WorkProcessor sourcePages) { checkState(!closed, "Factory is already closed"); return new FilterAndProjectOperator( processorContext.getSession(), processorContext.getMemoryTrackingContext(), processorContext.getDriverYieldSignal(), - Optional.empty(), + sourcePages, processor.get(), types, minOutputPageSize, - minOutputPageRowCount); + minOutputPageRowCount, + false); } @Override @@ -189,7 +171,7 @@ public void close() } @Override - public AdapterWorkProcessorOperatorFactory duplicate() + public BasicAdapterWorkProcessorOperatorFactory duplicate() { return new Factory(operatorId, planNodeId, processor, types, minOutputPageSize, minOutputPageRowCount); } diff --git a/presto-main/src/main/java/io/prestosql/operator/HashSemiJoinOperator.java b/presto-main/src/main/java/io/prestosql/operator/HashSemiJoinOperator.java index 2138ef294a73..fae6ed65df8a 100644 --- a/presto-main/src/main/java/io/prestosql/operator/HashSemiJoinOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/HashSemiJoinOperator.java @@ -18,10 +18,9 @@ import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.memory.context.LocalMemoryContext; import io.prestosql.memory.context.MemoryTrackingContext; +import io.prestosql.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory; import io.prestosql.operator.SetBuilderOperator.SetSupplier; import io.prestosql.operator.WorkProcessor.TransformationState; -import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator; -import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory; import io.prestosql.spi.Page; import io.prestosql.spi.block.Block; import io.prestosql.spi.block.BlockBuilder; @@ -37,16 +36,16 @@ import static com.google.common.base.Preconditions.checkState; import static io.airlift.concurrent.MoreFutures.checkSuccess; import static io.airlift.concurrent.MoreFutures.getFutureValue; +import static io.prestosql.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory; import static io.prestosql.operator.WorkProcessor.TransformationState.blocked; import static io.prestosql.operator.WorkProcessor.TransformationState.finished; import static io.prestosql.operator.WorkProcessor.TransformationState.ofResult; -import static io.prestosql.operator.WorkProcessorOperatorAdapter.createAdapterOperatorFactory; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.BooleanType.BOOLEAN; import static java.util.Objects.requireNonNull; public class HashSemiJoinOperator - implements AdapterWorkProcessorOperator + implements WorkProcessorOperator { public static OperatorFactory createOperatorFactory( int operatorId, @@ -60,7 +59,7 @@ public static OperatorFactory createOperatorFactory( } private static class Factory - implements AdapterWorkProcessorOperatorFactory + implements BasicAdapterWorkProcessorOperatorFactory { private final int operatorId; private final PlanNodeId planNodeId; @@ -85,14 +84,7 @@ private Factory(int operatorId, PlanNodeId planNodeId, SetSupplier setSupplier, public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor sourcePages) { checkState(!closed, "Factory is already closed"); - return new HashSemiJoinOperator(Optional.of(sourcePages), setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext()); - } - - @Override - public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext) - { - checkState(!closed, "Factory is already closed"); - return new HashSemiJoinOperator(Optional.empty(), setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext()); + return new HashSemiJoinOperator(sourcePages, setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext()); } @Override @@ -127,16 +119,15 @@ public Factory duplicate() } private final WorkProcessor pages; - private final PageBuffer pageBuffer = new PageBuffer(); private HashSemiJoinOperator( - Optional> sourcePages, + WorkProcessor sourcePages, SetSupplier channelSetFuture, int probeJoinChannel, Optional probeHashChannel, MemoryTrackingContext memoryTrackingContext) { - pages = sourcePages.orElse(pageBuffer.pages()) + pages = sourcePages .transform(new SemiJoinPages( channelSetFuture, probeJoinChannel, @@ -144,24 +135,6 @@ private HashSemiJoinOperator( requireNonNull(memoryTrackingContext, "memoryTrackingContext is null").aggregateUserMemoryContext())); } - @Override - public void finish() - { - pageBuffer.finish(); - } - - @Override - public boolean needsInput() - { - return pageBuffer.isEmpty() && !pageBuffer.isFinished(); - } - - @Override - public void addInput(Page page) - { - pageBuffer.add(page); - } - @Override public WorkProcessor getOutputPages() {