Skip to content

Commit

Permalink
Port AssignUniqueIdOperator to work processor
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Feb 20, 2020
1 parent 73b542c commit 26ad3fb
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,153 +14,150 @@
package io.prestosql.operator;

import io.prestosql.execution.TaskId;
import io.prestosql.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.prestosql.operator.WorkProcessor.TransformationState;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.sql.planner.plan.PlanNodeId;

import javax.annotation.Nullable;

import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.prestosql.operator.BasicWorkProcessorOperatorAdapter.createAdapterOperatorFactory;
import static io.prestosql.operator.WorkProcessor.TransformationState.finished;
import static io.prestosql.operator.WorkProcessor.TransformationState.ofResult;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public class AssignUniqueIdOperator
implements Operator
implements WorkProcessorOperator
{
private static final long ROW_IDS_PER_REQUEST = 1L << 20L;
private static final long MAX_ROW_ID = 1L << 40L;

public static class AssignUniqueIdOperatorFactory
implements OperatorFactory
public static OperatorFactory createOperatorFactory(int operatorId, PlanNodeId planNodeId)
{
return createAdapterOperatorFactory(new Factory(operatorId, planNodeId));
}

private static class Factory
implements BasicAdapterWorkProcessorOperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
private boolean closed;
private final AtomicLong valuePool = new AtomicLong();

public AssignUniqueIdOperatorFactory(
int operatorId,
PlanNodeId planNodeId)
private Factory(int operatorId, PlanNodeId planNodeId)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
}

@Override
public Operator createOperator(DriverContext driverContext)
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
{
checkState(!closed, "Factory is already closed");

OperatorContext operatorContext = driverContext.addOperatorContext(
operatorId,
planNodeId,
AssignUniqueIdOperator.class.getSimpleName());
return new AssignUniqueIdOperator(operatorContext, valuePool);
return new AssignUniqueIdOperator(processorContext, sourcePages, valuePool);
}

@Override
public void noMoreOperators()
public int getOperatorId()
{
closed = true;
return operatorId;
}

@Override
public OperatorFactory duplicate()
public PlanNodeId getPlanNodeId()
{
return new AssignUniqueIdOperatorFactory(operatorId, planNodeId);
return planNodeId;
}
}

private final OperatorContext operatorContext;
private boolean finishing;
private final AtomicLong rowIdPool;
private final long uniqueValueMask;

private Page inputPage;
private long rowIdCounter;
private long maxRowIdCounterValue;

public AssignUniqueIdOperator(OperatorContext operatorContext, AtomicLong rowIdPool)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.rowIdPool = requireNonNull(rowIdPool, "rowIdPool is null");
@Override
public String getOperatorType()
{
return AssignUniqueIdOperator.class.getSimpleName();
}

TaskId fullTaskId = operatorContext.getDriverContext().getTaskId();
uniqueValueMask = (((long) fullTaskId.getStageId().getId()) << 54) | (((long) fullTaskId.getId()) << 40);
@Override
public void close()
{
closed = true;
}

requestValues();
@Override
public Factory duplicate()
{
return new Factory(operatorId, planNodeId);
}
}

private void requestValues()
{
rowIdCounter = rowIdPool.getAndAdd(ROW_IDS_PER_REQUEST);
maxRowIdCounterValue = Math.min(rowIdCounter + ROW_IDS_PER_REQUEST, MAX_ROW_ID);
checkState(rowIdCounter < MAX_ROW_ID, "Unique row id exceeds a limit: %s", MAX_ROW_ID);
}
private final WorkProcessor<Page> pages;

@Override
public OperatorContext getOperatorContext()
private AssignUniqueIdOperator(ProcessorContext context, WorkProcessor<Page> sourcePages, AtomicLong rowIdPool)
{
return operatorContext;
pages = sourcePages
.transform(new AssignUniqueId(
context.getTaskId(),
rowIdPool));
}

@Override
public void finish()
public WorkProcessor<Page> getOutputPages()
{
finishing = true;
return pages;
}

@Override
public boolean isFinished()
private static class AssignUniqueId
implements WorkProcessor.Transformation<Page, Page>
{
return finishing && inputPage == null;
}
private final AtomicLong rowIdPool;
private final long uniqueValueMask;

@Override
public boolean needsInput()
{
return !finishing && inputPage == null;
}
private long rowIdCounter;
private long maxRowIdCounterValue;

@Override
public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
requireNonNull(page, "page is null");
checkState(inputPage == null);
inputPage = page;
}
private AssignUniqueId(TaskId taskId, AtomicLong rowIdPool)
{
this.rowIdPool = requireNonNull(rowIdPool, "rowIdPool is null");

@Override
public Page getOutput()
{
if (inputPage == null) {
return null;
uniqueValueMask = (((long) taskId.getStageId().getId()) << 54) | (((long) taskId.getId()) << 40);
requestValues();
}

Page outputPage = processPage();
inputPage = null;
return outputPage;
}
@Override
public TransformationState<Page> process(@Nullable Page inputPage)
{
if (inputPage == null) {
return finished();
}

private Page processPage()
{
return inputPage.appendColumn(generateIdColumn());
}
return ofResult(inputPage.appendColumn(generateIdColumn(inputPage.getPositionCount())));
}

private Block generateIdColumn()
{
BlockBuilder block = BIGINT.createFixedSizeBlockBuilder(inputPage.getPositionCount());
for (int currentPosition = 0; currentPosition < inputPage.getPositionCount(); currentPosition++) {
if (rowIdCounter >= maxRowIdCounterValue) {
requestValues();
private Block generateIdColumn(int positionCount)
{
BlockBuilder block = BIGINT.createFixedSizeBlockBuilder(positionCount);
for (int currentPosition = 0; currentPosition < positionCount; currentPosition++) {
if (rowIdCounter >= maxRowIdCounterValue) {
requestValues();
}
long rowId = rowIdCounter++;
verify((rowId & uniqueValueMask) == 0, "RowId and uniqueValue mask overlaps");
BIGINT.writeLong(block, uniqueValueMask | rowId);
}
long rowId = rowIdCounter++;
verify((rowId & uniqueValueMask) == 0, "RowId and uniqueValue mask overlaps");
BIGINT.writeLong(block, uniqueValueMask | rowId);
return block.build();
}

private void requestValues()
{
rowIdCounter = rowIdPool.getAndAdd(ROW_IDS_PER_REQUEST);
maxRowIdCounterValue = Math.min(rowIdCounter + ROW_IDS_PER_REQUEST, MAX_ROW_ID);
checkState(rowIdCounter < MAX_ROW_ID, "Unique row id exceeds a limit: %s", MAX_ROW_ID);
}
return block.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2419,7 +2419,7 @@ public PhysicalOperation visitAssignUniqueId(AssignUniqueId node, LocalExecution
{
PhysicalOperation source = node.getSource().accept(this, context);

OperatorFactory operatorFactory = new AssignUniqueIdOperator.AssignUniqueIdOperatorFactory(
OperatorFactory operatorFactory = AssignUniqueIdOperator.createOperatorFactory(
context.getNextOperatorId(),
node.getId());
return new PhysicalOperation(operatorFactory, makeLayout(node), context, source);
Expand Down

0 comments on commit 26ad3fb

Please sign in to comment.