diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java index bddac77eac6bc..03832dc615248 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java @@ -14,6 +14,7 @@ package com.facebook.presto.operator; import com.facebook.presto.Session; +import com.facebook.presto.array.ReferenceCountMap; import com.facebook.presto.metadata.FunctionRegistry; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; @@ -213,13 +214,19 @@ public void addPage(Page page) positionCount += page.getPositionCount(); int pageIndex = (channels.length > 0) ? channels[0].size() : 0; + ReferenceCountMap referenceCountMap = new ReferenceCountMap(); for (int i = 0; i < channels.length; i++) { Block block = page.getBlock(i); if (eagerCompact) { block = block.copyRegion(0, block.getPositionCount()); } channels[i].add(block); - pagesMemorySize += block.getRetainedSizeInBytes(); + + block.retainedBytesForEachPart((object, size) -> { + if (referenceCountMap.incrementAndGet(object) == 1) { + pagesMemorySize += size; + } + }); } for (int position = 0; position < page.getPositionCount(); position++) { @@ -239,6 +246,8 @@ public void compact() if (eagerCompact) { return; } + ReferenceCountMap decrementReferenceCountMap = new ReferenceCountMap(); + ReferenceCountMap incrementReferenceCountMap = new ReferenceCountMap(); for (int channel = 0; channel < types.size(); channel++) { ObjectArrayList blocks = channels[channel]; for (int i = nextBlockToCompact; i < blocks.size(); i++) { @@ -247,8 +256,18 @@ public void compact() // Copy the block to compact its size Block compactedBlock = block.copyRegion(0, block.getPositionCount()); blocks.set(i, compactedBlock); - pagesMemorySize -= block.getRetainedSizeInBytes(); - pagesMemorySize += compactedBlock.getRetainedSizeInBytes(); + + block.retainedBytesForEachPart((object, size) -> { + if (decrementReferenceCountMap.incrementAndGet(object) == 1) { + pagesMemorySize -= size; + } + }); + + compactedBlock.retainedBytesForEachPart((object, size) -> { + if (incrementReferenceCountMap.incrementAndGet(object) == 1) { + pagesMemorySize += size; + } + }); } } nextBlockToCompact = channels[0].size(); @@ -562,10 +581,12 @@ protected Page computeNext() // TODO: This is similar to what OrderByOperator does, look into reusing this logic in OrderByOperator as well. public Iterator getSortedPages() { - return new AbstractIterator() { + return new AbstractIterator() + { private int currentPosition; private PageBuilder pageBuilder = new PageBuilder(types); private int[] outputChannels = new int[types.size()]; + { Arrays.setAll(outputChannels, IntUnaryOperator.identity()); }