Skip to content

Commit

Permalink
Add BasicWorkProcessorOperatorAdapter
Browse files Browse the repository at this point in the history
BasicWorkProcessorOperatorAdapter removes input
page buffering responsibility from work processor
operators.
  • Loading branch information
sopel39 committed Feb 20, 2020
1 parent 73e1a0e commit 73b542c
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.operator;

import io.prestosql.execution.Lifespan;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.prestosql.spi.Page;
import io.prestosql.sql.planner.plan.PlanNodeId;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
* This {@link WorkProcessorOperator} adapter allows to adapt {@link WorkProcessor} operators
* that do not require special input handling (e.g streaming operators).
*/
public class BasicWorkProcessorOperatorAdapter
implements AdapterWorkProcessorOperator
{
public interface BasicAdapterWorkProcessorOperatorFactory
extends WorkProcessorOperatorFactory
{
default WorkProcessorOperator createAdapterOperator(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
{
return create(processorContext, sourcePages);
}

BasicAdapterWorkProcessorOperatorFactory duplicate();
}

public static OperatorFactory createAdapterOperatorFactory(BasicAdapterWorkProcessorOperatorFactory operatorFactory)
{
return WorkProcessorOperatorAdapter.createAdapterOperatorFactory(new Factory(operatorFactory));
}

private static class Factory
implements AdapterWorkProcessorOperatorFactory
{
private final BasicAdapterWorkProcessorOperatorFactory operatorFactory;

Factory(BasicAdapterWorkProcessorOperatorFactory operatorFactory)
{
this.operatorFactory = requireNonNull(operatorFactory, "operatorFactory is null");
}

@Override
public AdapterWorkProcessorOperatorFactory duplicate()
{
return new Factory(operatorFactory.duplicate());
}

@Override
public int getOperatorId()
{
return operatorFactory.getOperatorId();
}

@Override
public PlanNodeId getPlanNodeId()
{
return operatorFactory.getPlanNodeId();
}

@Override
public String getOperatorType()
{
return operatorFactory.getOperatorType();
}

@Override
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
{
return operatorFactory.create(processorContext, sourcePages);
}

@Override
public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext)
{
return new BasicWorkProcessorOperatorAdapter(processorContext, operatorFactory);
}

@Override
public void lifespanFinished(Lifespan lifespan)
{
operatorFactory.lifespanFinished(lifespan);
}

@Override
public void close()
{
operatorFactory.close();
}
}

private final PageBuffer pageBuffer;
private final WorkProcessorOperator operator;

private BasicWorkProcessorOperatorAdapter(
ProcessorContext processorContext,
BasicAdapterWorkProcessorOperatorFactory operatorFactory)
{
this.pageBuffer = new PageBuffer();
this.operator = requireNonNull(operatorFactory, "operator is null").createAdapterOperator(processorContext, pageBuffer.pages());
}

@Override
public void finish()
{
pageBuffer.finish();
}

@Override
public boolean needsInput()
{
return pageBuffer.isEmpty() && !pageBuffer.isFinished();
}

@Override
public void addInput(Page page)
{
pageBuffer.add(page);
}

@Override
public WorkProcessor<Page> getOutputPages()
{
return operator.getOutputPages();
}

@Override
public Optional<OperatorInfo> getOperatorInfo()
{
return operator.getOperatorInfo();
}

@Override
public void close()
throws Exception
{
operator.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,51 @@
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.memory.context.MemoryTrackingContext;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.prestosql.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.prestosql.operator.project.PageProcessor;
import io.prestosql.spi.Page;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.prestosql.operator.WorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.prestosql.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.prestosql.operator.project.MergePages.mergePages;
import static java.util.Objects.requireNonNull;

public class FilterAndProjectOperator
implements AdapterWorkProcessorOperator
implements WorkProcessorOperator
{
private final PageBuffer pageBuffer = new PageBuffer();
private final WorkProcessor<Page> pages;

private FilterAndProjectOperator(
Session session,
MemoryTrackingContext memoryTrackingContext,
DriverYieldSignal yieldSignal,
Optional<WorkProcessor<Page>> sourcePages,
WorkProcessor<Page> sourcePages,
PageProcessor pageProcessor,
List<Type> types,
DataSize minOutputPageSize,
int minOutputPageRowCount)
int minOutputPageRowCount,
boolean avoidPageMaterialization)
{
AggregatedMemoryContext localAggregatedMemoryContext = newSimpleAggregatedMemoryContext();
LocalMemoryContext outputMemoryContext = localAggregatedMemoryContext.newLocalMemoryContext(FilterAndProjectOperator.class.getSimpleName());

this.pages = sourcePages.orElse(pageBuffer.pages())
this.pages = sourcePages
.flatMap(page -> pageProcessor.createWorkProcessor(
session.toConnectorSession(),
yieldSignal,
outputMemoryContext,
page,
sourcePages.isPresent()))
avoidPageMaterialization))
.transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext))
.withProcessStateMonitor(state -> memoryTrackingContext.localSystemMemoryContext().setBytes(localAggregatedMemoryContext.getBytes()));
}

@Override
public final void finish()
{
pageBuffer.finish();
}

@Override
public boolean needsInput()
{
return pageBuffer.isEmpty() && !pageBuffer.isFinished();
}

@Override
public final void addInput(Page page)
{
pageBuffer.add(page);
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand All @@ -108,7 +88,7 @@ public static OperatorFactory createOperatorFactory(
}

private static class Factory
implements AdapterWorkProcessorOperatorFactory
implements BasicAdapterWorkProcessorOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
Expand Down Expand Up @@ -142,26 +122,28 @@ public WorkProcessorOperator create(ProcessorContext processorContext, WorkProce
processorContext.getSession(),
processorContext.getMemoryTrackingContext(),
processorContext.getDriverYieldSignal(),
Optional.of(sourcePages),
sourcePages,
processor.get(),
types,
minOutputPageSize,
minOutputPageRowCount);
minOutputPageRowCount,
true);
}

@Override
public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext)
public WorkProcessorOperator createAdapterOperator(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
{
checkState(!closed, "Factory is already closed");
return new FilterAndProjectOperator(
processorContext.getSession(),
processorContext.getMemoryTrackingContext(),
processorContext.getDriverYieldSignal(),
Optional.empty(),
sourcePages,
processor.get(),
types,
minOutputPageSize,
minOutputPageRowCount);
minOutputPageRowCount,
false);
}

@Override
Expand Down Expand Up @@ -189,7 +171,7 @@ public void close()
}

@Override
public AdapterWorkProcessorOperatorFactory duplicate()
public BasicAdapterWorkProcessorOperatorFactory duplicate()
{
return new Factory(operatorId, planNodeId, processor, types, minOutputPageSize, minOutputPageRowCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.memory.context.MemoryTrackingContext;
import io.prestosql.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.prestosql.operator.SetBuilderOperator.SetSupplier;
import io.prestosql.operator.WorkProcessor.TransformationState;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator;
import io.prestosql.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
Expand All @@ -37,16 +36,16 @@
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.concurrent.MoreFutures.checkSuccess;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.prestosql.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.prestosql.operator.WorkProcessor.TransformationState.blocked;
import static io.prestosql.operator.WorkProcessor.TransformationState.finished;
import static io.prestosql.operator.WorkProcessor.TransformationState.ofResult;
import static io.prestosql.operator.WorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static java.util.Objects.requireNonNull;

public class HashSemiJoinOperator
implements AdapterWorkProcessorOperator
implements WorkProcessorOperator
{
public static OperatorFactory createOperatorFactory(
int operatorId,
Expand All @@ -60,7 +59,7 @@ public static OperatorFactory createOperatorFactory(
}

private static class Factory
implements AdapterWorkProcessorOperatorFactory
implements BasicAdapterWorkProcessorOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
Expand All @@ -85,14 +84,7 @@ private Factory(int operatorId, PlanNodeId planNodeId, SetSupplier setSupplier,
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
{
checkState(!closed, "Factory is already closed");
return new HashSemiJoinOperator(Optional.of(sourcePages), setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext());
}

@Override
public AdapterWorkProcessorOperator createAdapterOperator(ProcessorContext processorContext)
{
checkState(!closed, "Factory is already closed");
return new HashSemiJoinOperator(Optional.empty(), setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext());
return new HashSemiJoinOperator(sourcePages, setSupplier, probeJoinChannel, probeJoinHashChannel, processorContext.getMemoryTrackingContext());
}

@Override
Expand Down Expand Up @@ -127,41 +119,22 @@ public Factory duplicate()
}

private final WorkProcessor<Page> pages;
private final PageBuffer pageBuffer = new PageBuffer();

private HashSemiJoinOperator(
Optional<WorkProcessor<Page>> sourcePages,
WorkProcessor<Page> sourcePages,
SetSupplier channelSetFuture,
int probeJoinChannel,
Optional<Integer> probeHashChannel,
MemoryTrackingContext memoryTrackingContext)
{
pages = sourcePages.orElse(pageBuffer.pages())
pages = sourcePages
.transform(new SemiJoinPages(
channelSetFuture,
probeJoinChannel,
probeHashChannel,
requireNonNull(memoryTrackingContext, "memoryTrackingContext is null").aggregateUserMemoryContext()));
}

@Override
public void finish()
{
pageBuffer.finish();
}

@Override
public boolean needsInput()
{
return pageBuffer.isEmpty() && !pageBuffer.isFinished();
}

@Override
public void addInput(Page page)
{
pageBuffer.add(page);
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down

0 comments on commit 73b542c

Please sign in to comment.