From e9394c3b1068b97712e1bf806e44ce64f49c4a9e Mon Sep 17 00:00:00 2001 From: Dimitrije Bulaja Date: Fri, 12 May 2023 11:40:36 +0200 Subject: [PATCH] (dsl): Support 'bucketSelectorPipeline' aggregation (#217) --- .../zio/elasticsearch/HttpExecutorSpec.scala | 39 +++++++---- .../elasticsearch/ElasticAggregation.scala | 22 +++++++ .../aggregation/Aggregations.scala | 21 +++++- .../zio/elasticsearch/script/Script.scala | 4 +- .../script/options/HasParams.scala | 2 +- .../ElasticAggregationSpec.scala | 64 ++++++++++++++++++- .../elasticsearch/ElasticRequestDSLSpec.scala | 2 +- .../HttpElasticExecutorSpec.scala | 2 +- .../scala/zio/elasticsearch/ScriptSpec.scala | 8 +-- .../scala/zio/elasticsearch/SortSpec.scala | 8 +-- 10 files changed, 143 insertions(+), 29 deletions(-) diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index f7a3e697d..8f185e51e 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -322,27 +322,36 @@ object HttpExecutorSpec extends IntegrationSpec { Executor.execute(ElasticRequest.createIndex(firstSearchIndex)), Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), - test("search using match all query with terms aggregations with nested terms aggregation") { + test( + "search using match all query with terms aggregations, nested max aggregation and nested bucketSelector aggregation" + ) { checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) { (firstDocumentId, firstDocument, secondDocumentId, secondDocument) => for { _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) - _ <- Executor.execute( - ElasticRequest.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument) - ) _ <- Executor.execute( ElasticRequest - .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument) - .refreshTrue + .upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 5)) ) + _ <- + Executor.execute( + ElasticRequest + .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 100)) + .refreshTrue + ) query = matchAll aggregation = termsAggregation( name = "aggregationString", field = TestDocument.stringField.keyword - ).withSubAgg( - termsAggregation(name = "aggregationInt", field = "intField") - ) + ).withSubAgg(maxAggregation(name = "aggregationInt", field = TestDocument.intField)) + .withSubAgg( + bucketSelectorAggregation( + name = "aggregationSelector", + script = Script("params.aggregation_int > 10"), + bucketsPath = Map("aggregation_int" -> "aggregationInt") + ) + ) res <- Executor.execute( ElasticRequest .search( @@ -353,7 +362,9 @@ object HttpExecutorSpec extends IntegrationSpec { ) docs <- res.documentAs[TestDocument] aggs <- res.aggregations - } yield assert(docs)(isNonEmpty) && assert(aggs)(isNonEmpty) + } yield assert(docs)(isNonEmpty) && assert( + aggs("aggregationString").asInstanceOf[TermsAggregationResponse].buckets.size + )(equalTo(1)) } } @@ around( Executor.execute(ElasticRequest.createIndex(firstSearchIndex)), @@ -1502,7 +1513,7 @@ object HttpExecutorSpec extends IntegrationSpec { req6 = ElasticRequest.updateByScript( index, firstDocumentId, - Script("ctx._source.intField = params['factor']").withParams("factor" -> 100) + Script("ctx._source.intField = params['factor']").params("factor" -> 100) ) req7 = ElasticRequest @@ -1536,7 +1547,7 @@ object HttpExecutorSpec extends IntegrationSpec { ElasticRequest.updateByScript( index, documentId, - Script("ctx._source.intField += params['factor']").withParams("factor" -> factor) + Script("ctx._source.intField += params['factor']").params("factor" -> factor) ) ) doc <- Executor.execute(ElasticRequest.getById(index, documentId)).documentAs[TestDocument] @@ -1551,7 +1562,7 @@ object HttpExecutorSpec extends IntegrationSpec { .updateByScript( index, documentId, - Script("ctx._source.intField += params['factor']").withParams("factor" -> 2) + Script("ctx._source.intField += params['factor']").params("factor" -> 2) ) .orCreate(document) ) @@ -1582,7 +1593,7 @@ object HttpExecutorSpec extends IntegrationSpec { ElasticRequest .updateAllByQuery( updateByQueryIndex, - Script("ctx._source['stringField'] = params['str']").withParams("str" -> stringField) + Script("ctx._source['stringField'] = params['str']").params("str" -> stringField) ) .refreshTrue ) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala index 30bcef48a..18690e035 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala @@ -18,9 +18,31 @@ package zio.elasticsearch import zio.Chunk import zio.elasticsearch.aggregation._ +import zio.elasticsearch.script.Script object ElasticAggregation { + /** + * Constructs an instance of [[zio.elasticsearch.aggregation.BucketSelectorAggregation]] using the specified + * parameters. + * + * @param name + * aggregation name + * @param script + * The script to run for this aggregation. The script can be inline, file or indexed + * @param bucketsPath + * A map of script variables and their associated path to the buckets we wish to use for the variable + * @return + * an instance of [[zio.elasticsearch.aggregation.BucketSelectorAggregation]] that represents bucket selector + * aggregation to be performed. + */ + final def bucketSelectorAggregation( + name: String, + script: Script, + bucketsPath: Map[String, String] + ): BucketSelectorAggregation = + BucketSelector(name = name, script = script, bucketsPath = bucketsPath) + /** * Constructs a type-safe instance of [[zio.elasticsearch.aggregation.CardinalityAggregation]] using the specified * parameters. It calculates an approximate count of distinct values. diff --git a/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala b/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala index cd5f40735..fdbd91206 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala @@ -21,6 +21,7 @@ import zio.elasticsearch.ElasticAggregation.multipleAggregations import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps import zio.elasticsearch.aggregation.options._ import zio.elasticsearch.query.sort.Sort +import zio.elasticsearch.script.Script import zio.json.ast.Json import zio.json.ast.Json.{Arr, Obj} @@ -33,6 +34,23 @@ sealed trait ElasticAggregation { self => sealed trait SingleElasticAggregation extends ElasticAggregation +sealed trait BucketSelectorAggregation extends SingleElasticAggregation with WithAgg + +private[elasticsearch] final case class BucketSelector(name: String, script: Script, bucketsPath: Map[String, String]) + extends BucketSelectorAggregation { self => + + def withAgg(agg: SingleElasticAggregation): MultipleAggregations = + multipleAggregations.aggregations(self, agg) + + private[elasticsearch] def paramsToJson: Json = { + val bucketsPathJson: Json = Obj("buckets_path" -> bucketsPath.collect { case (scriptVal, path) => + Obj(scriptVal -> path.toJson) + }.reduce(_ merge _)) + + Obj(name -> Obj("bucket_selector" -> (bucketsPathJson merge Obj("script" -> script.toJson)))) + } +} + sealed trait BucketSortAggregation extends SingleElasticAggregation with HasSize[BucketSortAggregation] with WithAgg { /** @@ -62,7 +80,8 @@ private[elasticsearch] final case class BucketSort( sortBy: Chunk[Sort], from: Option[Int], size: Option[Int] -) extends BucketSortAggregation { self => +) extends BucketSortAggregation { + self => def from(value: Int): BucketSortAggregation = self.copy(from = Some(value)) diff --git a/modules/library/src/main/scala/zio/elasticsearch/script/Script.scala b/modules/library/src/main/scala/zio/elasticsearch/script/Script.scala index 01ce51551..0aad8ea08 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/script/Script.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/script/Script.scala @@ -30,10 +30,10 @@ private[elasticsearch] final case class Script( def lang(value: String): Script = self.copy(lang = Some(value)) - def withParams(values: (String, Any)*): Script = + def params(values: (String, Any)*): Script = self.copy(params = params ++ values.toList) - def toJson: Json = + private[elasticsearch] def toJson: Json = Obj( List( self.lang.map(lang => "lang" -> lang.toJson), diff --git a/modules/library/src/main/scala/zio/elasticsearch/script/options/HasParams.scala b/modules/library/src/main/scala/zio/elasticsearch/script/options/HasParams.scala index 590488f6c..a10c2b465 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/script/options/HasParams.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/script/options/HasParams.scala @@ -26,5 +26,5 @@ private[elasticsearch] trait HasParams[S <: HasParams[S]] { * @return * an instance of the [[zio.elasticsearch.script.Script]] enriched with the `params` parameter. */ - def withParams(values: (String, Any)*): S + def params(values: (String, Any)*): S } diff --git a/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala index 5f5074d7d..deb98dee7 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala @@ -6,6 +6,7 @@ import zio.elasticsearch.aggregation._ import zio.elasticsearch.domain.{TestDocument, TestSubDocument} import zio.elasticsearch.query.sort.SortOrder.{Asc, Desc} import zio.elasticsearch.query.sort.{SortByFieldOptions, SortOrder} +import zio.elasticsearch.script.Script import zio.elasticsearch.utils._ import zio.test.Assertion.equalTo import zio.test._ @@ -14,6 +15,37 @@ object ElasticAggregationSpec extends ZIOSpecDefault { def spec: Spec[TestEnvironment, Any] = suite("ElasticAggregation")( suite("constructing")( + test("bucketSelector") { + val aggregation1 = bucketSelectorAggregation( + name = "aggregation", + script = Script("params.agg1 > 10"), + bucketsPath = Map("agg1" -> "aggregation1") + ) + val aggregation2 = bucketSelectorAggregation( + name = "aggregation", + script = Script("params.agg1 + params.agg2 > 10"), + bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2") + ) + + assert(aggregation1)( + equalTo( + BucketSelector( + name = "aggregation", + script = Script("params.agg1 > 10"), + bucketsPath = Map("agg1" -> "aggregation1") + ) + ) + ) && + assert(aggregation2)( + equalTo( + BucketSelector( + name = "aggregation", + script = Script("params.agg1 + params.agg2 > 10"), + bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2") + ) + ) + ) + }, test("bucketSort") { val aggregationWithFrom = bucketSortAggregation("aggregation").from(5) val aggregationWithSize = bucketSortAggregation("aggregation").size(5) @@ -67,7 +99,6 @@ object ElasticAggregationSpec extends ZIOSpecDefault { ) ) ) - }, test("cardinality") { val aggregation = cardinalityAggregation("aggregation", "testField") @@ -292,6 +323,37 @@ object ElasticAggregationSpec extends ZIOSpecDefault { } ), suite("encoding as JSON")( + test("bucketSelector") { + val aggregation1 = bucketSelectorAggregation( + name = "aggregation", + script = Script("params.agg1 > 10"), + bucketsPath = Map("agg1" -> "aggregation1") + ) + val aggregation2 = bucketSelectorAggregation( + name = "aggregation", + script = Script("params.agg1 + params.agg2 > 10"), + bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2") + ) + + assert(aggregation1)( + equalTo( + BucketSelector( + name = "aggregation", + script = Script("params.agg1 > 10"), + bucketsPath = Map("agg1" -> "aggregation1") + ) + ) + ) && + assert(aggregation2)( + equalTo( + BucketSelector( + name = "aggregation", + script = Script("params.agg1 + params.agg2 > 10"), + bucketsPath = Map("agg1" -> "aggregation1", "agg2" -> "aggregation2") + ) + ) + ) + }, test("bucketSort") { val aggregationWithFrom = bucketSortAggregation("aggregation").from(5) val aggregationWithSize = bucketSortAggregation("aggregation").size(5) diff --git a/modules/library/src/test/scala/zio/elasticsearch/ElasticRequestDSLSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/ElasticRequestDSLSpec.scala index 53ce1c31f..b4434cad4 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/ElasticRequestDSLSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/ElasticRequestDSLSpec.scala @@ -326,7 +326,7 @@ object ElasticRequestDSLSpec extends ZIOSpecDefault { val jsonRequest = updateByScript( index = Index, id = DocId, - script = Script("ctx._source.intField += params['factor']").withParams("factor" -> 2) + script = Script("ctx._source.intField += params['factor']").params("factor" -> 2) ).orCreate[TestDocument]( TestDocument( stringField = "stringField", diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 8d73b2e4d..0206b6688 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -210,7 +210,7 @@ object HttpElasticExecutorSpec extends SttpBackendStubSpec { .updateByScript( index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), - script = Script("ctx._source.intField += params['factor']").withParams("factor" -> 2) + script = Script("ctx._source.intField += params['factor']").params("factor" -> 2) ) .orCreate(doc = secondDoc) .routing(Routing("routing")) diff --git a/modules/library/src/test/scala/zio/elasticsearch/ScriptSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/ScriptSpec.scala index 226e783ce..c92a75e39 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/ScriptSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/ScriptSpec.scala @@ -14,7 +14,7 @@ object ScriptSpec extends ZIOSpecDefault { assert(Script("doc['day_of_week'].value"))(equalTo(Script("doc['day_of_week'].value", Map.empty, None))) }, test("successfully create Script with source and params") { - assert(Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2))( + assert(Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2))( equalTo(Script("doc['day_of_week'].value * params['factor']", Map("factor" -> 2), None)) ) }, @@ -24,7 +24,7 @@ object ScriptSpec extends ZIOSpecDefault { ) }, test("successfully create Script with source, params and lang") { - assert(Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"))( + assert(Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"))( equalTo(Script("doc['day_of_week'].value * params['factor']", Map("factor" -> 2), Some("painless"))) ) } @@ -44,7 +44,7 @@ object ScriptSpec extends ZIOSpecDefault { ), suite("encoding Script as JSON")( test("properly encode Script with source and params") { - val script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2) + val script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2) val expected = """ |{ @@ -70,7 +70,7 @@ object ScriptSpec extends ZIOSpecDefault { assert(script.toJson)(equalTo(expected.toJson)) }, test("properly encode Script with source, params and lang") { - val script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless") + val script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless") val expected = """ |{ diff --git a/modules/library/src/test/scala/zio/elasticsearch/SortSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/SortSpec.scala index 1ddf47b99..595217513 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/SortSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/SortSpec.scala @@ -177,7 +177,7 @@ object SortSpec extends ZIOSpecDefault { }, test("successfully create SortByScript with given `mode`") { assert( - sortBy(Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2), NumberType) + sortBy(Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2), NumberType) .mode(Avg) )( equalTo( @@ -209,7 +209,7 @@ object SortSpec extends ZIOSpecDefault { test("successfully create SortByScript with given `mode` and `order`") { assert( sortBy( - Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"), + Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"), NumberType ) .mode(Avg) @@ -414,7 +414,7 @@ object SortSpec extends ZIOSpecDefault { }, test("properly encode SortByScript with given `mode`") { val sort = sortBy( - script = Script("doc['day_of_week'].value * params['factor']").withParams("factor" -> 2), + script = Script("doc['day_of_week'].value * params['factor']").params("factor" -> 2), sourceType = NumberType ) .mode(Avg) @@ -465,7 +465,7 @@ object SortSpec extends ZIOSpecDefault { }, test("properly encode SortByScript with `mode` and `order`") { val sort = sortBy( - Script(source = "doc['day_of_week'].value * params['factor']").withParams("factor" -> 2).lang("painless"), + Script(source = "doc['day_of_week'].value * params['factor']").params("factor" -> 2).lang("painless"), NumberType ) .mode(Avg)