Skip to content

Commit

Permalink
Add getCompletedPositions to ConnectorPageSource
Browse files Browse the repository at this point in the history
This allows connectors to report input rows processed
by ConnectorPageSource rather than relying solely on
the positions count of the page returned from getNextPage
  • Loading branch information
raunaqmorarka authored and sopel39 committed Jul 19, 2021
1 parent 7651769 commit fb10b54
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class ScanFilterAndProjectOperator
private long processedPositions;
private long processedBytes;
private long physicalBytes;
private long physicalPositions;
private long readTimeNanos;
private long dynamicFilterSplitsProcessed;
private Metrics metrics = Metrics.EMPTY;
Expand Down Expand Up @@ -135,7 +136,7 @@ public DataSize getPhysicalInputDataSize()
@Override
public long getPhysicalInputPositions()
{
return processedPositions;
return physicalPositions;
}

@Override
Expand Down Expand Up @@ -339,6 +340,7 @@ public ProcessState<Page> process()
// TODO: derive better values for cursors
processedBytes = cursor.getCompletedBytes();
physicalBytes = cursor.getCompletedBytes();
physicalPositions = processedPositions;
readTimeNanos = cursor.getReadTimeNanos();
if (output.isNoMoreRows()) {
finished = true;
Expand Down Expand Up @@ -402,6 +404,7 @@ public ProcessState<Page> process()
// update operator stats
processedPositions += page.getPositionCount();
physicalBytes = pageSource.getCompletedBytes();
physicalPositions = pageSource.getCompletedPositions().orElse(processedPositions);
readTimeNanos = pageSource.getReadTimeNanos();
metrics = pageSource.getMetrics();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void noMoreOperators()
private boolean finished;

private long completedBytes;
private long completedPositions;
private long readTimeNanos;

public TableScanOperator(
Expand Down Expand Up @@ -315,9 +316,15 @@ public Page getOutput()
// update operator stats
long endCompletedBytes = source.getCompletedBytes();
long endReadTimeNanos = source.getReadTimeNanos();
operatorContext.recordPhysicalInputWithTiming(endCompletedBytes - completedBytes, page.getPositionCount(), endReadTimeNanos - readTimeNanos);
operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount());
long positionCount = page.getPositionCount();
long endCompletedPositions = source.getCompletedPositions().orElse(completedPositions + positionCount);
operatorContext.recordPhysicalInputWithTiming(
endCompletedBytes - completedBytes,
endCompletedPositions - completedPositions,
endReadTimeNanos - readTimeNanos);
operatorContext.recordProcessedInput(page.getSizeInBytes(), positionCount);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ DataSize getPhysicalInputDataSize()

long getPhysicalInputPositions()
{
return processedPositions;
if (source == null) {
return 0;
}
return source.getCompletedPositions().orElse(processedPositions);
}

DataSize getInputDataSize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

public interface ConnectorPageSource
Expand All @@ -31,6 +32,16 @@ public interface ConnectorPageSource
*/
long getCompletedBytes();

/**
* Gets the number of input rows processed by this page source so far.
* By default, the positions count of the page returned from getNextPage
* is used to calculate the number of input rows.
*/
default OptionalLong getCompletedPositions()
{
return OptionalLong.empty();
}

/**
* Gets the wall time this page source spent reading data from the input.
* If read time is not available, this method should return zero.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -257,6 +258,12 @@ public long getCompletedBytes()
return delegate.getCompletedBytes();
}

@Override
public OptionalLong getCompletedPositions()
{
return delegate.getCompletedPositions();
}

@Override
public long getReadTimeNanos()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class OrcPageSource
// Row ID relative to all the original files of the same bucket ID before this file in lexicographic order
private Optional<Long> originalFileRowId = Optional.empty();

private long completedPositions;

public OrcPageSource(
OrcRecordReader recordReader,
List<ColumnAdaptation> columnAdaptations,
Expand All @@ -100,6 +102,12 @@ public long getCompletedBytes()
return orcDataSource.getReadBytes();
}

@Override
public OptionalLong getCompletedPositions()
{
return OptionalLong.of(completedPositions);
}

@Override
public long getReadTimeNanos()
{
Expand Down Expand Up @@ -134,6 +142,8 @@ public Page getNextPage()
return null;
}

completedPositions += page.getPositionCount();

OptionalLong startRowId = originalFileRowId.isPresent() ?
OptionalLong.of(originalFileRowId.get() + recordReader.getFilePosition()) : OptionalLong.empty();

Expand Down

0 comments on commit fb10b54

Please sign in to comment.