Refactor TableFinishOperator and TableWriterOperator
- Rename methods
- Extract common fields and some utility methods into a utility class
arhimondr committed Sep 3, 2019
1 parent 643ee08 commit f0009dc
Showing 5 changed files with 155 additions and 119 deletions.
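
For orientation before the per-file diffs: the channel constants and page helpers that previously lived on TableWriterOperator and TableFinishOperator now sit in the new TableWriterUtils class. The following is a minimal, hedged sketch of how a consumer uses the relocated helpers; the class and method names are taken from the diff below, while the wrapper class and the way the JsonCodec instance is obtained are illustrative assumptions.

import com.facebook.presto.operator.TableCommitContext;
import com.facebook.presto.operator.TableWriterUtils;
import com.facebook.presto.spi.Page;
import io.airlift.json.JsonCodec;

import java.util.Optional;

// Illustrative wrapper only; the real caller is TableFinishOperator#addInput in the first diff below.
final class TableWriterUtilsUsageSketch
{
    private TableWriterUtilsUsageSketch() {}

    static Optional<Page> consume(Page page, JsonCodec<TableCommitContext> tableCommitContextCodec)
    {
        // The commit context travels JSON-encoded in CONTEXT_CHANNEL of every page; the codec
        // is now passed explicitly instead of being read from a field of the calling operator.
        TableCommitContext tableCommitContext = TableWriterUtils.getTableCommitContext(page, tableCommitContextCodec);

        // Statistics rows are the positions where ROW_COUNT_CHANNEL and FRAGMENT_CHANNEL are null.
        Optional<Page> statisticsRows = TableWriterUtils.extractStatisticsRows(page);
        return statisticsRows;
    }
}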
@@ -39,9 +39,10 @@
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
import static com.facebook.presto.operator.TableWriterOperator.CONTEXT_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.FRAGMENT_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.ROW_COUNT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.FRAGMENT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.ROW_COUNT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.extractStatisticsRows;
import static com.facebook.presto.operator.TableWriterUtils.getTableCommitContext;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -202,7 +203,7 @@ public void addInput(Page page)
requireNonNull(page, "page is null");
checkState(state == State.RUNNING, "Operator is %s", state);

TableCommitContext tableCommitContext = getTableCommitContext(page);
TableCommitContext tableCommitContext = getTableCommitContext(page, tableCommitContextCodec);
lifespanAndStageStateTracker.update(page, tableCommitContext);
lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> {
OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
@@ -211,79 +212,6 @@ public void addInput(Page page)
});
}

private TableCommitContext getTableCommitContext(Page page)
{
checkState(page.getPositionCount() > 0, "TableFinishOperator receives empty page");
Block operatorExecutionContextBlock = page.getBlock(CONTEXT_CHANNEL);
return tableCommitContextCodec.fromJson(operatorExecutionContextBlock.getSlice(0, 0, operatorExecutionContextBlock.getSliceLength(0)).getBytes());
}

private static Optional<Page> extractStatisticsRows(Page page)
{
int statisticsPositionCount = 0;
for (int position = 0; position < page.getPositionCount(); position++) {
if (isStatisticsPosition(page, position)) {
statisticsPositionCount++;
}
}

if (statisticsPositionCount == 0) {
return Optional.empty();
}

if (statisticsPositionCount == page.getPositionCount()) {
return Optional.of(page);
}

int selectedPositionsIndex = 0;
int[] selectedPositions = new int[statisticsPositionCount];
for (int position = 0; position < page.getPositionCount(); position++) {
if (isStatisticsPosition(page, position)) {
selectedPositions[selectedPositionsIndex] = position;
selectedPositionsIndex++;
}
}

Block[] blocks = new Block[page.getChannelCount()];
for (int channel = 0; channel < page.getChannelCount(); channel++) {
blocks[channel] = page.getBlock(channel).getPositions(selectedPositions, 0, statisticsPositionCount);
}
return Optional.of(new Page(statisticsPositionCount, blocks));
}

/**
 * Both the statistics and the row_count + fragments are transferred over the same communication
 * link between the TableWriterOperator and the TableFinishOperator, so multiplexing is needed.
 * <p>
 * The transferred page layout looks like:
 * <p>
 * [[row_count_channel], [fragment_channel], [statistic_channel_1] ... [statistic_channel_N]]
 * <p>
 * [row_count_channel] - contains the number of rows processed by a TableWriterOperator instance
 * [fragment_channel] - contains arbitrary binary data provided by ConnectorPageSink#finish for
 *                      further post-processing on the coordinator
 * <p>
 * [statistic_channel_1] ... [statistic_channel_N] - contain pre-aggregated statistics computed by the
 *                                                   statistics aggregation operator within the
 *                                                   TableWriterOperator
 * <p>
 * Since the final aggregation operator in the TableFinishOperator doesn't know what to do with the
 * first two channels, those must be pruned. For convenience, a row never populates both the
 * [row_count_channel] + [fragment_channel] and the [statistic_channel_1] ... [statistic_channel_N].
 * <p>
 * If a row holds statistics, the [row_count_channel] and [fragment_channel] will be NULL.
 * <p>
 * If a row holds the row count or the fragment, all the statistics channels will be set to NULL.
 * <p>
 * Since neither [row_count_channel] nor [fragment_channel] can naturally hold a NULL value,
 * checking isNull on these two channels determines whether a row contains statistics.
 */
private static boolean isStatisticsPosition(Page page, int position)
{
return page.getBlock(ROW_COUNT_CHANNEL).isNull(position) && page.getBlock(FRAGMENT_CHANNEL).isNull(position);
}

@Override
public Page getOutput()
{
@@ -44,6 +44,8 @@
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
import static com.facebook.presto.operator.TableWriterUtils.STATS_START_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.createStatisticsPage;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.sql.planner.plan.TableWriterNode.CreateHandle;
@@ -62,11 +64,6 @@
public class TableWriterOperator
implements Operator
{
public static final int ROW_COUNT_CHANNEL = 0;
public static final int FRAGMENT_CHANNEL = 1;
public static final int CONTEXT_CHANNEL = 2;
public static final int STATS_START_CHANNEL = 3;

public static class TableWriterOperatorFactory
implements OperatorFactory
{
@@ -284,7 +281,7 @@ public Page getOutput()
if (aggregationOutput == null) {
return null;
}
return createStatisticsPage(aggregationOutput);
return createStatisticsPage(types, aggregationOutput, createTableCommitContext(false));
}

if (state != State.FINISHING) {
@@ -307,35 +304,6 @@ public Page getOutput()
return new Page(positionCount, outputBlocks);
}

// Statistics page layout:
//
// row fragments context stats1 stats2 ...
// null null X X X
// null null X X X
// null null X X X
// null null X X X
// ...
private Page createStatisticsPage(Page aggregationOutput)
{
int positionCount = aggregationOutput.getPositionCount();
Block[] outputBlocks = new Block[types.size()];
for (int channel = 0; channel < types.size(); channel++) {
if (channel < STATS_START_CHANNEL) {
// Include the table commit context in the statistics page to allow TableFinishOperator to publish correct statistics for recoverable grouped execution.
if (channel == CONTEXT_CHANNEL) {
outputBlocks[channel] = RunLengthEncodedBlock.create(types.get(channel), getTableCommitContext(false), positionCount);
}
else {
outputBlocks[channel] = RunLengthEncodedBlock.create(types.get(channel), null, positionCount);
}
}
else {
outputBlocks[channel] = aggregationOutput.getBlock(channel - STATS_START_CHANNEL);
}
}
return new Page(positionCount, outputBlocks);
}

// Fragments page layout:
//
// row fragments context
@@ -366,10 +334,10 @@ private Page createFragmentsPage()
VARBINARY.writeSlice(fragmentBuilder, fragment);
}

return new Page(positionCount, rowsBuilder.build(), fragmentBuilder.build(), RunLengthEncodedBlock.create(VARBINARY, getTableCommitContext(true), positionCount));
return new Page(positionCount, rowsBuilder.build(), fragmentBuilder.build(), RunLengthEncodedBlock.create(VARBINARY, createTableCommitContext(true), positionCount));
}

private Slice getTableCommitContext(boolean lastPage)
private Slice createTableCommitContext(boolean lastPage)
{
TaskId taskId = operatorContext.getDriverContext().getPipelineContext().getTaskId();
return wrappedBuffer(tableCommitContextCodec.toJsonBytes(
@@ -0,0 +1,140 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.RunLengthEncodedBlock;
import com.facebook.presto.spi.type.Type;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public final class TableWriterUtils
{
public static final int ROW_COUNT_CHANNEL = 0;
public static final int FRAGMENT_CHANNEL = 1;
public static final int CONTEXT_CHANNEL = 2;
public static final int STATS_START_CHANNEL = 3;

private TableWriterUtils() {}

public static Optional<Page> extractStatisticsRows(Page page)
{
int statisticsPositionCount = 0;
for (int position = 0; position < page.getPositionCount(); position++) {
if (isStatisticsPosition(page, position)) {
statisticsPositionCount++;
}
}

if (statisticsPositionCount == 0) {
return Optional.empty();
}

if (statisticsPositionCount == page.getPositionCount()) {
return Optional.of(page);
}

int selectedPositionsIndex = 0;
int[] selectedPositions = new int[statisticsPositionCount];
for (int position = 0; position < page.getPositionCount(); position++) {
if (isStatisticsPosition(page, position)) {
selectedPositions[selectedPositionsIndex] = position;
selectedPositionsIndex++;
}
}

Block[] blocks = new Block[page.getChannelCount()];
for (int channel = 0; channel < page.getChannelCount(); channel++) {
blocks[channel] = page.getBlock(channel).getPositions(selectedPositions, 0, statisticsPositionCount);
}
return Optional.of(new Page(statisticsPositionCount, blocks));
}

/**
 * Both the statistics and the row_count + fragments are transferred over the same communication
 * link between the TableWriterOperator and the TableFinishOperator, so multiplexing is needed.
 * <p>
 * The transferred page layout looks like:
 * <p>
 * [[row_count_channel], [fragment_channel], [statistic_channel_1] ... [statistic_channel_N]]
 * <p>
 * [row_count_channel] - contains the number of rows processed by a TableWriterOperator instance
 * [fragment_channel] - contains arbitrary binary data provided by ConnectorPageSink#finish for
 *                      further post-processing on the coordinator
 * <p>
 * [statistic_channel_1] ... [statistic_channel_N] - contain pre-aggregated statistics computed by the
 *                                                   statistics aggregation operator within the
 *                                                   TableWriterOperator
 * <p>
 * Since the final aggregation operator in the TableFinishOperator doesn't know what to do with the
 * first two channels, those must be pruned. For convenience, a row never populates both the
 * [row_count_channel] + [fragment_channel] and the [statistic_channel_1] ... [statistic_channel_N].
 * <p>
 * If a row holds statistics, the [row_count_channel] and [fragment_channel] will be NULL.
 * <p>
 * If a row holds the row count or the fragment, all the statistics channels will be set to NULL.
 * <p>
 * Since neither [row_count_channel] nor [fragment_channel] can naturally hold a NULL value,
 * checking isNull on these two channels determines whether a row contains statistics.
 */
private static boolean isStatisticsPosition(Page page, int position)
{
return page.getBlock(ROW_COUNT_CHANNEL).isNull(position) && page.getBlock(FRAGMENT_CHANNEL).isNull(position);
}

// Statistics page layout:
//
// row fragments context stats1 stats2 ...
// null null X X X
// null null X X X
// null null X X X
// null null X X X
// ...
public static Page createStatisticsPage(List<Type> types, Page aggregationOutput, Slice tableCommitContext)
{
int positionCount = aggregationOutput.getPositionCount();
Block[] outputBlocks = new Block[types.size()];
for (int channel = 0; channel < types.size(); channel++) {
if (channel < STATS_START_CHANNEL) {
// Include the table commit context in the statistics page to allow TableFinishOperator to publish correct statistics for recoverable grouped execution.
if (channel == CONTEXT_CHANNEL) {
outputBlocks[channel] = RunLengthEncodedBlock.create(types.get(channel), tableCommitContext, positionCount);
}
else {
outputBlocks[channel] = RunLengthEncodedBlock.create(types.get(channel), null, positionCount);
}
}
else {
outputBlocks[channel] = aggregationOutput.getBlock(channel - STATS_START_CHANNEL);
}
}
return new Page(positionCount, outputBlocks);
}

public static TableCommitContext getTableCommitContext(Page page, JsonCodec<TableCommitContext> tableCommitContextCodec)
{
checkState(page.getPositionCount() > 0, "page is empty");
Block operatorExecutionContextBlock = page.getBlock(CONTEXT_CHANNEL);
TableCommitContext context = tableCommitContextCodec.fromJson(operatorExecutionContextBlock.getSlice(0, 0, operatorExecutionContextBlock.getSliceLength(0)).getBytes());
return requireNonNull(context, "context is null");
}
}
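
A hedged, self-contained sketch of the round trip implied by the layout documented above: a page built by createStatisticsPage carries run-length-encoded null blocks in the row-count and fragment channels, so extractStatisticsRows classifies every position as a statistics position and returns the page whole. The four-channel type list (with a single BIGINT statistics channel) and the assumption that the aggregation output is non-empty are illustrative; the channel layout and helper signatures are the ones defined in TableWriterUtils above.

import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.operator.TableWriterUtils.createStatisticsPage;
import static com.facebook.presto.operator.TableWriterUtils.extractStatisticsRows;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;

final class StatisticsPageRoundTripSketch
{
    private StatisticsPageRoundTripSketch() {}

    // aggregationOutput: a non-empty page with one BIGINT statistics channel (assumed);
    // tableCommitContextJson: the JSON-encoded TableCommitContext slice produced by the writer.
    static Page toStatisticsPage(Page aggregationOutput, Slice tableCommitContextJson)
    {
        // Channels 0-2 (row count, fragments, commit context) precede the statistics channels.
        List<Type> types = ImmutableList.of(BIGINT, VARBINARY, VARBINARY, BIGINT);
        Page statisticsPage = createStatisticsPage(types, aggregationOutput, tableCommitContextJson);

        // Row count and fragment channels are null for every position, so the whole page
        // is recognized as statistics rows.
        Optional<Page> statisticsRows = extractStatisticsRows(statisticsPage);
        return statisticsRows.orElseThrow(IllegalStateException::new);
    }
}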
@@ -227,11 +227,11 @@
import static com.facebook.presto.operator.PipelineExecutionStrategy.UNGROUPED_EXECUTION;
import static com.facebook.presto.operator.TableFinishOperator.TableFinishOperatorFactory;
import static com.facebook.presto.operator.TableFinishOperator.TableFinisher;
import static com.facebook.presto.operator.TableWriterOperator.CONTEXT_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.FRAGMENT_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.ROW_COUNT_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.STATS_START_CHANNEL;
import static com.facebook.presto.operator.TableWriterOperator.TableWriterOperatorFactory;
import static com.facebook.presto.operator.TableWriterUtils.CONTEXT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.FRAGMENT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.ROW_COUNT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.STATS_START_CHANNEL;
import static com.facebook.presto.operator.UnnestOperator.UnnestOperatorFactory;
import static com.facebook.presto.operator.WindowFunctionDefinition.window;
import static com.facebook.presto.spi.StandardErrorCode.COMPILER_ERROR;
@@ -48,7 +48,7 @@
import static com.facebook.presto.block.BlockAssertions.assertBlockEquals;
import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager;
import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
import static com.facebook.presto.operator.TableWriterOperator.STATS_START_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.STATS_START_CHANNEL;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.MAX_VALUE;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
