diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloJoinIndexAdapter.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloJoinIndexAdapter.scala
index d30466c44537..5f247b163cee 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloJoinIndexAdapter.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloJoinIndexAdapter.scala
@@ -298,6 +298,11 @@ object AccumuloJoinIndexAdapter {
     }
     val toFeatures = AccumuloResultsToFeatures(recordIndex, resultSft)
     val reducer = new LocalTransformReducer(resultSft, None, None, None, hints)
+    if (hints.isSkipReduce) {
+      // override the return sft to reflect what we're actually returning,
+      // since the arrow sft is only created in the local reduce step
+      hints.hints.put(QueryHints.Internal.RETURN_SFT, resultSft)
+    }
     val recordTables = recordIndex.getTablesForQuery(filter.filter)
     val recordThreads = ds.config.queries.recordThreads
 
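Note on the adapter change: `QueryHints.Internal.RETURN_SFT` normally ends up pointing at the arrow schema (a single binary attribute), but that schema is only created when `LocalTransformReducer` runs. When a caller sets the skip-reduce hint - the merged data store view does this so it can merge raw results from several stores and run the reduce step once - the scan actually returns plain `resultSft` features, so the hint is overridden to match. A rough sketch of the consumer side, with `rawScanResults` and the reducer invocation as illustrative assumptions rather than the actual GeoMesa internals:

    // illustrative skip-reduce consumer: read the scan's schema from the hint,
    // then apply the (previously skipped) reduce step, and with it the arrow
    // encoding, exactly once over the merged results
    val returnSft = hints.getReturnSft // after this fix: resultSft, not the arrow sft
    val reducer = new LocalTransformReducer(returnSft, None, None, None, hints)
    val arrowFeatures = reducer(rawScanResults) // `rawScanResults` is hypothetical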
diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/index/AttributeIndexStrategyTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/index/AttributeIndexStrategyTest.scala
index 1855f08acd93..1d9ac359bcec 100644
--- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/index/AttributeIndexStrategyTest.scala
+++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/index/AttributeIndexStrategyTest.scala
@@ -8,6 +8,7 @@
 
 package org.locationtech.geomesa.accumulo.index
 
+import com.google.gson.Gson
 import org.geotools.api.data._
 import org.geotools.api.feature.simple.SimpleFeature
 import org.geotools.api.filter.Filter
@@ -18,7 +19,9 @@ import org.geotools.geometry.jts.ReferencedEnvelope
 import org.geotools.util.Converters
 import org.junit.runner.RunWith
 import org.locationtech.geomesa.accumulo.TestWithFeatureType
+import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams}
 import org.locationtech.geomesa.accumulo.data.AccumuloQueryPlan.{BatchScanPlan, JoinPlan}
+import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader
 import org.locationtech.geomesa.features.ScalaSimpleFeature
 import org.locationtech.geomesa.filter._
 import org.locationtech.geomesa.index.api.FilterStrategy
@@ -27,16 +30,20 @@ import org.locationtech.geomesa.index.index.attribute.AttributeIndex
 import org.locationtech.geomesa.index.iterators.DensityScan
 import org.locationtech.geomesa.index.planning.FilterSplitter
 import org.locationtech.geomesa.index.utils.{ExplainNull, Explainer}
+import org.locationtech.geomesa.index.view.MergedDataStoreViewFactory
 import org.locationtech.geomesa.utils.bin.BinaryOutputEncoder
 import org.locationtech.geomesa.utils.collection.SelfClosingIterator
-import org.locationtech.geomesa.utils.geotools.{CRS_EPSG_4326, SimpleFeatureTypes}
+import org.locationtech.geomesa.utils.geotools.{CRS_EPSG_4326, FeatureUtils, SimpleFeatureTypes}
 import org.locationtech.geomesa.utils.index.IndexMode
 import org.locationtech.geomesa.utils.io.WithClose
 import org.locationtech.geomesa.utils.text.WKTUtils
+import org.locationtech.jts.geom.Point
 import org.specs2.matcher.Matcher
 import org.specs2.mutable.Specification
 import org.specs2.runner.JUnitRunner
+import org.specs2.specification.core.Fragments
 
+import java.io.ByteArrayInputStream
 import java.util.Date
 import scala.collection.JavaConverters._
 
@@ -81,6 +88,26 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
     sft.getAttributeShards
   }
 
+  lazy val mergedViewDs = {
+    val newParams = Seq(s"${catalog}_01", s"${catalog}_02").map(c => dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> c))
+    newParams.foreach { p =>
+      WithClose(DataStoreFinder.getDataStore(p.asJava)) { ds =>
+        ds.createSchema(sft)
+        WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer =>
+          val feats = if (p(AccumuloDataStoreParams.CatalogParam.key).endsWith("1")) { features.take(2) } else { features.drop(2) }
+          feats.foreach(FeatureUtils.write(writer, _, useProvidedFid = true))
+        }
+      }
+    }
+    val json = new Gson().toJson(newParams.map(_.asJava).asJava)
+    val params = Map(MergedDataStoreViewFactory.ConfigParam.key -> s"{stores=$json}")
+    DataStoreFinder.getDataStore(params.asJava)
+  }
+
+  override def map(fragments: => Fragments): Fragments = super.map(fragments) ^ fragmentFactory.step {
+    mergedViewDs.dispose()
+  }
+
   step {
     addFeatures(features)
   }
@@ -102,6 +129,17 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
     SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features())
   }
 
+  def decodeArrow(reader: SimpleFeatureArrowFileReader): List[SimpleFeature] = {
+    SelfClosingIterator(reader.features()).map { f =>
+      // round the points, as precision is lost due to the arrow encoding
+      val attributes = f.getAttributes.asScala.collect {
+        case p: Point => s"POINT (${Math.round(p.getX * 10) / 10d} ${Math.round(p.getY * 10) / 10d})"
+        case a => a
+      }
+      ScalaSimpleFeature.create(f.getFeatureType, f.getID, attributes.toSeq: _*)
+    }.toList
+  }
+
   "AttributeIndexStrategy" should {
     "print values" in {
       skipped("used for debugging")
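The merged-view fixture above is what exercises the skip-reduce path end to end: two catalogs each hold half of the features, and `MergedDataStoreViewFactory` presents them as a single store. Its config parameter is HOCON whose `stores` entry is a list of per-store parameter maps, serialized here with Gson. Roughly the shape of the generated value (connection parameters elided, and the catalog key shown is an assumption):

    // illustrative only - real entries carry the full Accumulo connection params from dsParams
    val config = """{stores=[{"accumulo.catalog":"test_01", ...}, {"accumulo.catalog":"test_02", ...}]}"""

The `decodeArrow` helper rounds point coordinates before comparison because, as its comment notes, precision is lost in the arrow encoding of geometries.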
@@ -194,6 +232,92 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
       forall(bins.map(_.lon))(_ mustEqual 55f)
     }
 
+    "support arrow queries with join queries" in {
+      foreach(Seq(ds, mergedViewDs)) { ds =>
+        val query = new Query(sftName, ECQL.toFilter("count>=2"))
+        query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
+        query.getHints.put(ARROW_SORT_FIELD, "dtg")
+        query.getHints.put(ARROW_DICTIONARY_FIELDS, "name")
+        val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
+        forall(plans)(_ must beAnInstanceOf[JoinPlan])
+        val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
+        forall(results)(_ must beAnInstanceOf[Array[Byte]])
+        val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
+        def in() = new ByteArrayInputStream(arrows)
+        WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
+          val results = decodeArrow(reader)
+          results must haveSize(3)
+          results.map(_.getAttributeCount).distinct mustEqual Seq(sft.getAttributeCount)
+          results.map(_.getAttribute("name")) must containAllOf(Seq("bill", "bob", "charles"))
+          results.map(_.getAttribute(sft.indexOf("name"))) must containAllOf(Seq("bill", "bob", "charles"))
+        }
+      }
+    }
+
+    "support arrow queries with join queries and transforms" in {
+      foreach(Seq(ds, mergedViewDs)) { ds =>
+        val query = new Query(sftName, ECQL.toFilter("count>=2"), "dtg", "geom", "name") // note: swap order
+        query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
+        query.getHints.put(ARROW_SORT_FIELD, "dtg")
+        query.getHints.put(ARROW_DICTIONARY_FIELDS, "name")
+        val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
+        forall(plans)(_ must beAnInstanceOf[JoinPlan])
+        val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
+        forall(results)(_ must beAnInstanceOf[Array[Byte]])
+        val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
+        def in() = new ByteArrayInputStream(arrows)
+        WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
+          val results = decodeArrow(reader)
+          results must haveSize(3)
+          results.map(_.getAttribute("dtg")) must containAllOf(Seq(billDate, bobDate, charlesDate))
+          results.map(_.getAttribute(0)) must containAllOf(Seq(billDate, bobDate, charlesDate))
+          results.map(_.getAttribute("geom")) must containAllOf(Seq(billGeom, bobGeom, charlesGeom))
+          results.map(_.getAttribute(1)) must containAllOf(Seq(billGeom, bobGeom, charlesGeom))
+          results.map(_.getAttribute("name")) must containAllOf(Seq("bill", "bob", "charles"))
+          results.map(_.getAttribute(2)) must containAllOf(Seq("bill", "bob", "charles"))
+        }
+      }
+    }
+
+    "support arrow queries against index values" in {
+      foreach(Seq(ds, mergedViewDs)) { ds =>
+        val query = new Query(sftName, ECQL.toFilter("count>=2"), "geom", "dtg")
+        query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
+        query.getHints.put(ARROW_SORT_FIELD, "dtg")
+        val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
+        forall(plans)(_ must beAnInstanceOf[BatchScanPlan])
+        val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
+        forall(results)(_ must beAnInstanceOf[Array[Byte]])
+        val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
+        def in() = new ByteArrayInputStream(arrows)
+        WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
+          val results = decodeArrow(reader)
+          results must haveSize(3)
+          results.map(_.getAttribute("dtg")) must containAllOf(Seq(billDate, bobDate, charlesDate))
+        }
+      }
+    }
+
+    "support arrow queries against full values" in {
+      foreach(Seq(ds, mergedViewDs)) { ds =>
+        val query = new Query(sftName, ECQL.toFilter("name>'amy'"), "geom", "dtg", "count")
+        query.getHints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
+        query.getHints.put(ARROW_SORT_FIELD, "dtg")
+        query.getHints.put(ARROW_DICTIONARY_FIELDS, "count")
+        val plans = Option(ds).collect { case ds: AccumuloDataStore => ds.getQueryPlan(query) }.getOrElse(Seq.empty)
+        forall(plans)(_ must beAnInstanceOf[BatchScanPlan])
+        val results = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures(query).features()).map(_.getAttribute(0)).toList
+        forall(results)(_ must beAnInstanceOf[Array[Byte]])
+        val arrows = results.foldLeft(Array.empty[Byte]) { case (res, bytes) => res ++ bytes.asInstanceOf[Array[Byte]] }
+        def in() = new ByteArrayInputStream(arrows)
+        WithClose(SimpleFeatureArrowFileReader.streaming(in)) { reader =>
+          val results = decodeArrow(reader)
+          results must haveSize(3)
+          results.map(_.getAttribute("count")) must containAllOf(Seq(2, 3, 4))
+        }
+      }
+    }
+
     "correctly query equals with spatio-temporal filter" in {
       // height filter matches bob and charles, st filters only match bob
       val stFilters = Seq(
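These four tests cover both plan types: a filter on `count` whose results need attributes outside the attribute index forces a join against the record table (`JoinPlan`), while transforms that can be served from the index entry or its stored values run as a plain `BatchScanPlan`; the adapter fix above is what makes the skip-reduce arrow path return a usable schema in the join case. For reference, consuming arrow results outside the test harness looks roughly like the sketch below; the type name, filter, and `ds` are placeholders, and the hints mirror the tests above:

    import java.io.ByteArrayInputStream
    import org.geotools.api.data.Query
    import org.geotools.filter.text.ecql.ECQL
    import org.locationtech.geomesa.arrow.io.SimpleFeatureArrowFileReader
    import org.locationtech.geomesa.index.conf.QueryHints
    import org.locationtech.geomesa.utils.collection.SelfClosingIterator
    import org.locationtech.geomesa.utils.io.WithClose

    val query = new Query("example", ECQL.toFilter("count >= 2"))
    query.getHints.put(QueryHints.ARROW_ENCODE, java.lang.Boolean.TRUE)
    query.getHints.put(QueryHints.ARROW_SORT_FIELD, "dtg")
    // each result feature carries one chunk of the arrow IPC stream in attribute 0
    val chunks = SelfClosingIterator(ds.getFeatureSource("example").getFeatures(query).features())
      .map(_.getAttribute(0).asInstanceOf[Array[Byte]]).toList
    val bytes = chunks.foldLeft(Array.empty[Byte])(_ ++ _)
    WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
      SelfClosingIterator(reader.features()).foreach(println)
    }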
@@ -313,21 +437,21 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
 
     "support sampling" in {
       val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
-      query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+      query.getHints.put(SAMPLING, Float.box(.5f))
       val results = runQuery(query).toList
       results must haveLength(2)
     }
 
     "support sampling with cql" in {
       val query = new Query(sftName, ECQL.toFilter("name > 'a' AND track > 'track'"))
-      query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+      query.getHints.put(SAMPLING, Float.box(.5f))
       val results = runQuery(query).toList
       results must haveLength(2)
     }
 
     "support sampling with transformations" in {
       val query = new Query(sftName, ECQL.toFilter("name > 'a'"), "name", "geom")
-      query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+      query.getHints.put(SAMPLING, Float.box(.5f))
       val results = runQuery(query).toList
       results must haveLength(2)
       forall(results)(_.getAttributeCount mustEqual 2)
@@ -335,7 +459,7 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
 
     "support sampling with cql and transformations" in {
       val query = new Query(sftName, ECQL.toFilter("name > 'a' AND track > 'track'"), "name", "geom")
-      query.getHints.put(SAMPLING, new java.lang.Float(.2f))
+      query.getHints.put(SAMPLING, Float.box(.2f))
       val results = runQuery(query).toList
       results must haveLength(1)
       results.head.getAttributeCount mustEqual 2
@@ -343,7 +467,7 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
 
     "support sampling by thread" in {
       val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
-      query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+      query.getHints.put(SAMPLING, Float.box(.5f))
       query.getHints.put(SAMPLE_BY, "track")
       val results = runQuery(query).toList
       results.length must beLessThan(4) // note: due to sharding and multiple ranges, we don't get exact sampling
@@ -356,7 +480,7 @@ class AttributeIndexStrategyTest extends Specification with TestWithFeatureType
       val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
       query.getHints.put(BIN_TRACK, "name")
       query.getHints.put(BIN_BATCH_SIZE, 1000)
-      query.getHints.put(SAMPLING, new java.lang.Float(.5f))
+      query.getHints.put(SAMPLING, Float.box(.5f))
       // have to evaluate attributes before pulling into collection, as the same sf is reused
       val results = runQuery(query).map(_.getAttribute(BIN_ATTRIBUTE_INDEX)).toList
       forall(results)(_ must beAnInstanceOf[Array[Byte]])
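The remaining changes are mechanical: the boxed-primitive constructor `new java.lang.Float(...)` has been deprecated since Java 9, and `Float.box` produces the same boxed value idiomatically, so test behavior is unchanged. For reference, a minimal sketch of the sampling hints these tests exercise (the query itself is a placeholder):

    import org.locationtech.geomesa.index.conf.QueryHints

    // keep roughly half the results, sampled per distinct 'track' value
    val query = new Query(sftName, ECQL.toFilter("name > 'a'"))
    query.getHints.put(QueryHints.SAMPLING, Float.box(0.5f))
    query.getHints.put(QueryHints.SAMPLE_BY, "track")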