Skip to content

Commit

Permalink
(dsl): Support 'max' aggregation (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulaja98 authored May 2, 2023
1 parent 16452cd commit fe310db
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package zio.elasticsearch

import zio.Chunk
import zio.elasticsearch.ElasticAggregation.{multipleAggregations, termsAggregation}
import zio.elasticsearch.ElasticAggregation.{maxAggregation, multipleAggregations, termsAggregation}
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery._
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.domain.{TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
import zio.elasticsearch.executor.response.MaxAggregationResponse
import zio.elasticsearch.query.sort.SortMode.Max
import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
Expand All @@ -44,6 +45,31 @@ object HttpExecutorSpec extends IntegrationSpec {
suite("Executor")(
suite("HTTP Executor")(
suite("aggregation")(
test("aggregate using max aggregation") {
val expectedResponse = ("aggregationInt", MaxAggregationResponse(20.0))
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 20))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 10))
.refreshTrue
)
aggregation = maxAggregation(name = "aggregationInt", field = TestDocument.intField)
aggsRes <- Executor
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.aggregations
} yield assert(aggsRes.head)(equalTo(expectedResponse))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand All @@ -58,15 +84,9 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
aggregation =
termsAggregation(
name = "aggregationString",
field = TestDocument.stringField.keyword
)
termsAggregation(name = "aggregationString", field = TestDocument.stringField.keyword)
aggsRes <- Executor
.execute(
ElasticRequest
.aggregate(index = firstSearchIndex, aggregation = aggregation)
)
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.aggregations
} yield assert(aggsRes)(isNonEmpty)
}
Expand Down Expand Up @@ -106,7 +126,7 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation with nested terms aggregation") {
test("aggregate using terms aggregation with nested max aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
Expand All @@ -123,7 +143,7 @@ object HttpExecutorSpec extends IntegrationSpec {
name = "aggregationString",
field = TestDocument.stringField.keyword
)
.withSubAgg(termsAggregation(name = "aggregationInt", field = "intField.keyword"))
.withSubAgg(maxAggregation(name = "aggregationInt", field = "intField"))
aggsRes <- Executor
.execute(
ElasticRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,37 @@ object ElasticAggregation {
* @param name
* aggregation name
* @param field
* field for which terms aggregation will be executed
* the field for which terms aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.TermsAggregation]] that represents terms aggregation to be
* performed.
*/
final def termsAggregation(name: String, field: String): TermsAggregation =
Terms(name = name, field = field, order = Set.empty, subAggregations = Nil, size = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which max aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] that represents max aggregation to be performed.
*/
final def maxAggregation(name: String, field: Field[_, Any]): MaxAggregation =
Max(name = name, field = field.toString, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which max aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MaxAggregation]] that represents max aggregation to be performed.
*/
final def maxAggregation(name: String, field: String): MaxAggregation =
Max(name = name, field = field, missing = None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ sealed trait ElasticAggregation { self =>

sealed trait SingleElasticAggregation extends ElasticAggregation

sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg

private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])
extends MaxAggregation { self =>
def missing(value: Double): MaxAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def paramsToJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(name -> Obj("max" -> (Obj("field" -> field.toJson) merge missingJson)))
}
}

sealed trait MultipleAggregations extends ElasticAggregation with WithAgg {
def aggregations(aggregations: SingleElasticAggregation*): MultipleAggregations
}
Expand All @@ -40,7 +57,7 @@ private[elasticsearch] final case class Multiple(aggregations: List[SingleElasti
def aggregations(aggregations: SingleElasticAggregation*): MultipleAggregations =
self.copy(aggregations = self.aggregations ++ aggregations)

def paramsToJson: Json =
private[elasticsearch] def paramsToJson: Json =
aggregations.map(_.paramsToJson).reduce(_ merge _)

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
Expand Down Expand Up @@ -75,7 +92,7 @@ private[elasticsearch] final case class Terms(
def orderBy(order: AggregationOrder, orders: AggregationOrder*): TermsAggregation =
self.copy(order = self.order + order ++ orders.toSet)

def paramsToJson: Json =
private[elasticsearch] def paramsToJson: Json =
Obj(name -> paramsToJsonHelper)

def size(value: Int): TermsAggregation =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 LambdaWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.elasticsearch.aggregation.options

private[elasticsearch] trait HasMissing[A <: HasMissing[A]] {

/**
* Sets the `missing` parameter for the [[zio.elasticsearch.aggregation.ElasticAggregation]]. The`missing` parameter
* provides a value to use when a document is missing the field that the aggregation is running on.
*
* @param value
* the value to use for missing documents
* @return
* an instance of the [[zio.elasticsearch.aggregation.ElasticAggregation]] enriched with the `missing` parameter.
*/
def missing(value: Double): A
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

sealed trait AggregationResponse

private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MaxAggregationResponse {
implicit val decoder: JsonDecoder[MaxAggregationResponse] = DeriveJsonDecoder.gen[MaxAggregationResponse]
}

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -65,6 +71,10 @@ private[elasticsearch] object TermsAggregationBucket {
.map(_.unsafeAs[TermsAggregationBucket](TermsAggregationBucket.decoder))
)
)
case str if str.contains("max#") =>
Some(
field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double])
)
}
}
}.toMap
Expand All @@ -76,6 +86,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field: @unchecked) match {
case str if str.contains("terms#") =>
(field.split("#")(1), data.asInstanceOf[TermsAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
TermsAggregationResponse.decoder
.decodeJson(data.toString)
.map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)

}
)
}
Expand Down
Loading

0 comments on commit fe310db

Please sign in to comment.