diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java index 828a906dbc0d..a67022e27421 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PagesIndex.java @@ -15,6 +15,7 @@ import com.facebook.presto.Session; import com.facebook.presto.geospatial.Rectangle; +import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.metadata.FunctionManager; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.metadata.MetadataManager; @@ -464,11 +465,12 @@ public PagesSpatialIndexSupplier createPagesSpatialIndex( SpatialPredicate spatialRelationshipTest, Optional filterFunctionFactory, List outputChannels, - Map partitions) + Map partitions, + LocalMemoryContext localUserMemoryContext) { // TODO probably shouldn't copy to reduce memory and for memory accounting's sake List> channels = ImmutableList.copyOf(this.channels); - return new PagesSpatialIndexSupplier(session, valueAddresses, types, outputChannels, channels, geometryChannel, radiusChannel, partitionChannel, spatialRelationshipTest, filterFunctionFactory, partitions); + return new PagesSpatialIndexSupplier(session, valueAddresses, types, outputChannels, channels, geometryChannel, radiusChannel, partitionChannel, spatialRelationshipTest, filterFunctionFactory, partitions, localUserMemoryContext); } public LookupSourceSupplier createLookupSourceSupplier( diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PagesSpatialIndexSupplier.java b/presto-main/src/main/java/com/facebook/presto/operator/PagesSpatialIndexSupplier.java index 4da8b62c9a6e..5812eb2cb516 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PagesSpatialIndexSupplier.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PagesSpatialIndexSupplier.java @@ -21,6 +21,7 @@ import com.facebook.presto.Session; import com.facebook.presto.geospatial.Rectangle; import com.facebook.presto.geospatial.rtree.Flatbush; +import com.facebook.presto.memory.context.LocalMemoryContext; import com.facebook.presto.operator.PagesRTreeIndex.GeometryWithPosition; import com.facebook.presto.operator.SpatialIndexBuilderOperator.SpatialPredicate; import com.facebook.presto.spi.block.Block; @@ -46,11 +47,13 @@ import static com.google.common.base.Verify.verify; import static io.airlift.units.DataSize.Unit.BYTE; import static java.lang.Math.toIntExact; +import static java.util.Objects.requireNonNull; public class PagesSpatialIndexSupplier implements Supplier { private static final int INSTANCE_SIZE = ClassLayout.parseClass(PagesSpatialIndexSupplier.class).instanceSize(); + private static final int MEMORY_USAGE_UPDATE_INCREMENT_BYTES = 100 * 1024 * 1024; // 100 MB private final Session session; private final LongArrayList addresses; @@ -75,8 +78,10 @@ public PagesSpatialIndexSupplier( Optional partitionChannel, SpatialPredicate spatialRelationshipTest, Optional filterFunctionFactory, - Map partitions) + Map partitions, + LocalMemoryContext localUserMemoryContext) { + requireNonNull(localUserMemoryContext, "localUserMemoryContext is null"); this.session = session; this.addresses = addresses; this.types = types; @@ -86,16 +91,20 @@ public PagesSpatialIndexSupplier( this.filterFunctionFactory = filterFunctionFactory; this.partitions = partitions; - this.rtree = buildRTree(addresses, channels, geometryChannel, radiusChannel, partitionChannel); + this.rtree = buildRTree(addresses, channels, geometryChannel, radiusChannel, partitionChannel, localUserMemoryContext); this.radiusChannel = radiusChannel; this.memorySizeInBytes = INSTANCE_SIZE + rtree.getEstimatedSizeInBytes(); } - private static Flatbush buildRTree(LongArrayList addresses, List> channels, int geometryChannel, Optional radiusChannel, Optional partitionChannel) + private static Flatbush buildRTree(LongArrayList addresses, List> channels, int geometryChannel, Optional radiusChannel, Optional partitionChannel, LocalMemoryContext localUserMemoryContext) { Operator relateOperator = OperatorFactoryLocal.getInstance().getOperator(Operator.Type.Relate); ObjectArrayList geometries = new ObjectArrayList<>(); + + long recordedSizeInBytes = localUserMemoryContext.getBytes(); + long addedSizeInBytes = 0; + for (int position = 0; position < addresses.size(); position++) { long pageAddress = addresses.getLong(position); int blockIndex = decodeSliceIndex(pageAddress); @@ -130,7 +139,16 @@ private static Flatbush buildRTree(LongArrayList addresses partition = toIntExact(INTEGER.getLong(partitionBlock, blockPosition)); } - geometries.add(new GeometryWithPosition(ogcGeometry, partition, position, radius)); + GeometryWithPosition geometryWithPosition = new GeometryWithPosition(ogcGeometry, partition, position, radius); + geometries.add(geometryWithPosition); + + addedSizeInBytes += geometryWithPosition.getEstimatedSizeInBytes(); + + if (addedSizeInBytes >= MEMORY_USAGE_UPDATE_INCREMENT_BYTES) { + localUserMemoryContext.setBytes(recordedSizeInBytes + addedSizeInBytes); + recordedSizeInBytes += addedSizeInBytes; + addedSizeInBytes = 0; + } } return new Flatbush<>(geometries.toArray(new GeometryWithPosition[] {})); diff --git a/presto-main/src/main/java/com/facebook/presto/operator/SpatialIndexBuilderOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/SpatialIndexBuilderOperator.java index fd0d43f0dd59..afa09927ffef 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/SpatialIndexBuilderOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/SpatialIndexBuilderOperator.java @@ -230,7 +230,8 @@ public void finish() } finishing = true; - PagesSpatialIndexSupplier spatialIndex = index.createPagesSpatialIndex(operatorContext.getSession(), indexChannel, radiusChannel, partitionChannel, spatialRelationshipTest, filterFunctionFactory, outputChannels, partitions); + localUserMemoryContext.setBytes(index.getEstimatedSize().toBytes()); + PagesSpatialIndexSupplier spatialIndex = index.createPagesSpatialIndex(operatorContext.getSession(), indexChannel, radiusChannel, partitionChannel, spatialRelationshipTest, filterFunctionFactory, outputChannels, partitions, localUserMemoryContext); localUserMemoryContext.setBytes(index.getEstimatedSize().toBytes() + spatialIndex.getEstimatedSize().toBytes()); indexNotNeeded = pagesSpatialIndexFactory.lendPagesSpatialIndex(spatialIndex); }