Skip to content

Commit

Permalink
Add support for percentiles aggregation (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
milicns authored Aug 23, 2023
1 parent 59e58bd commit 0ad2161
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 1 deletion.
41 changes: 41 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_percentiles.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
id: elastic_aggregation_percentiles
title: "Percentiles Aggregation"
---

The `Percentiles` aggregation is a multi-value metrics aggregation that calculates one or more percentiles over numeric values extracted from the aggregated documents.

In order to use the `Percentiles` aggregation, import the following:
```scala
import zio.elasticsearch.aggregation.PercentilesAggregation
import zio.elasticsearch.ElasticAggregation.percentilesAggregation
```

You can create a `Percentiles` aggregation using the `percentilesAggregation` method this way:
```scala
val aggregation: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = "intField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Percentiles` aggregation using the `percentilesAggregation` method this way:
```scala
// Document.intField must be a numeric value
val aggregation: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField)
```

If you want to specify which percentiles should be calculated, you can use the `percents` method:
```scala
val aggregationWithPercents: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField).percents(15, 50, 70)
```

If you want to change the `missing` parameter, you can use the `missing` method:
```scala
val aggregationWithMissing: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField).missing(10.0)
```

If you want to add another aggregation (on the same level), you can use the `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = percentilesAggregation(name = "percentilesAggregation1", field = Document.intField).withAgg(percentilesAggregation(name = "percentilesAggregation2", field = Document.doubleField))
```

You can find more information about `Percentiles` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics-percentile-aggregation.html).

Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,56 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentiles aggregation") {
  checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
    (firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
      for {
        // Start from an empty index so that only the two documents below are aggregated.
        _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
        _ <- Executor.execute(
               ElasticRequest
                 .upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument)
             )
        _ <- Executor.execute(
               ElasticRequest
                 .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument)
                 .refreshTrue
             )
        aggregation =
          percentilesAggregation(name = "aggregationInt", field = TestDocument.intField).percents(25, 50, 90)
        aggsRes <- Executor
                     .execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
                     .asPercentilesAggregation("aggregationInt")
        // Three percents were requested (25, 50, 90), so three values are expected. The whole `Option` is
        // compared: a missing aggregation then shows up as a failed assertion instead of the
        // `NoSuchElementException` that calling `.head` on `None` would throw.
      } yield assert(aggsRes.map(_.values.size))(equalTo(Option(3)))
  }
} @@ around(
  Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
  Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentiles aggregation as sub aggregation") {
  checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) { (idA, docA, idB, docB) =>
    // The percentiles aggregation is attached as a sub-aggregation of the terms aggregation.
    val termsWithPercentiles =
      termsAggregation(name = "first", field = TestDocument.stringField.keyword)
        .withSubAgg(percentilesAggregation(name = "second", field = TestSubDocument.intField))

    for {
      // Clear the index, then store both generated documents; the second upsert refreshes the index.
      _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
      _ <- Executor.execute(ElasticRequest.upsert[TestDocument](firstSearchIndex, idA, docA))
      _ <- Executor.execute(ElasticRequest.upsert[TestDocument](firstSearchIndex, idB, docB).refreshTrue)
      allAggregations <-
        Executor
          .execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = termsWithPercentiles))
          .aggregations
    } yield assert(allAggregations)(isNonEmpty)
  }
} @@ around(
  Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
  Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using sum aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,37 @@ object ElasticAggregation {
final def multipleAggregations: MultipleAggregations =
Multiple(aggregations = Chunk.empty)

/**
 * Builds a [[zio.elasticsearch.aggregation.PercentilesAggregation]] from a type-safe field reference.
 *
 * The resulting aggregation carries no explicit `percents` and no `missing` value; both can be set afterwards
 * through the corresponding methods of the returned aggregation.
 *
 * @param name
 *   the name under which the aggregation is sent
 * @param field
 *   the type-safe field to aggregate on; its string representation is used as the field name
 * @tparam A
 *   the value type of the field, which must have a `Numeric` instance
 * @return
 *   a [[zio.elasticsearch.aggregation.PercentilesAggregation]] describing the percentiles aggregation to be
 *   performed.
 */
final def percentilesAggregation[A: Numeric](name: String, field: Field[_, A]): PercentilesAggregation =
  Percentiles(name = name, field = field.toString, missing = None, percents = Chunk.empty)

/**
 * Builds a [[zio.elasticsearch.aggregation.PercentilesAggregation]] from a plain field name.
 *
 * The resulting aggregation carries no explicit `percents` and no `missing` value; both can be set afterwards
 * through the corresponding methods of the returned aggregation.
 *
 * @param name
 *   the name under which the aggregation is sent
 * @param field
 *   the name of the field to aggregate on
 * @return
 *   a [[zio.elasticsearch.aggregation.PercentilesAggregation]] describing the percentiles aggregation to be
 *   performed.
 */
final def percentilesAggregation(name: String, field: String): PercentilesAggregation =
  Percentiles(name = name, field = field, missing = None, percents = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,48 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
aggregations.map(_.toJson).reduce(_ merge _)
}

/**
* Public interface of the percentiles aggregation. It is a single aggregation that supports a `missing` value
* (via `HasMissing`) and can be combined with another aggregation (via `WithAgg`).
*/
sealed trait PercentilesAggregation
extends SingleElasticAggregation
with HasMissing[PercentilesAggregation]
with WithAgg {

/**
* Sets the `percents` parameter for the [[zio.elasticsearch.aggregation.PercentilesAggregation]].
*
* @param percent
* the first percentile to be calculated; at least one percentile has to be provided
* @param percents
* any additional percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
* @return
* an instance of the [[zio.elasticsearch.aggregation.PercentilesAggregation]] enriched with the `percents`
* parameter.
*/
def percents(percent: Double, percents: Double*): PercentilesAggregation
}

/**
 * Implementation of [[PercentilesAggregation]].
 *
 * @param name
 *   the aggregation name, used as the top-level key of the generated JSON
 * @param field
 *   the name of the field the percentiles are calculated on
 * @param missing
 *   optional `missing` value; left out of the JSON when empty
 * @param percents
 *   the percentiles to calculate; left out of the JSON when empty
 */
private[elasticsearch] final case class Percentiles(
  name: String,
  field: String,
  missing: Option[Double],
  percents: Chunk[Double]
) extends PercentilesAggregation { self =>

  def missing(value: Double): PercentilesAggregation =
    self.copy(missing = Some(value))

  // Replaces any previously configured percents with the given ones.
  def percents(percent: Double, percents: Double*): PercentilesAggregation = {
    val requested = percent +: percents
    self.copy(percents = Chunk.fromIterable(requested))
  }

  def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
    multipleAggregations.aggregations(self, agg)

  private[elasticsearch] def toJson: Json = {
    // Optional entries are emitted only when set, with `percents` placed before `missing`.
    val percentsEntry: Option[(String, Json)] =
      if (percents.isEmpty) None else Some("percents" -> Arr(percents.map(_.toJson)))
    val missingEntry: Option[(String, Json)] =
      missing.map("missing" -> _.toJson)
    val optionalEntries = Chunk.fromIterable(percentsEntry ++ missingEntry)

    Obj(name -> Obj("percentiles" -> (Obj("field" -> field.toJson) merge Obj(optionalEntries))))
  }
}

sealed trait SumAggregation extends SingleElasticAggregation with HasMissing[SumAggregation] with WithAgg

private[elasticsearch] final case class Sum(name: String, field: String, missing: Option[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ object AggregationResponse {
MinAggregationResult(value)
case MissingAggregationResponse(value) =>
MissingAggregationResult(value)
case PercentilesAggregationResponse(values) =>
PercentilesAggregationResult(values)
case SumAggregationResponse(value) =>
SumAggregationResult(value)
case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
Expand Down Expand Up @@ -88,6 +90,14 @@ private[elasticsearch] object MissingAggregationResponse {
implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
}

/**
* Internal representation of a percentiles aggregation response.
*
* @param values
* the content of the response's `values` field as a map of doubles keyed by string
* (presumably keyed by the requested percentile - TODO confirm against the Elasticsearch response format)
*/
private[elasticsearch] final case class PercentilesAggregationResponse(values: Map[String, Double])
extends AggregationResponse

private[elasticsearch] object PercentilesAggregationResponse {
// Decoder derived from the case class, so the JSON key is expected to match the `values` field name.
implicit val decoder: JsonDecoder[PercentilesAggregationResponse] =
DeriveJsonDecoder.gen[PercentilesAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
Expand Down Expand Up @@ -138,6 +148,8 @@ private[elasticsearch] object TermsAggregationBucket {
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("missing#") =>
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
case str if str.contains("percentiles#") =>
Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Expand Down Expand Up @@ -169,6 +181,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("missing#") =>
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
case str if str.contains("percentiles#") =>
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
case str if str.contains("sum#") =>
(field.split("#")(1), data.asInstanceOf[SumAggregationResponse])
case str if str.contains("terms#") =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("missing#") =>
MissingAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("percentiles#") =>
PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]] and retrieves the
* aggregation with the given name as a percentiles aggregation result.
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as an optional instance of
* [[result.PercentilesAggregationResult]] (presumably empty when no aggregation with that name exists - confirm
* against `aggregationAs`).
*/
def asPercentilesAggregation(name: String): RIO[R, Option[PercentilesAggregationResult]] =
aggregationAs[PercentilesAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ final case class MinAggregationResult private[elasticsearch] (value: Double) ext

final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult

/**
* Result of a percentiles aggregation. Instances can only be constructed inside the `elasticsearch` package.
*
* @param values
* the calculated values as a map of doubles keyed by string
* (presumably keyed by the requested percentile - TODO confirm against the Elasticsearch response format)
*/
final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double])
extends AggregationResult

final case class SumAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

/**
* Retrieves the aggregation with the given name as a [[PercentilesAggregationResult]] by delegating to
* `aggregationAs`.
*
* @param name
* the name of the aggregation to retrieve
* @return
* an effect that produces an optional [[PercentilesAggregationResult]] and may fail with a [[DecodingException]]
* (NOTE(review): the exact conditions for an empty result or a failure are defined by `aggregationAs` - confirm
* there).
*/
def asPercentilesAggregation(name: String): IO[DecodingException, Option[PercentilesAggregationResult]] =
aggregationAs[PercentilesAggregationResult](name)

def asSumAggregation(name: String): IO[DecodingException, Option[SumAggregationResult]] =
aggregationAs[SumAggregationResult](name)

Expand Down
Loading

0 comments on commit 0ad2161

Please sign in to comment.