Skip to content

Commit

Permalink
Arrow/Parquet GetNextArrowArray(): implement full spatial filtering (…
Browse files Browse the repository at this point in the history
…not just bbox intersection) (fixes OSGeo#8347)
  • Loading branch information
rouault committed Sep 6, 2023
1 parent 6e0e237 commit cab18ad
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
100 changes: 100 additions & 0 deletions autotest/ogr/ogr_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1851,6 +1851,106 @@ def test_ogr_parquet_arrow_stream_numpy_fast_spatial_filter():
assert len(batches) == 0


###############################################################################


def test_ogr_parquet_arrow_stream_numpy_detailed_spatial_filter(tmp_vsimem):
pytest.importorskip("osgeo.gdal_array")
pytest.importorskip("numpy")

filename = str(
tmp_vsimem
/ "test_ogr_parquet_arrow_stream_numpy_detailed_spatial_filter.parquet"
)
ds = ogr.GetDriverByName("Parquet").CreateDataSource(filename)
lyr = ds.CreateLayer("test", options=["FID=fid"])
for idx, wkt in enumerate(
[
"POINT(1 2)",
"MULTIPOINT(0 0,1 2)",
"LINESTRING(3 4,5 6)",
"MULTILINESTRING((7 8,7.5 8.5),(3 4,5 6))",
"POLYGON((10 20,10 30,20 30,10 20),(11 21,11 29,19 29,11 21))",
"MULTIPOLYGON(((100 100,100 200,200 200,100 100)),((10 20,10 30,20 30,10 20),(11 21,11 29,19 29,11 21)))",
"LINESTRING EMPTY",
"MULTILINESTRING EMPTY",
"POLYGON EMPTY",
"MULTIPOLYGON EMPTY",
"GEOMETRYCOLLECTION EMPTY",
]
):
f = ogr.Feature(lyr.GetLayerDefn())
f.SetFID(idx)
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(wkt))
lyr.CreateFeature(f)
ds = None

ds = ogr.Open(filename)
lyr = ds.GetLayer(0)

eps = 1e-1

# Select nothing
with ogrtest.spatial_filter(lyr, 6, 0, 8, 1):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 0

# Select POINT and MULTIPOINT
with ogrtest.spatial_filter(lyr, 1 - eps, 2 - eps, 1 + eps, 2 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 1
assert list(batches[0]["fid"]) == [0, 1]
assert [f.GetFID() for f in lyr] == [0, 1]

# Select LINESTRING and MULTILINESTRING due to point falling in bbox
with ogrtest.spatial_filter(lyr, 3 - eps, 4 - eps, 3 + eps, 4 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 1
assert list(batches[0]["fid"]) == [2, 3]
assert [f.GetFID() for f in lyr] == [2, 3]

# Select LINESTRING and MULTILINESTRING due to point falling in bbox
with ogrtest.spatial_filter(lyr, 5 - eps, 6 - eps, 5 + eps, 6 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 1
assert list(batches[0]["fid"]) == [2, 3]
assert [f.GetFID() for f in lyr] == [2, 3]

# Select LINESTRING and MULTILINESTRING due to more generic intersection
with ogrtest.spatial_filter(lyr, 4 - eps, 5 - eps, 4 + eps, 5 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 1
assert list(batches[0]["fid"]) == [2, 3]
assert [f.GetFID() for f in lyr] == [2, 3]

# Select POLYGON and MULTIPOLYGON due to point falling in bbox
with ogrtest.spatial_filter(lyr, 10 - eps, 20 - eps, 10 + eps, 20 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
assert len(batches) == 1
assert list(batches[0]["fid"]) == [4, 5]
assert [f.GetFID() for f in lyr] == [4, 5]

# bbox with polygon hole
with ogrtest.spatial_filter(lyr, 12 - eps, 20.5 - eps, 12 + eps, 20.5 + eps):
stream = lyr.GetArrowStreamAsNumPy(options=["USE_MASKED_ARRAYS=NO"])
batches = [batch for batch in stream]
if ogrtest.have_geos():
assert len(batches) == 0
else:
assert len(batches) == 1
assert list(batches[0]["fid"]) == [4, 5]
assert [f.GetFID() for f in lyr] == [4, 5]

ds = None
gdal.Unlink(filename)


###############################################################################
# Test SetAttributeFilter() and arrow stream interface

Expand Down
20 changes: 9 additions & 11 deletions ogr/ogrsf_frmts/generic/ogrlayerarrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2379,8 +2379,7 @@ CompactFixedWidthArray(struct ArrowArray *array, int nWidth,

template <class OffsetType>
static size_t
FillValidityArrayFromWKBArray(struct ArrowArray *array,
const OGREnvelope &sFilterEnvelope,
FillValidityArrayFromWKBArray(struct ArrowArray *array, const OGRLayer *poLayer,
std::vector<bool> &abyValidityFromFilters)
{
const size_t nLength = static_cast<size_t>(array->length);
Expand All @@ -2399,11 +2398,10 @@ FillValidityArrayFromWKBArray(struct ArrowArray *array,
{
if (!pabyValidity || TestBit(pabyValidity, i + nOffset))
{
if (OGRWKBGetBoundingBox(
pabyData + panOffsets[i],
static_cast<size_t>(panOffsets[i + 1] - panOffsets[i]),
sEnvelope) &&
sFilterEnvelope.Intersects(sEnvelope))
const GByte *pabyWKB = pabyData + panOffsets[i];
const size_t nWKBSize =
static_cast<size_t>(panOffsets[i + 1] - panOffsets[i]);
if (poLayer->FilterWKBGeometry(pabyWKB, nWKBSize, sEnvelope))
{
abyValidityFromFilters[i] = true;
nCountIntersecting++;
Expand Down Expand Up @@ -2880,11 +2878,11 @@ void OGRLayer::PostFilterArrowArray(const struct ArrowSchema *schema,
const size_t nCountIntersectingGeom =
m_poFilterGeom ? (strcmp(schema->children[iGeomField]->format, "z") == 0
? FillValidityArrayFromWKBArray<uint32_t>(
array->children[iGeomField],
m_sFilterEnvelope, abyValidityFromFilters)
array->children[iGeomField], this,
abyValidityFromFilters)
: FillValidityArrayFromWKBArray<uint64_t>(
array->children[iGeomField],
m_sFilterEnvelope, abyValidityFromFilters))
array->children[iGeomField], this,
abyValidityFromFilters))
: nLength;
if (!m_poFilterGeom)
abyValidityFromFilters.resize(nLength, true);
Expand Down

0 comments on commit cab18ad

Please sign in to comment.