diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java index 86fd777a9897..a9446d3aba04 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java @@ -169,11 +169,16 @@ private long adjustEagerlyReportedBytesWithBufferedBytesOnRelease(long bufferedB // adjust the amount to eagerly report as output by the amount already eagerly reported if the new value // is larger, since this indicates that no data was flushed and only the delta between the two values should // be reported eagerly - if (outputSizeReportedBeforeRelease > 0 && bufferedBytesOnRelease >= outputSizeReportedBeforeRelease) { - bufferedBytesOnRelease -= outputSizeReportedBeforeRelease; - outputSizeReportedBeforeRelease += bufferedBytesOnRelease; + if (bufferedBytesOnRelease > outputSizeReportedBeforeRelease) { + long additionalBufferedBytes = bufferedBytesOnRelease - outputSizeReportedBeforeRelease; + outputSizeReportedBeforeRelease = bufferedBytesOnRelease; + return additionalBufferedBytes; + } + else { + // buffered size is unchanged or reduced (as a result of flushing) since last release, so + // do not report any additional bytes as output eagerly + return 0; } - return bufferedBytesOnRelease; } /** diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java index c16c6c200c7b..898f5802e0f6 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java @@ -523,6 +523,33 @@ public void testOutputWithMixedRowWiseAndColumnarPartitioning() testOutputEqualsInput(IPADDRESS, PartitioningMode.ROW_WISE, PartitioningMode.COLUMNAR); } + @Test + public void testOutputBytesWhenReused() + { + TestOutputBuffer outputBuffer = new TestOutputBuffer(); + PagePartitioner pagePartitioner = pagePartitioner(outputBuffer, BIGINT).build(); + OperatorContext operatorContext = operatorContext(); + + Page page = new Page(createLongsBlock(1, 1, 1, 1, 1, 1)); + + pagePartitioner.partitionPage(page, operatorContext); + assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(0); + pagePartitioner.prepareForRelease(operatorContext); + assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes()); + // release again with no additional input, size should not change + pagePartitioner.prepareForRelease(operatorContext); + assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes()); + + pagePartitioner.partitionPage(page, operatorContext); + pagePartitioner.prepareForRelease(operatorContext); + assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes() * 2); + + pagePartitioner.close(); + List output = outputBuffer.getEnqueued(); + // only a single page was flushed after the partitioner is closed, all output bytes were reported eagerly on release + assertThat(output.size()).isEqualTo(1); + } + @Test public void testMemoryReleased() {