Add support for percentiles aggregation #306

Merged
41 changes: 41 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_percentiles.md
@@ -0,0 +1,41 @@
---
id: elastic_aggregation_percentiles
title: "Percentiles Aggregation"
---

The `Percentiles` aggregation is a multi-value metrics aggregation that calculates one or more percentiles over numeric values extracted from the aggregated documents.

To use the `Percentiles` aggregation, import the following:
```scala
import zio.elasticsearch.aggregation.PercentilesAggregation
import zio.elasticsearch.ElasticAggregation.percentilesAggregation
```

You can create a `Percentiles` aggregation using the `percentilesAggregation` method this way:
```scala
val aggregation: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = "intField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Percentiles` aggregation using the `percentilesAggregation` method this way:
```scala
// Document.intField must be a numeric value
val aggregation: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField)
```

If you want to specify which percentiles to calculate, you can use the `percents` method:
```scala
val aggregationWithPercents: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField).percents(15, 50, 70)
```

If you want to change the `missing` parameter, you can use the `missing` method:
```scala
val aggregationWithMissing: PercentilesAggregation = percentilesAggregation(name = "percentilesAggregation", field = Document.intField).missing(10.0)
```

If you want to add another aggregation (on the same level), you can use the `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = percentilesAggregation(name = "percentilesAggregation1", field = Document.intField).withAgg(percentilesAggregation(name = "percentilesAggregation2", field = Document.doubleField))
```

You can find more information about `Percentiles` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics-percentile-aggregation.html).

Member

Unnecessary new line.

Collaborator

We have it everywhere.

@@ -150,6 +150,56 @@ object HttpExecutorSpec extends IntegrationSpec {
  Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
  Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentiles aggregation") {
  checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
    (firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
      for {
        _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
        _ <- Executor.execute(
               ElasticRequest
                 .upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument)
             )
        _ <- Executor.execute(
               ElasticRequest
                 .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument)
                 .refreshTrue
             )
        aggregation =
          percentilesAggregation(name = "aggregationInt", field = TestDocument.intField).percents(25, 50, 90)
        aggsRes <- Executor
                     .execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
                     .asPercentilesAggregation("aggregationInt")
      } yield assert(aggsRes.head.values.size)(equalTo(3))
  }
} @@ around(
  Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
  Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentiles aggregation as sub aggregation") {
  checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
    (firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
      for {
        _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
        _ <- Executor.execute(
               ElasticRequest.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument)
             )
        _ <- Executor.execute(
               ElasticRequest
                 .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument)
                 .refreshTrue
             )
        aggregation =
          termsAggregation(name = "first", field = TestDocument.stringField.keyword)
            .withSubAgg(percentilesAggregation(name = "second", field = TestSubDocument.intField))
        aggsRes <- Executor
                     .execute(ElasticRequest.aggregate(index = firstSearchIndex, aggregation = aggregation))
                     .aggregations
      } yield assert(aggsRes)(isNonEmpty)
  }
} @@ around(
  Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
  Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using sum aggregation") {
  checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
    (firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
@@ -207,6 +207,37 @@ object ElasticAggregation {
  final def multipleAggregations: MultipleAggregations =
    Multiple(aggregations = Chunk.empty)

  /**
   * Constructs a type-safe instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] using the specified
   * parameters.
   *
   * @param name
   *   aggregation name
   * @param field
   *   the type-safe field for which percentiles aggregation will be executed
   * @tparam A
   *   expected number type
   * @return
   *   an instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] that represents percentiles aggregation
   *   to be performed.
   */
  final def percentilesAggregation[A: Numeric](name: String, field: Field[_, A]): PercentilesAggregation =
    Percentiles(name = name, field = field.toString, percents = Chunk.empty, missing = None)

  /**
   * Constructs an instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] using the specified parameters.
   *
   * @param name
   *   aggregation name
   * @param field
   *   the field for which percentiles aggregation will be executed
   * @return
   *   an instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] that represents percentiles aggregation
   *   to be performed.
   */
  final def percentilesAggregation(name: String, field: String): PercentilesAggregation =
    Percentiles(name = name, field = field, percents = Chunk.empty, missing = None)

  /**
   * Constructs a type-safe instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
   *
@@ -210,6 +210,48 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
    aggregations.map(_.toJson).reduce(_ merge _)
}

sealed trait PercentilesAggregation
    extends SingleElasticAggregation
    with HasMissing[PercentilesAggregation]
    with WithAgg {

  /**
   * Sets the `percents` parameter for the [[zio.elasticsearch.aggregation.PercentilesAggregation]].
   *
   * @param percents
   *   an array of percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
   * @return
   *   an instance of the [[zio.elasticsearch.aggregation.PercentilesAggregation]] enriched with the `percents`
   *   parameter.
   */
  def percents(percent: Double, percents: Double*): PercentilesAggregation
}

private[elasticsearch] final case class Percentiles(
  name: String,
  field: String,
  missing: Option[Double],
  percents: Chunk[Double]
) extends PercentilesAggregation { self =>

  def missing(value: Double): PercentilesAggregation =
    self.copy(missing = Some(value))

  def percents(percent: Double, percents: Double*): PercentilesAggregation =
Member

Do we have any specific reason why we named this percents if the aggregation is percentiles?

Contributor Author

In the Elasticsearch API the parameter is named `percents`, but I can change the name to `percentiles` if that seems more intuitive.

Contributor Author

Agreed to leave this named percents

    self.copy(percents = Chunk.fromIterable(percent +: percents))

  def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
    multipleAggregations.aggregations(self, agg)

  private[elasticsearch] def toJson: Json = {
    val percentsField =
      (if (percents.nonEmpty) Some("percents" -> Arr(percents.map(_.toJson))) else None) ++ missing.map(
        "missing" -> _.toJson
      )
    Obj(name -> Obj("percentiles" -> (Obj("field" -> field.toJson) merge Obj(Chunk.fromIterable(percentsField)))))
  }
}

sealed trait SumAggregation extends SingleElasticAggregation with HasMissing[SumAggregation] with WithAgg

private[elasticsearch] final case class Sum(name: String, field: String, missing: Option[Double])
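As a rough illustration of the request shape that the `toJson` method in this hunk builds, the standalone sketch below renders the same structure with plain strings instead of the library's `Json` types. `PercentilesJsonSketch`, its local `Percentiles` case class, `quote`, and `render` are hypothetical names used only for this example and are not part of the PR. As in the diff, the `percents` and `missing` keys are emitted only when they are set.

```scala
object PercentilesJsonSketch {
  // Hypothetical stand-in for the library's `Percentiles` case class.
  final case class Percentiles(
    name: String,
    field: String,
    percents: List[Double] = Nil,
    missing: Option[Double] = None
  )

  // Wraps a string in double quotes (no escaping; enough for this sketch).
  private def quote(s: String): String = "\"" + s + "\""

  // Renders the aggregation as compact JSON. The `percents` and `missing`
  // keys are included only when they carry a value.
  def render(agg: Percentiles): String = {
    val fieldEntry = List(quote("field") + ":" + quote(agg.field))
    val percentsEntry =
      if (agg.percents.nonEmpty) List(quote("percents") + ":[" + agg.percents.mkString(",") + "]")
      else Nil
    val missingEntry = agg.missing.map(m => quote("missing") + ":" + m).toList
    val body         = (fieldEntry ++ percentsEntry ++ missingEntry).mkString(",")
    "{" + quote(agg.name) + ":{" + quote("percentiles") + ":{" + body + "}}}"
  }
}
```

Calling `PercentilesJsonSketch.render` with only a name and a field yields an object containing just the `field` key, which leaves the choice of default percentiles to Elasticsearch.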
@@ -37,6 +37,8 @@ object AggregationResponse {
        MinAggregationResult(value)
      case MissingAggregationResponse(value) =>
        MissingAggregationResult(value)
      case PercentilesAggregationResponse(values) =>
        PercentilesAggregationResult(values)
      case SumAggregationResponse(value) =>
        SumAggregationResult(value)
      case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
@@ -88,6 +90,14 @@ private[elasticsearch] object MissingAggregationResponse {
  implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
}

private[elasticsearch] final case class PercentilesAggregationResponse(values: Map[String, Double])
    extends AggregationResponse

private[elasticsearch] object PercentilesAggregationResponse {
  implicit val decoder: JsonDecoder[PercentilesAggregationResponse] =
    DeriveJsonDecoder.gen[PercentilesAggregationResponse]
}

Collaborator

Look at the lines below (140+); you must adjust those too. Please test this aggregation as a sub-aggregation of another aggregation (you can also add an it-test).

Collaborator

And below at 171+.

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
@@ -138,6 +148,8 @@ private[elasticsearch] object TermsAggregationBucket {
          Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
        case str if str.contains("missing#") =>
          Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
        case str if str.contains("percentiles#") =>
          Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
        case str if str.contains("sum#") =>
          Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
        case str if str.contains("terms#") =>
@@ -169,6 +181,8 @@ private[elasticsearch] object TermsAggregationBucket {
          (field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
        case str if str.contains("missing#") =>
          (field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
        case str if str.contains("percentiles#") =>
          (field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
        case str if str.contains("sum#") =>
          (field.split("#")(1), data.asInstanceOf[SumAggregationResponse])
        case str if str.contains("terms#") =>
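The `str.contains("percentiles#")` branches in these response decoders match keys in which an aggregation type and the user-supplied name are joined by `#`, and `field.split("#")(1)` recovers the name. A minimal standalone sketch of that convention follows. `TypedKeySketch`, `split`, and `isPercentiles` are hypothetical names, not part of the PR, and the example key is an assumption about what such a key looks like. Unlike `split("#")(1)`, this version returns `None` rather than throwing when the separator is missing.

```scala
object TypedKeySketch {
  // Splits a key such as "percentiles#second" into (type, name).
  // Returns None when the key contains no '#' separator.
  def split(key: String): Option[(String, String)] =
    key.split("#", 2) match {
      case Array(tpe, name) => Some((tpe, name))
      case _                => None
    }

  // Mirrors the `contains("percentiles#")` guard used in the diff.
  def isPercentiles(key: String): Boolean =
    key.contains("percentiles#")
}
```

Using a limit of 2 in `split` keeps any further `#` characters inside the aggregation name instead of discarding them.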
@@ -80,6 +80,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
          MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
        case str if str.contains("missing#") =>
          MissingAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
        case str if str.contains("percentiles#") =>
          PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
        case str if str.contains("sum#") =>
          SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
        case str if str.contains("cardinality#") =>
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
@@ -96,6 +96,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
    def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
      aggregationAs[MaxAggregationResult](name)

    /**
     * Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
     *
     * @param name
     *   the name of the aggregation to retrieve
     * @return
     *   a [[RIO]] effect that, when executed, will produce the aggregation as instance of
     *   [[result.PercentilesAggregationResult]].
     */
    def asPercentilesAggregation(name: String): RIO[R, Option[PercentilesAggregationResult]] =
      aggregationAs[PercentilesAggregationResult](name)

    /**
     * Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
     *
@@ -32,6 +32,9 @@ final case class MinAggregationResult private[elasticsearch] (value: Double) ext

final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult

final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double])
    extends AggregationResult

final case class SumAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
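`PercentilesAggregationResult` exposes its values as a `Map[String, Double]`. The sketch below shows one way a caller might look up a single percentile. It assumes the map keys are the percentile formatted as a decimal string (for example `"25.0"`), which is an assumption about the response format and is not confirmed by this PR. `PercentilesLookupSketch`, `PercentilesResult`, and `valueAt` are hypothetical names.

```scala
object PercentilesLookupSketch {
  // Hypothetical stand-in for `PercentilesAggregationResult`.
  final case class PercentilesResult(values: Map[String, Double])

  // Looks up one percentile, assuming keys are formatted like "25.0"
  // (which is what `Double.toString` produces on the JVM).
  def valueAt(result: PercentilesResult, percent: Double): Option[Double] =
    result.values.get(percent.toString)
}
```

Returning an `Option` keeps a percentile that was not requested distinguishable from one whose value happens to be zero.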
@@ -54,6 +54,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
  def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
    aggregationAs[MinAggregationResult](name)

  def asPercentilesAggregation(name: String): IO[DecodingException, Option[PercentilesAggregationResult]] =
    aggregationAs[PercentilesAggregationResult](name)

  def asSumAggregation(name: String): IO[DecodingException, Option[SumAggregationResult]] =
    aggregationAs[SumAggregationResult](name)
