Skip to content

Commit

Permalink
Support sum aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Mitep committed Jun 24, 2023
1 parent c34c8f6 commit 5b58265
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 8 deletions.
35 changes: 35 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_sum.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
id: elastic_aggregation_sum
title: "Sum Aggregation"
---

The `Sum` aggregation is a single-value metrics aggregation that keeps track and returns the sum value among the numeric values extracted from the aggregated documents.

In order to use the `Sum` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.SumAggregation
import zio.elasticsearch.ElasticAggregation.sumAggregation
```

You can create a `Sum` aggregation using the `sumAggregation` method this way:
```scala
val aggregation: SumAggregation = sumAggregation(name = "sumAggregation", field = "intField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Sum` aggregation using the `sumAggregation` method this way:
```scala
// Document.intField must be number value, because of Sum aggregation
val aggregation: SumAggregation = sumAggregation(name = "sumAggregation", field = Document.intField)
```

If you want to change the `missing`, you can use `missing` method:
```scala
val aggregationWithMissing: SumAggregation = sumAggregation(name = "sumAggregation", field = Document.intField).missing(10.0)
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = sumAggregation(name = "sumAggregation1", field = Document.intField).withAgg(sumAggregation(name = "sumAggregation2", field = Document.doubleField))
```

You can find more information about `Sum` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-metrics-sum-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,35 @@ object ElasticAggregation {
final def multipleAggregations: MultipleAggregations =
Multiple(aggregations = Chunk.empty)


/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which sum aggregation will be executed
* @tparam A
* expected number type
* @return
* an instance of [[zio.elasticsearch.aggregation.SumAggregation]] that represents avg aggregation to be performed.
*/
final def sumAggregation[A: Numeric](name: String, field: Field[_, A]): SumAggregation =
Sum(name = name, field = field.toString, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.SumAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which sum aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.SumAggregation]] that represents avg aggregation to be performed.
*/
final def sumAggregation(name: String, field: String): SumAggregation =
Sum(name = name, field = field, missing = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.TermsAggregation]] using the specified
* parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,23 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
aggregations.map(_.toJson).reduce(_ merge _)
}

sealed trait SumAggregation extends SingleElasticAggregation with HasMissing[SumAggregation] with WithAgg

private[elasticsearch] final case class Sum(name: String, field: String, missing: Option[Double])
extends SumAggregation { self =>
def missing(value: Double): SumAggregation =
self.copy(missing = Some(value))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))

Obj(name -> Obj("sum" -> (Obj("field" -> field.toJson) merge missingJson)))
}
}

sealed trait TermsAggregation
extends SingleElasticAggregation
with HasOrder[TermsAggregation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object AggregationResponse {
CardinalityAggregationResult(value)
case MaxAggregationResponse(value) =>
MaxAggregationResult(value)
case SumAggregationResponse(value) =>
SumAggregationResult(value)
case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
TermsAggregationResult(
docErrorCount = docErrorCount,
Expand Down Expand Up @@ -69,6 +71,12 @@ private[elasticsearch] object MaxAggregationResponse {
implicit val decoder: JsonDecoder[MaxAggregationResponse] = DeriveJsonDecoder.gen[MaxAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
implicit val decoder: JsonDecoder[SumAggregationResponse] = DeriveJsonDecoder.gen[SumAggregationResponse]
}

private[elasticsearch] final case class TermsAggregationResponse(
@jsonField("doc_count_error_upper_bound")
docErrorCount: Int,
Expand Down Expand Up @@ -106,6 +114,8 @@ private[elasticsearch] object TermsAggregationBucket {
Some(field -> CardinalityAggregationResponse(value = objFields("value").unsafeAs[Int]))
case str if str.contains("max#") =>
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Some(
field -> TermsAggregationResponse(
Expand All @@ -129,6 +139,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[CardinalityAggregationResponse])
case str if str.contains("max#") =>
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("sum#") =>
(field.split("#")(1), data.asInstanceOf[SumAggregationResponse])
case str if str.contains("terms#") =>
(field.split("#")(1), data.asInstanceOf[TermsAggregationResponse])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
AvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("max#") =>
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("terms#") =>
Expand Down
21 changes: 13 additions & 8 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@

package zio

import zio.elasticsearch.result.{
AggregationResult,
CardinalityAggregationResult,
DocumentResult,
MaxAggregationResult,
ResultWithAggregation,
TermsAggregationResult
}
import zio.elasticsearch.result.{AggregationResult, CardinalityAggregationResult, DocumentResult, MaxAggregationResult, ResultWithAggregation, SumAggregationResult, TermsAggregationResult}
import zio.prelude.Newtype
import zio.schema.Schema

Expand Down Expand Up @@ -91,6 +84,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.SumAggregationResult]].
*/
def asSumAggregation(name: String): RIO[R, Option[SumAggregationResult]] =
aggregationAs[SumAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ final case class CardinalityAggregationResult private[elasticsearch] (value: Int

final case class MaxAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class SumAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
docErrorCount: Int,
sumOtherDocCount: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
def asMaxAggregation(name: String): IO[DecodingException, Option[MaxAggregationResult]] =
aggregationAs[MaxAggregationResult](name)

def asSumAggregation(name: String): IO[DecodingException, Option[SumAggregationResult]] =
aggregationAs[SumAggregationResult](name)

def asTermsAggregation(name: String): IO[DecodingException, Option[TermsAggregationResult]] =
aggregationAs[TermsAggregationResult](name)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,49 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
)
)
},
test("sum") {
val aggregation = sumAggregation("aggregation", "testField")
val aggregationTs = sumAggregation("aggregation", TestDocument.intField)
val aggregationWithMissing = sumAggregation("aggregation", TestDocument.intField).missing(20.0)

val expected =
"""
|{
| "aggregation": {
| "sum": {
| "field": "testField"
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField"
| }
| }
|}
|""".stripMargin

val expectedWithMissing =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField",
| "missing": 20.0
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("terms") {
val aggregation = termsAggregation("aggregation", "testField")
val aggregationTs = termsAggregation("aggregation", TestSubDocument.stringField)
Expand Down Expand Up @@ -669,6 +712,49 @@ object ElasticAggregationSpec extends ZIOSpecDefault {

assert(aggregation.toJson)(equalTo(expected.toJson))
},
test("sum") {
val aggregation = sumAggregation("aggregation", "testField")
val aggregationTs = sumAggregation("aggregation", TestDocument.intField)
val aggregationWithMissing = sumAggregation("aggregation", TestDocument.intField).missing(20.0)

val expected =
"""
|{
| "aggregation": {
| "sum": {
| "field": "testField"
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField"
| }
| }
|}
|""".stripMargin

val expectedWithMissing =
"""
|{
| "aggregation": {
| "sum": {
| "field": "intField",
| "missing": 20.0
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("terms") {
val aggregation = termsAggregation("aggregation", "testField")
val aggregationTs = termsAggregation("aggregation", TestDocument.stringField)
Expand Down
1 change: 1 addition & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module.exports = {
'overview/aggregations/elastic_aggregation_cardinality',
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_terms',
'overview/aggregations/elastic_aggregation_sum',
],
},
{
Expand Down

0 comments on commit 5b58265

Please sign in to comment.