Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for min aggregation #257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_min.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
id: elastic_aggregation_min
title: "Min Aggregation"
---

The `Min` aggregation is a single-value metrics aggregation that keeps track and returns the minimum value among the numeric values extracted from the aggregated documents.

In order to use the `Min` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.MinAggregation
import zio.elasticsearch.ElasticAggregation.minAggregation
```

You can create a `Min` aggregation using the `minAggregation` method this way:
```scala
val aggregation: MinAggregation = minAggregation(name = "minAggregation", field = "intField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Min` aggregation using the `minAggregation` method this way:
```scala
// Document.intField must be number value, because of Min aggregation
val aggregation: MinAggregation = minAggregation(name = "minAggregation", field = Document.intField)
```

If you want to change the `missing` parameter, you can use `missing` method:
```scala
val aggregationWithMissing: MinAggregation = minAggregation(name = "minAggregation", field = Document.intField).missing(10.0)
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = minAggregation(name = "minAggregation1", field = Document.intField).withAgg(minAggregation(name = "minAggregation2", field = Document.doubleField))
```

You can find more information about `Min` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics-min-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
import zio.elasticsearch.query.{FunctionScoreBoostMode, FunctionScoreFunction}
import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
import zio.elasticsearch.result.{AvgAggregationResult, Item, MaxAggregationResult, UpdateByQueryResult}
import zio.elasticsearch.result._
import zio.elasticsearch.script.{Painless, Script}
import zio.json.ast.Json.{Arr, Str}
import zio.schema.codec.JsonCodec
Expand Down Expand Up @@ -68,9 +68,8 @@ object HttpExecutorSpec extends IntegrationSpec {
aggregation = avgAggregation(name = "aggregationDouble", field = TestDocument.doubleField)
aggsRes <- Executor
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.aggregations
expectedResponse = ("aggregationDouble", AvgAggregationResult(value = 15.0))
} yield assert(aggsRes.head)(equalTo(expectedResponse))
.asAvgAggregation("aggregationDouble")
} yield assert(aggsRes.head.value)(equalTo(15.0))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Expand Down Expand Up @@ -126,6 +125,31 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using min aggregation") {
val expectedResponse = ("aggregationInt", MinAggregationResult(value = 23.0))
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 200))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 23))
.refreshTrue
)
aggregation = minAggregation(name = "aggregationInt", field = TestDocument.intField)
aggsRes <- Executor
.execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
.asMinAggregation("aggregationInt")
} yield assert(aggsRes.head.value)(equalTo(23.0))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using terms aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,34 @@ object ElasticAggregation {
final def maxAggregation(name: String, field: String): MaxAggregation =
Max(name = name, field = field, missing = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MinAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which min aggregation will be executed
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.MinAggregation]] that represents min aggregation to be performed.
*/
final def minAggregation[A: Numeric](name: String, field: Field[_, A]): MinAggregation =
Min(name = name, field = field.toString, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.MinAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which min aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MinAggregation]] that represents min aggregation to be performed.
*/
final def minAggregation(name: String, field: String): MinAggregation =
Min(name = name, field = field, missing = None)

/**
* Constructs an empty instance of the [[zio.elasticsearch.aggregation.MultipleAggregations]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,23 @@ private[elasticsearch] final case class Max(name: String, field: String, missing
}
}

sealed trait MinAggregation extends SingleElasticAggregation with HasMissing[MinAggregation] with WithAgg

private[elasticsearch] final case class Min(name: String, field: String, missing: Option[Double])
extends MinAggregation { self =>
def missing(value: Double): MinAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(name -> Obj("min" -> (Obj("field" -> field.toJson) merge missingJson)))
}
}

sealed trait MultipleAggregations extends ElasticAggregation with WithAgg {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object AggregationResponse {
CardinalityAggregationResult(value)
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
MinAggregationResult(value)
case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
TermsAggregationResult(
docErrorCount = docErrorCount,
Expand Down Expand Up @@ -69,6 +71,12 @@ private[elasticsearch] object MaxAggregationResponse {
implicit val decoder: JsonDecoder[MaxAggregationResponse] = DeriveJsonDecoder.gen[MaxAggregationResponse]
}

private[elasticsearch] final case class MinAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object MinAggregationResponse {
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
}

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -102,10 +110,14 @@ private[elasticsearch] object TermsAggregationBucket {
val objFields = data.unsafeAs[Obj].fields.toMap

(field: @unchecked) match {
case str if str.contains("avg#") =>
Some(field -> AvgAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("cardinality#") =>
Some(field -> CardinalityAggregationResponse(value = objFields("value").unsafeAs[Int]))
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Some(
field -> TermsAggregationResponse(
Expand All @@ -125,10 +137,14 @@ private[elasticsearch] object TermsAggregationBucket {
val subAggs = allFields.collect {
case (field, data) if field != "key" && field != "doc_count" =>
(field: @unchecked) match {
case str if str.contains("avg#") =>
(field.split("#")(1), data.asInstanceOf[AvgAggregationResponse])
case str if str.contains("cardinality#") =>
(field.split("#")(1), data.asInstanceOf[CardinalityAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("terms#") =>
(field.split("#")(1), data.asInstanceOf[TermsAggregationResponse])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
AvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("terms#") =>
Expand Down
33 changes: 25 additions & 8 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@

package zio

import zio.elasticsearch.result.{
AggregationResult,
CardinalityAggregationResult,
DocumentResult,
MaxAggregationResult,
ResultWithAggregation,
TermsAggregationResult
}
import zio.elasticsearch.result._
import zio.prelude.Newtype
import zio.schema.Schema

Expand Down Expand Up @@ -67,6 +60,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def aggregations: RIO[R, Map[String, AggregationResult]] =
zio.flatMap(_.aggregations)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.AvgAggregationResult]].
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
*/
def asAvgAggregation(name: String): RIO[R, Option[AvgAggregationResult]] =
aggregationAs[AvgAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand All @@ -91,6 +96,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.MinAggregationResult]].
*/
def asMinAggregation(name: String): RIO[R, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ final case class CardinalityAggregationResult private[elasticsearch] (value: Int

final case class MaxAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
docErrorCount: Int,
sumOtherDocCount: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ private[elasticsearch] sealed trait ResultWithAggregation {

def aggregations: Task[Map[String, AggregationResult]]

def asAvgAggregation(name: String): IO[DecodingException, Option[AvgAggregationResult]] =
aggregationAs[AvgAggregationResult](name)

def asCardinalityAggregation(name: String): IO[DecodingException, Option[CardinalityAggregationResult]] =
aggregationAs[CardinalityAggregationResult](name)

def asMaxAggregation(name: String): IO[DecodingException, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

def asTermsAggregation(name: String): IO[DecodingException, Option[TermsAggregationResult]] =
aggregationAs[TermsAggregationResult](name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
equalTo(Max(name = "aggregation", field = "intField", missing = Some(20.0)))
)
},
test("min") {
dbulaja98 marked this conversation as resolved.
Show resolved Hide resolved
val aggregation = minAggregation("aggregation", "testField")
val aggregationTs = minAggregation("aggregation", TestSubDocument.intField)
val aggregationTsRaw = minAggregation("aggregation", TestSubDocument.intField.raw)
val aggregationWithMissing = minAggregation("aggregation", TestSubDocument.intField).missing(20.0)

assert(aggregation)(equalTo(Min(name = "aggregation", field = "testField", missing = None))) &&
assert(aggregationTs)(equalTo(Min(name = "aggregation", field = "intField", missing = None))) &&
assert(aggregationTsRaw)(equalTo(Min(name = "aggregation", field = "intField.raw", missing = None))) &&
assert(aggregationWithMissing)(
equalTo(Min(name = "aggregation", field = "intField", missing = Some(20.0)))
)
},
test("multiple") {
val aggregation =
multipleAggregations.aggregations(
Expand Down Expand Up @@ -581,6 +594,49 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("min") {
val aggregation = minAggregation("aggregation", "testField")
val aggregationTs = minAggregation("aggregation", TestDocument.intField)
val aggregationWithMissing = minAggregation("aggregation", TestDocument.intField).missing(20.0)

val expected =
"""
|{
| "aggregation": {
| "min": {
| "field": "testField"
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "min": {
| "field": "intField"
| }
| }
|}
|""".stripMargin

val expectedWithMissing =
"""
|{
| "aggregation": {
| "min": {
| "field": "intField",
| "missing": 20.0
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("multiple") {
val aggregation =
multipleAggregations.aggregations(
Expand Down
2 changes: 2 additions & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ module.exports = {
label: 'Elastic Aggregation',
items: [
'overview/elastic_aggregation',
'overview/aggregations/elastic_aggregation_avg',
'overview/aggregations/elastic_aggregation_bucket_selector',
'overview/aggregations/elastic_aggregation_bucket_sort',
'overview/aggregations/elastic_aggregation_cardinality',
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_min',
'overview/aggregations/elastic_aggregation_terms',
],
},
Expand Down