Skip to content

Commit

Permalink
Fix page partitioner output bytes on release handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pettyjamesm committed Dec 20, 2023
1 parent cf77edb commit f7369f6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,16 @@ private long adjustEagerlyReportedBytesWithBufferedBytesOnRelease(long bufferedB
// adjust the amount to eagerly report as output by the amount already eagerly reported if the new value
// is larger, since this indicates that no data was flushed and only the delta between the two values should
// be reported eagerly
if (outputSizeReportedBeforeRelease > 0 && bufferedBytesOnRelease >= outputSizeReportedBeforeRelease) {
bufferedBytesOnRelease -= outputSizeReportedBeforeRelease;
outputSizeReportedBeforeRelease += bufferedBytesOnRelease;
if (bufferedBytesOnRelease > outputSizeReportedBeforeRelease) {
long additionalBufferedBytes = bufferedBytesOnRelease - outputSizeReportedBeforeRelease;
outputSizeReportedBeforeRelease = bufferedBytesOnRelease;
return additionalBufferedBytes;
}
else {
// buffered size is unchanged or reduced (as a result of flushing) since last release, so
// do not report any additional bytes as output eagerly
return 0;
}
return bufferedBytesOnRelease;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,33 @@ public void testOutputWithMixedRowWiseAndColumnarPartitioning()
testOutputEqualsInput(IPADDRESS, PartitioningMode.ROW_WISE, PartitioningMode.COLUMNAR);
}

@Test
public void testOutputBytesWhenReused()
{
TestOutputBuffer outputBuffer = new TestOutputBuffer();
PagePartitioner pagePartitioner = pagePartitioner(outputBuffer, BIGINT).build();
OperatorContext operatorContext = operatorContext();

Page page = new Page(createLongsBlock(1, 1, 1, 1, 1, 1));

pagePartitioner.partitionPage(page, operatorContext);
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(0);
pagePartitioner.prepareForRelease(operatorContext);
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());
// release again with no additional input, size should not change
pagePartitioner.prepareForRelease(operatorContext);
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());

pagePartitioner.partitionPage(page, operatorContext);
pagePartitioner.prepareForRelease(operatorContext);
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes() * 2);

pagePartitioner.close();
List<Slice> output = outputBuffer.getEnqueued();
// only a single page was flushed after the partitioner is closed, all output bytes were reported eagerly on release
assertThat(output.size()).isEqualTo(1);
}

@Test
public void testMemoryReleased()
{
Expand Down

0 comments on commit f7369f6

Please sign in to comment.