GEOMESA-3440 Accumulo - Fix merged view Arrow join attribute index queries (#3267)
elahrvivaz committed Jan 31, 2025
1 parent 1e28392 commit e0be030
Showing 2 changed files with 136 additions and 7 deletions.
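For orientation, the broken case looks roughly like the sketch below (hypothetical type name "mySft"; the hint keys match the tests added in this commit): an Arrow-encoded query whose attribute filter is served by the join attribute index, run against a merged data store view.

// A minimal sketch, assuming a merged-view store and a schema with
// 'count', 'dtg', and 'name' attributes, as in the new tests below
val query = new Query("mySft", ECQL.toFilter("count>=2"))
query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
query.getHints.put(ARROW_SORT_FIELD, "dtg")
query.getHints.put(ARROW_DICTIONARY_FIELDS, "name")
// each returned feature carries one Arrow batch as a byte array in attribute 0
val features = mergedViewDs.getFeatureSource("mySft").getFeatures(query).features()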
@@ -298,6 +298,11 @@ object AccumuloJoinIndexAdapter {
}
val toFeatures = AccumuloResultsToFeatures(recordIndex, resultSft)
val reducer = new LocalTransformReducer(resultSft, None, None, None, hints)
+ if (hints.isSkipReduce) {
+   // override the return sft to reflect what we're actually returning,
+   // since the arrow sft is only created in the local reduce step
+   hints.hints.put(QueryHints.Internal.RETURN_SFT, resultSft)
+ }

val recordTables = recordIndex.getTablesForQuery(filter.filter)
val recordThreads = ds.config.queries.recordThreads
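The added block applies when the client sets the skip-reduce hint, i.e. requests the raw scan results and runs the reduce step itself (as distributed processing does): the Arrow schema is only created during the local reduce, so the returned type has to be overridden to reflect the binary encoding. A consumer-side sketch, assuming "fs" is a feature source for this store and "query" carries the Arrow hints used in the tests below:

// Sketch: reassemble the per-feature Arrow batches into one streamable file
val batches = SelfClosingIterator(fs.getFeatures(query).features())
  .map(_.getAttribute(0).asInstanceOf[Array[Byte]])
  .foldLeft(Array.empty[Byte])(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(batches))) { reader =>
  SelfClosingIterator(reader.features()).foreach(println)
}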
@@ -8,6 +8,7 @@

package org.locationtech.geomesa.accumulo.index

+ import com.google.gson.Gson
import org.geotools.api.data._
import org.geotools.api.feature.simple.SimpleFeature
import org.geotools.api.filter.Filter
@@ -18,7 +19,9 @@ import org.geotools.geometry.jts.ReferencedEnvelope
import org.geotools.util.Converters
import org.junit.runner.RunWith
import org.locationtech.geomesa.accumulo.TestWithFeatureType
+ import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams}
import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{BatchScanPlan, JoinPlan}
+ import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.filter._
import org.locationtech.geomesa.index.api.FilterStrategy
@@ -27,16 +30,20 @@ import org.locationtech.geomesa.index.index.attribute.AttributeIndex
import org.locationtech.geomesa.index.iterators.DensityScan
import org.locationtech.geomesa.index.planning.FilterSplitter
import org.locationtech.geomesa.index.utils.{ExplainNull, Explainer}
+ import org.locationtech.geomesa.index.view.MergedDataStoreViewFactory
import org.locationtech.geomesa.utils.bin.BinaryOutputEncoder
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
- import org.locationtech.geomesa.utils.geotools.{CRS_EPSG_4326, SimpleFeatureTypes}
+ import org.locationtech.geomesa.utils.geotools.{CRS_EPSG_4326, FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.index.IndexMode
+ import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.text.WKTUtils
import org.locationtech.jts.geom.Point
import org.specs2.matcher.Matcher
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner
+ import org.specs2.specification.core.Fragments

+ import java.io.ByteArrayInputStream
import java.util.Date
import scala.collection.JavaConverters._

@@ -81,6 +88,26 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
sft.getAttributeShards
}

+ lazy val mergedViewDs = {
+   val newParams = Seq(s"${catalog}_01", s"${catalog}_02").map(c => dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> c))
+   newParams.foreach { p =>
+     WithClose(DataStoreFinder.getDataStore(p.asJava)) { ds =>
+       ds.createSchema(sft)
+       WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
+         val feats = if (p(AccumuloDataStoreParams.CatalogParam.key).endsWith("1")) { features.take(2) } else { features.drop(2) }
+         feats.foreach(FeatureUtils.write(writer, _, useProvidedFid = true))
+       }
+     }
+   }
+   val json = new Gson().toJson(newParams.map(_.asJava).asJava)
+   val params = Map(MergedDataStoreViewFactory.ConfigParam.key -> s"{stores=$json}")
+   DataStoreFinder.getDataStore(params.asJava)
+ }
+
+ override def map(fragments: => Fragments): Fragments = super.map(fragments) ^ fragmentFactory.step {
+   mergedViewDs.dispose()
+ }
+
step {
addFeatures(features)
}
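The merged view above is configured through a single parameter holding the per-store parameter maps, serialized to JSON and wrapped in a "stores" key. A rough sketch of the resulting value (key names are illustrative, not verbatim):

// MergedDataStoreViewFactory.ConfigParam.key ->
//   {stores=[{"accumulo.catalog":"<catalog>_01", ...},
//            {"accumulo.catalog":"<catalog>_02", ...}]}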
@@ -102,6 +129,17 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features())
}

+ def decodeArrow(reader: SimpleFeatureArrowFileReader): List[SimpleFeature] = {
+   SelfClosingIterator(reader.features()).map { f =>
+     // round the points, as precision is lost due to the arrow encoding
+     val attributes = f.getAttributes.asScala.collect {
+       case p: Point => s"POINT (${Math.round(p.getX * 10) / 10d} ${Math.round(p.getY * 10) / 10d})"
+       case a => a
+     }
+     ScalaSimpleFeature.create(f.getFeatureType, f.getID, attributes.toSeq: _*)
+   }.toList
+ }
+
"AttributeIndexStrategy" should {
"print values" in {
skipped("used for debugging")
@@ -194,6 +232,92 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
forall(bins.map(_.lon))(_ mustEqual 55f)
}

"support arrow queries with join queries" in {
foreach(Seq(ds, mergedViewDs)) { ds =>
val query = new Query(sftName, ECQL.toFilter("count>=2"))
query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
query.getHints.put(ARROW_SORT_FIELD, "dtg")
query.getHints.put(ARROW_DICTIONARY_FIELDS, "name")
val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
forall(plans)(_ must beAnInstanceOf[JoinPlan])
val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
forall(results)(_ must beAnInstanceOf[Array[Byte]])
val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
def in() = new ByteArrayInputStream(arrows)
WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
val results = decodeArrow(reader)
results must haveSize(3)
results.map(_.getAttributeCount).distinct mustEqual Seq(sft.getAttributeCount)
results.map(_.getAttribute("name")) must containAllOf(Seq("bill", "bob", "charles"))
results.map(_.getAttribute(sft.indexOf("name"))) must containAllOf(Seq("bill", "bob", "charles"))
}
}
}

"support arrow queries with join queries and transforms" in {
foreach(Seq(ds, mergedViewDs)) { ds =>
val query = new Query(sftName, ECQL.toFilter("count>=2"), "dtg", "geom", "name") // note: swap order
query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
query.getHints.put(ARROW_SORT_FIELD, "dtg")
query.getHints.put(ARROW_DICTIONARY_FIELDS, "name")
val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
forall(plans)(_ must beAnInstanceOf[JoinPlan])
val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
forall(results)(_ must beAnInstanceOf[Array[Byte]])
val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
def in() = new ByteArrayInputStream(arrows)
WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
val results = decodeArrow(reader)
results must haveSize(3)
results.map(_.getAttribute("dtg")) must containAllOf(Seq(billDate, bobDate, charlesDate))
results.map(_.getAttribute(0)) must containAllOf(Seq(billDate, bobDate, charlesDate))
results.map(_.getAttribute("geom")) must containAllOf(Seq(billGeom, bobGeom, charlesGeom))
results.map(_.getAttribute(1)) must containAllOf(Seq(billGeom, bobGeom, charlesGeom))
results.map(_.getAttribute("name")) must containAllOf(Seq("bill", "bob", "charles"))
results.map(_.getAttribute(2)) must containAllOf(Seq("bill", "bob", "charles"))
}
}
}

"support arrow queries against index values" in {
foreach(Seq(ds, mergedViewDs)) { ds =>
val query = new Query(sftName, ECQL.toFilter("count>=2"), "geom", "dtg")
query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
query.getHints.put(ARROW_SORT_FIELD, "dtg")
val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
forall(plans)(_ must beAnInstanceOf[BatchScanPlan])
val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
forall(results)(_ must beAnInstanceOf[Array[Byte]])
val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
def in() = new ByteArrayInputStream(arrows)
WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
val results = decodeArrow(reader)
results must haveSize(3)
results.map(_.getAttribute("dtg")) must containAllOf(Seq(billDate, bobDate, charlesDate))
}
}
}

"support arrow queries against full values" in {
foreach(Seq(ds, mergedViewDs)) { ds =>
val query = new Query(sftName, ECQL.toFilter("name>'amy'"), "geom", "dtg", "count")
query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
query.getHints.put(ARROW_SORT_FIELD, "dtg")
query.getHints.put(ARROW_DICTIONARY_FIELDS, "count")
val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
forall(plans)(_ must beAnInstanceOf[BatchScanPlan])
val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
forall(results)(_ must beAnInstanceOf[Array[Byte]])
val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
def in() = new ByteArrayInputStream(arrows)
WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
val results = decodeArrow(reader)
results must haveSize(3)
results.map(_.getAttribute("count")) must containAllOf(Seq(2, 3, 4))
}
}
}

"correctly query equals with spatio-temporal filter" in {
// height filter matches bob and charles, st filters only match bob
val stFilters = Seq(
@@ -313,37 +437,37 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType

"support sampling" in {
val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
- query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+ query.getHints.put(SAMPLING, Float.box(.5f))
val results = runQuery(query).toList
results must haveLength(2)
}

"support sampling with cql" in {
val query = new Query(sftName, ECQL.toFilter("name > 'a' AND track > 'track'"))
- query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+ query.getHints.put(SAMPLING, Float.box(.5f))
val results = runQuery(query).toList
results must haveLength(2)
}

"support sampling with transformations" in {
val query = new Query(sftName, ECQL.toFilter("name > 'a'"), "name", "geom")
- query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+ query.getHints.put(SAMPLING, Float.box(.5f))
val results = runQuery(query).toList
results must haveLength(2)
forall(results)(_.getAttributeCount mustEqual 2)
}

"support sampling with cql and transformations" in {
val query = new Query(sftName, ECQL.toFilter("name > 'a' AND track > 'track'"), "name", "geom")
- query.getHints.put(SAMPLING, new java.lang.Float(.2f))
+ query.getHints.put(SAMPLING, Float.box(.2f))
val results = runQuery(query).toList
results must haveLength(1)
results.head.getAttributeCount mustEqual 2
}

"support sampling by thread" in {
val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
- query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+ query.getHints.put(SAMPLING, Float.box(.5f))
query.getHints.put(SAMPLE_BY, "track")
val results = runQuery(query).toList
results.length must beLessThan(4) // note: due to sharding and multiple ranges, we don't get exact sampling
Expand All @@ -356,7 +480,7 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
query.getHints.put(BIN_TRACK, "name")
query.getHints.put(BIN_BATCH_SIZE, 1000)
- query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+ query.getHints.put(SAMPLING, Float.box(.5f))
// have to evaluate attributes before pulling into collection, as the same sf is reused
val results = runQuery(query).map(_.getAttribute(BIN_ATTRIBUTE_INDEX)).toList
forall(results)(_ must beAnInstanceOf[Array[Byte]])
