Skip to content

Commit

Permalink
Extract TopNProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed May 1, 2019
1 parent 9459241 commit afd93f2
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 53 deletions.
66 changes: 13 additions & 53 deletions presto-main/src/main/java/io/prestosql/operator/TopNOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
package io.prestosql.operator;

import com.google.common.collect.ImmutableList;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.SortOrder;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.plan.PlanNodeId;

import java.util.Iterator;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.util.Collections.emptyIterator;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -89,35 +84,26 @@ public OperatorFactory duplicate()
}

private final OperatorContext operatorContext;
private final LocalMemoryContext localUserMemoryContext;

private GroupedTopNBuilder topNBuilder;
private final TopNProcessor topNProcessor;
private boolean finishing;

private Iterator<Page> outputIterator;

public TopNOperator(
OperatorContext operatorContext,
List<Type> types,
int n,
List<Integer> sortChannels,
List<SortOrder> sortOrders)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.localUserMemoryContext = operatorContext.localUserMemoryContext();
checkArgument(n >= 0, "n must be positive");
this.operatorContext = operatorContext;
this.topNProcessor = new TopNProcessor(
requireNonNull(operatorContext, "operatorContext is null").aggregateUserMemoryContext(),
types,
n,
sortChannels,
sortOrders);

if (n == 0) {
finishing = true;
outputIterator = emptyIterator();
}
else {
topNBuilder = new GroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, sortChannels, sortOrders),
n,
false,
new NoChannelGroupByHash());
}
}

Expand All @@ -136,55 +122,29 @@ public void finish()
@Override
public boolean isFinished()
{
return finishing && noMoreOutput();
return finishing && topNProcessor.noMoreOutput();
}

@Override
public boolean needsInput()
{
return !finishing && !noMoreOutput();
return !finishing && !topNProcessor.noMoreOutput();
}

@Override
public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
boolean done = topNBuilder.processPage(requireNonNull(page, "page is null")).process();
// there is no grouping so work will always be done
verify(done);
updateMemoryReservation();
topNProcessor.addInput(page);
}

@Override
public Page getOutput()
{
if (!finishing || noMoreOutput()) {
if (!finishing || topNProcessor.noMoreOutput()) {
return null;
}

if (outputIterator == null) {
// start flushing
outputIterator = topNBuilder.buildResult();
}

Page output = null;
if (outputIterator.hasNext()) {
output = outputIterator.next();
}
else {
outputIterator = emptyIterator();
}
updateMemoryReservation();
return output;
}

private void updateMemoryReservation()
{
localUserMemoryContext.setBytes(topNBuilder.getEstimatedSizeInBytes());
}

private boolean noMoreOutput()
{
return outputIterator != null && !outputIterator.hasNext();
return topNProcessor.getOutput();
}
}
104 changes: 104 additions & 0 deletions presto-main/src/main/java/io/prestosql/operator/TopNProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.operator;

import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.memory.context.LocalMemoryContext;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.SortOrder;
import io.prestosql.spi.type.Type;

import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static java.util.Collections.emptyIterator;
import static java.util.Objects.requireNonNull;

/**
* Returns the top N rows from the source sorted according to the specified ordering in the keyChannelIndex channel.
*/
public class TopNProcessor
{
private final LocalMemoryContext localUserMemoryContext;

@Nullable
private GroupedTopNBuilder topNBuilder;
private Iterator<Page> outputIterator;

public TopNProcessor(
AggregatedMemoryContext aggregatedMemoryContext,
List<Type> types,
int n,
List<Integer> sortChannels,
List<SortOrder> sortOrders)
{
requireNonNull(aggregatedMemoryContext, "aggregatedMemoryContext is null");
this.localUserMemoryContext = aggregatedMemoryContext.newLocalMemoryContext(TopNProcessor.class.getSimpleName());
checkArgument(n >= 0, "n must be positive");

if (n == 0) {
outputIterator = emptyIterator();
}
else {
topNBuilder = new GroupedTopNBuilder(
types,
new SimplePageWithPositionComparator(types, sortChannels, sortOrders),
n,
false,
new NoChannelGroupByHash());
}
}

public void addInput(Page page)
{
requireNonNull(topNBuilder, "topNBuilder is null");
boolean done = topNBuilder.processPage(requireNonNull(page, "page is null")).process();
// there is no grouping so work will always be done
verify(done);
updateMemoryReservation();
}

public Page getOutput()
{
if (outputIterator == null) {
// start flushing
outputIterator = topNBuilder.buildResult();
}

Page output = null;
if (outputIterator.hasNext()) {
output = outputIterator.next();
}
else {
outputIterator = emptyIterator();
}
updateMemoryReservation();
return output;
}

public boolean noMoreOutput()
{
return outputIterator != null && !outputIterator.hasNext();
}

private void updateMemoryReservation()
{
requireNonNull(topNBuilder, "topNBuilder is null");
localUserMemoryContext.setBytes(topNBuilder.getEstimatedSizeInBytes());
}
}

0 comments on commit afd93f2

Please sign in to comment.