Skip to content

Commit

Permalink
Use Flatbush instead of JTS STR RTree for spatial joins
Browse files Browse the repository at this point in the history
There are memory problems with spatial joins, potentially due to the
large number of objects created by the JTS RTree.  The JTS tree used for
the build side of spatial joins creates many objects and may not report
its memory precisely.  A Flatbush creates very few objects, with a low
memory footprint, as well as faster build/query times.
  • Loading branch information
jagill committed Aug 20, 2019
1 parent c1d6b4d commit c68c0f1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 58 deletions.
5 changes: 0 additions & 5 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
</properties>

<dependencies>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
</dependency>

<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import com.esri.core.geometry.ogc.OGCGeometry;
import com.esri.core.geometry.ogc.OGCPoint;
import com.facebook.presto.Session;
import com.facebook.presto.geospatial.GeometryUtils;
import com.facebook.presto.geospatial.Rectangle;
import com.facebook.presto.geospatial.rtree.Flatbush;
import com.facebook.presto.geospatial.rtree.HasExtent;
import com.facebook.presto.operator.SpatialIndexBuilderOperator.SpatialPredicate;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
Expand All @@ -26,15 +29,14 @@
import io.airlift.slice.Slice;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.index.strtree.STRtree;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;

import static com.facebook.presto.geospatial.GeometryUtils.getExtent;
import static com.facebook.presto.geospatial.serde.GeometrySerde.deserialize;
import static com.facebook.presto.operator.JoinUtils.channelsToPages;
import static com.facebook.presto.operator.SyntheticAddress.decodePosition;
Expand All @@ -54,25 +56,33 @@ public class PagesRTreeIndex
private final List<Type> types;
private final List<Integer> outputChannels;
private final List<List<Block>> channels;
private final STRtree rtree;
private final Flatbush<GeometryWithPosition> rtree;
private final int radiusChannel;
private final SpatialPredicate spatialRelationshipTest;
private final JoinFilterFunction filterFunction;
private final Map<Integer, Rectangle> partitions;

public static final class GeometryWithPosition
implements HasExtent
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(GeometryWithPosition.class).instanceSize();

private final OGCGeometry ogcGeometry;
private final int partition;
private final int position;
private final Rectangle extent;

public GeometryWithPosition(OGCGeometry ogcGeometry, int partition, int position)
{
this(ogcGeometry, partition, position, 0.0f);
}

public GeometryWithPosition(OGCGeometry ogcGeometry, int partition, int position, double radius)
{
this.ogcGeometry = requireNonNull(ogcGeometry, "ogcGeometry is null");
this.partition = partition;
this.position = position;
this.extent = GeometryUtils.getExtent(ogcGeometry, radius);
}

public OGCGeometry getGeometry()
Expand All @@ -90,9 +100,16 @@ public int getPosition()
return position;
}

public long getEstimatedMemorySizeInBytes()
@Override
public Rectangle getExtent()
{
return extent;
}

@Override
public long getEstimatedSizeInBytes()
{
return INSTANCE_SIZE + ogcGeometry.estimateMemorySize();
return INSTANCE_SIZE + ogcGeometry.estimateMemorySize() + extent.getEstimatedSizeInBytes();
}
}

Expand All @@ -102,7 +119,7 @@ public PagesRTreeIndex(
List<Type> types,
List<Integer> outputChannels,
List<List<Block>> channels,
STRtree rtree,
Flatbush<GeometryWithPosition> rtree,
Optional<Integer> radiusChannel,
SpatialPredicate spatialRelationshipTest,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Expand All @@ -119,14 +136,6 @@ public PagesRTreeIndex(
this.partitions = requireNonNull(partitions, "partitions is null");
}

private static Envelope getEnvelope(OGCGeometry ogcGeometry)
{
com.esri.core.geometry.Envelope env = new com.esri.core.geometry.Envelope();
ogcGeometry.getEsriGeometry().queryEnvelope(env);

return new Envelope(env.getXMin(), env.getXMax(), env.getYMin(), env.getYMax());
}

/**
* Returns an array of addresses from {@link PagesIndex#valueAddresses} corresponding
* to rows with matching geometries.
Expand Down Expand Up @@ -155,11 +164,10 @@ public int[] findJoinPositions(int probePosition, Page probe, int probeGeometryC

IntArrayList matchingPositions = new IntArrayList();

Envelope envelope = getEnvelope(probeGeometry);
rtree.query(envelope, item -> {
GeometryWithPosition geometryWithPosition = (GeometryWithPosition) item;
Rectangle queryRectangle = getExtent(probeGeometry);
rtree.findIntersections(queryRectangle, geometryWithPosition -> {
OGCGeometry buildGeometry = geometryWithPosition.getGeometry();
if (partitions.isEmpty() || (probePartition == geometryWithPosition.getPartition() && (probeIsPoint || (buildGeometry instanceof OGCPoint) || testReferencePoint(envelope, buildGeometry, probePartition)))) {
if (partitions.isEmpty() || (probePartition == geometryWithPosition.getPartition() && (probeIsPoint || (buildGeometry instanceof OGCPoint) || testReferencePoint(queryRectangle, buildGeometry, probePartition)))) {
if (radiusChannel == -1) {
if (spatialRelationshipTest.apply(buildGeometry, probeGeometry, OptionalDouble.empty())) {
matchingPositions.add(geometryWithPosition.getPosition());
Expand All @@ -176,18 +184,18 @@ public int[] findJoinPositions(int probePosition, Page probe, int probeGeometryC
return matchingPositions.toIntArray(null);
}

private boolean testReferencePoint(Envelope probeEnvelope, OGCGeometry buildGeometry, int partition)
private boolean testReferencePoint(Rectangle probeEnvelope, OGCGeometry buildGeometry, int partition)
{
Envelope buildEnvelope = getEnvelope(buildGeometry);
Envelope intersection = buildEnvelope.intersection(probeEnvelope);
if (intersection.isNull()) {
Rectangle buildEnvelope = getExtent(buildGeometry);
Rectangle intersection = buildEnvelope.intersection(probeEnvelope);
if (intersection == null) {
return false;
}

Rectangle extent = partitions.get(partition);

double x = intersection.getMinX();
double y = intersection.getMinY();
double x = intersection.getXMin();
double y = intersection.getYMin();
return x >= extent.getXMin() && x < extent.getXMax() && y >= extent.getYMin() && y < extent.getYMax();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.esri.core.geometry.ogc.OGCGeometry;
import com.facebook.presto.Session;
import com.facebook.presto.geospatial.Rectangle;
import com.facebook.presto.geospatial.rtree.Flatbush;
import com.facebook.presto.operator.PagesRTreeIndex.GeometryWithPosition;
import com.facebook.presto.operator.SpatialIndexBuilderOperator.SpatialPredicate;
import com.facebook.presto.spi.block.Block;
Expand All @@ -28,18 +29,14 @@
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.index.strtree.AbstractNode;
import org.locationtech.jts.index.strtree.ItemBoundable;
import org.locationtech.jts.index.strtree.STRtree;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static com.facebook.presto.geospatial.GeometryUtils.getJtsEnvelope;
import static com.facebook.presto.geospatial.serde.GeometrySerde.deserialize;
import static com.facebook.presto.operator.PagesSpatialIndex.EMPTY_INDEX;
import static com.facebook.presto.operator.SyntheticAddress.decodePosition;
Expand All @@ -54,9 +51,6 @@ public class PagesSpatialIndexSupplier
implements Supplier<PagesSpatialIndex>
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(PagesSpatialIndexSupplier.class).instanceSize();
private static final int ENVELOPE_INSTANCE_SIZE = ClassLayout.parseClass(Envelope.class).instanceSize();
private static final int STRTREE_INSTANCE_SIZE = ClassLayout.parseClass(STRtree.class).instanceSize();
private static final int ABSTRACT_NODE_INSTANCE_SIZE = ClassLayout.parseClass(AbstractNode.class).instanceSize();

private final Session session;
private final LongArrayList addresses;
Expand All @@ -66,7 +60,7 @@ public class PagesSpatialIndexSupplier
private final Optional<Integer> radiusChannel;
private final SpatialPredicate spatialRelationshipTest;
private final Optional<JoinFilterFunctionCompiler.JoinFilterFunctionFactory> filterFunctionFactory;
private final STRtree rtree;
private final Flatbush<GeometryWithPosition> rtree;
private final Map<Integer, Rectangle> partitions;
private final long memorySizeInBytes;

Expand Down Expand Up @@ -94,15 +88,14 @@ public PagesSpatialIndexSupplier(

this.rtree = buildRTree(addresses, channels, geometryChannel, radiusChannel, partitionChannel);
this.radiusChannel = radiusChannel;
this.memorySizeInBytes = INSTANCE_SIZE +
(rtree.isEmpty() ? 0 : STRTREE_INSTANCE_SIZE + computeMemorySizeInBytes(rtree.getRoot()));
this.memorySizeInBytes = INSTANCE_SIZE + rtree.getEstimatedSizeInBytes();
}

private static STRtree buildRTree(LongArrayList addresses, List<List<Block>> channels, int geometryChannel, Optional<Integer> radiusChannel, Optional<Integer> partitionChannel)
private static Flatbush<GeometryWithPosition> buildRTree(LongArrayList addresses, List<List<Block>> channels, int geometryChannel, Optional<Integer> radiusChannel, Optional<Integer> partitionChannel)
{
STRtree rtree = new STRtree();
Operator relateOperator = OperatorFactoryLocal.getInstance().getOperator(Operator.Type.Relate);

ObjectArrayList<GeometryWithPosition> geometries = new ObjectArrayList<>();
for (int position = 0; position < addresses.size(); position++) {
long pageAddress = addresses.getLong(position);
int blockIndex = decodeSliceIndex(pageAddress);
Expand Down Expand Up @@ -137,11 +130,10 @@ private static STRtree buildRTree(LongArrayList addresses, List<List<Block>> cha
partition = toIntExact(INTEGER.getLong(partitionBlock, blockPosition));
}

rtree.insert(getJtsEnvelope(ogcGeometry, radius), new GeometryWithPosition(ogcGeometry, partition, position));
geometries.add(new GeometryWithPosition(ogcGeometry, partition, position, radius));
}

rtree.build();
return rtree;
return new Flatbush<>(geometries.toArray(new GeometryWithPosition[] {}));
}

private static void accelerateGeometry(OGCGeometry ogcGeometry, Operator relateOperator)
Expand All @@ -157,19 +149,6 @@ private static void accelerateGeometry(OGCGeometry ogcGeometry, Operator relateO
}
}

private long computeMemorySizeInBytes(AbstractNode root)
{
if (root.getLevel() == 0) {
return ABSTRACT_NODE_INSTANCE_SIZE + ENVELOPE_INSTANCE_SIZE + root.getChildBoundables().stream().mapToLong(child -> computeMemorySizeInBytes((ItemBoundable) child)).sum();
}
return ABSTRACT_NODE_INSTANCE_SIZE + ENVELOPE_INSTANCE_SIZE + root.getChildBoundables().stream().mapToLong(child -> computeMemorySizeInBytes((AbstractNode) child)).sum();
}

private long computeMemorySizeInBytes(ItemBoundable item)
{
return ENVELOPE_INSTANCE_SIZE + ((GeometryWithPosition) item.getItem()).getEstimatedMemorySizeInBytes();
}

// doesn't include memory used by channels and addresses which are shared with PagesIndex
public DataSize getEstimatedSize()
{
Expand Down

0 comments on commit c68c0f1

Please sign in to comment.