Skip to content

Commit

Permalink
(dsl): Support Percentile ranks aggregation (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanjaftn authored Nov 20, 2023
1 parent 4cf07e7 commit 5b0dd30
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 1 deletion.
35 changes: 35 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_percentile_ranks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
id: elastic_aggregation_percentile_ranks
title: "Percentiles Aggregation"
---

The `Percentile ranks` aggregation is a multi-value metrics aggregation that calculates percentile of values at or below a threshold grouped by a specified value.

In order to use the `Percentile ranks` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.PercentileRanksAggregation
import zio.elasticsearch.ElasticAggregation.percentileRanksAggregation
```

You can create a `Percentile ranks` aggregation using the `percentileRanksAggregation` method this way:
```scala
val aggregation: PercentileRanksAggregation = percentileRanksAggregation(field = "intField", name = "percentileRanksAggregation", values = 500, 600)
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Percentile ranks` aggregation using the `percentileRanksAggregation` method this way:
```scala
// Document.intField must be number value
val aggregation: PercentileRanksAggregation = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation", values = 500, 600)
```

If you want to change the `missing`, you can use `missing` method:
```scala
val aggregationWithMissing: PercentileRanksAggregation = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation", values = 500, 600).missing(10.0)
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation1", values = 500, 600).withAgg(percentileRanksAggregation(field = Document.doubleField, name = "percentileRanksAggregation2", values = 500, 600))
```

You can find more information about `Percentile ranks` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-rank-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,38 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentile ranks aggregation") {
val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0)
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument, thirdDocumentId, thirdDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 400))
)
_ <-
Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 500))
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, thirdDocumentId, thirdDocument.copy(intField = 550))
.refreshTrue
)
aggregation =
percentileRanksAggregation(name = "aggregation", field = "intField", value = 500.0, values = 600.0)
aggsRes <-
Executor
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
.asPercentileRanksAggregation("aggregation")
} yield assert(aggsRes.head.values)(equalTo(expectedResult))
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentiles aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,66 @@ object ElasticAggregation {
final def multipleAggregations: MultipleAggregations =
Multiple(aggregations = Chunk.empty)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] using the specified
* parameters.
*
* @param name
* the name of the aggregation
* @param field
* the type-safe field for which percentile ranks aggregation will be executed
* @param value
* the first value to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
* @param values
* an array of values to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] that represents percentile ranks
* aggregation to be performed.
*/
final def percentileRanksAggregation[A: Numeric](
name: String,
field: Field[_, A],
value: BigDecimal,
values: BigDecimal*
): PercentileRanksAggregation =
PercentileRanks(
name = name,
field = field.toString,
values = value +: Chunk.fromIterable(values),
missing = None
)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] using the specified
* parameters.
*
* @param name
* the name of the aggregation
* @param field
* the field for which percentile ranks aggregation will be executed
* @param value
* the first value to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
* @param values
* an array of values to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
* @return
* an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] that represents percentile ranks
* aggregation to be performed.
*/
final def percentileRanksAggregation(
name: String,
field: String,
value: BigDecimal,
values: BigDecimal*
): PercentileRanksAggregation =
PercentileRanks(
name = name,
field = field,
values = value +: Chunk.fromIterable(values),
missing = None
)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] using the specified
* parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,37 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
aggregations.map(_.toJson).reduce(_ merge _)
}

sealed trait PercentileRanksAggregation
extends SingleElasticAggregation
with HasMissing[PercentileRanksAggregation]
with WithAgg

private[elasticsearch] final case class PercentileRanks(
name: String,
field: String,
values: Chunk[BigDecimal],
missing: Option[Double]
) extends PercentileRanksAggregation { self =>

def missing(value: Double): PercentileRanksAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(
name -> Obj(
"percentile_ranks" -> ((Obj("field" -> field.toJson) merge Obj(
"values" -> Arr(values.map(_.toJson))
)) merge missingJson)
)
)
}
}

sealed trait PercentilesAggregation
extends SingleElasticAggregation
with HasMissing[PercentilesAggregation]
Expand All @@ -294,7 +325,7 @@ sealed trait PercentilesAggregation
* Sets the `percents` parameter for the [[zio.elasticsearch.aggregation.PercentilesAggregation]].
*
* @param percents
* a array of percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
* an array of percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
* @return
* an instance of the [[zio.elasticsearch.aggregation.PercentilesAggregation]] enriched with the `percents`
* parameter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ object AggregationResponse {
MinAggregationResult(value)
case MissingAggregationResponse(value) =>
MissingAggregationResult(value)
case PercentileRanksAggregationResponse(values) =>
PercentileRanksAggregationResult(values)
case PercentilesAggregationResponse(values) =>
PercentilesAggregationResult(values)
case StatsAggregationResponse(count, min, max, avg, sum) =>
Expand Down Expand Up @@ -201,6 +203,10 @@ private[elasticsearch] object FilterAggregationResponse extends JsonDecoderOps {
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("missing#") =>
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
case str if str.contains("percentile_ranks#") =>
Some(
field -> PercentileRanksAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]])
)
case str if str.contains("percentiles#") =>
Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
case str if str.contains("stats#") =>
Expand Down Expand Up @@ -243,6 +249,8 @@ private[elasticsearch] object FilterAggregationResponse extends JsonDecoderOps {
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("missing#") =>
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
case str if str.contains("percentile_ranks#") =>
(field.split("#")(1), data.asInstanceOf[PercentileRanksAggregationResponse])
case str if str.contains("percentiles#") =>
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
case str if str.contains("stats#") =>
Expand Down Expand Up @@ -287,6 +295,14 @@ private[elasticsearch] object MissingAggregationResponse {
implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
}

private[elasticsearch] final case class PercentileRanksAggregationResponse(values: Map[String, Double])
extends AggregationResponse

private[elasticsearch] object PercentileRanksAggregationResponse {
implicit val decoder: JsonDecoder[PercentileRanksAggregationResponse] =
DeriveJsonDecoder.gen[PercentileRanksAggregationResponse]
}

private[elasticsearch] final case class PercentilesAggregationResponse(values: Map[String, Double])
extends AggregationResponse

Expand Down Expand Up @@ -396,6 +412,10 @@ private[elasticsearch] object TermsAggregationBucket extends JsonDecoderOps {
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("missing#") =>
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
case str if str.contains("percentile_ranks#") =>
Some(
field -> PercentileRanksAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]])
)
case str if str.contains("percentiles#") =>
Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
case str if str.contains("stats#") =>
Expand Down Expand Up @@ -439,6 +459,8 @@ private[elasticsearch] object TermsAggregationBucket extends JsonDecoderOps {
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("missing#") =>
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
case str if str.contains("percentile_ranks#") =>
(field.split("#")(1), data.asInstanceOf[PercentileRanksAggregationResponse])
case str if str.contains("percentiles#") =>
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
case str if str.contains("stats#") =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("missing#") =>
MissingAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("percentile_ranks#") =>
PercentileRanksAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("percentiles#") =>
PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("stats#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.PercentileRanksAggregationResult]].
*/
def asPercentileRanksAggregation(name: String): RIO[R, Option[PercentileRanksAggregationResult]] =
aggregationAs[PercentileRanksAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ final case class MinAggregationResult private[elasticsearch] (value: Double) ext

final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult

final case class PercentileRanksAggregationResult private[elasticsearch] (values: Map[String, Double])
extends AggregationResult

final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double])
extends AggregationResult

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

def asPercentileRanksAggregation(name: String): IO[DecodingException, Option[PercentileRanksAggregationResult]] =
aggregationAs[PercentileRanksAggregationResult](name)

def asPercentilesAggregation(name: String): IO[DecodingException, Option[PercentilesAggregationResult]] =
aggregationAs[PercentilesAggregationResult](name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,32 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
)
)
},
test("percentileRanks") {
val aggregation = percentileRanksAggregation("aggregation", "testField", 5, 6)
val aggregationTs = percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6)
val aggregationTsRaw = percentileRanksAggregation("aggregation", TestSubDocument.intField.raw, 5, 6)
val aggregationWithMissing =
percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6).missing(20.0)

assert(aggregation)(
equalTo(
PercentileRanks(name = "aggregation", field = "testField", values = Chunk(5.0, 6.0), missing = None)
)
) &&
assert(aggregationTs)(
equalTo(PercentileRanks(name = "aggregation", field = "intField", values = Chunk(5.0, 6.0), missing = None))
) &&
assert(aggregationTsRaw)(
equalTo(
PercentileRanks(name = "aggregation", field = "intField.raw", values = Chunk(5.0, 6.0), missing = None)
)
) &&
assert(aggregationWithMissing)(
equalTo(
PercentileRanks(name = "aggregation", field = "intField", values = Chunk(5.0, 6.0), missing = Some(20.0))
)
)
},
test("percentiles") {
val aggregation = percentilesAggregation("aggregation", "testField")
val aggregationTs = percentilesAggregation("aggregation", TestSubDocument.intField)
Expand Down Expand Up @@ -1121,6 +1147,53 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationWithSubAggregation.toJson)(equalTo(expectedWithSubAggregation.toJson))
},
test("percentileRanks") {
val aggregation = percentileRanksAggregation("aggregation", "testField", 5, 6)
val aggregationTs = percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6)
val aggregationWithMissing =
percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6).missing(20.0)

val expected =
"""
|{
| "aggregation": {
| "percentile_ranks": {
| "field": "testField",
| "values": [5, 6]
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "percentile_ranks": {
| "field": "intField",
| "values": [5, 6]
| }
| }
|}
|""".stripMargin

val expectedWithMissing =
"""
|{
| "aggregation": {
| "percentile_ranks": {
| "field": "intField",
| "values": [5, 6],
| "missing": 20.0
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("percentiles") {
val aggregation = percentilesAggregation("aggregation", "testField")
val aggregationTs = percentilesAggregation("aggregation", TestDocument.intField)
Expand Down
1 change: 1 addition & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module.exports = {
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_min',
'overview/aggregations/elastic_aggregation_missing',
'overview/aggregations/elastic_aggregation_percentile_ranks',
'overview/aggregations/elastic_aggregation_percentiles',
'overview/aggregations/elastic_aggregation_stats',
'overview/aggregations/elastic_aggregation_sum',
Expand Down

0 comments on commit 5b0dd30

Please sign in to comment.