Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(dsl): Support missing aggregation #269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_missing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
id: elastic_aggregation_missing
title: "Missing Aggregation"
---

The `Missing` aggregation is a field data based single bucket aggregation, that creates a bucket of all documents in the current document set context that are missing a field value.

In order to use the `Missing` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.MissingAggregation
import zio.elasticsearch.ElasticAggregation.missingAggregation
```

You can create a `Missing` aggregation using the `missingAggregation` method this way:
```scala
val aggregation: MissingAggregation = missingAggregation(name = "missingAggregation", field = "stringField")
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Missing` aggregation using the `missingAggregation` method this way:
```scala
// Document.stringField must be string value, because of Missing aggregation
val aggregation: MissingAggregation = missingAggregation(name = "missingAggregation", field = Document.stringField)
```

If you want to add aggregation (on the same level), you can use `withAgg` method:
```scala
val multipleAggregations: MultipleAggregations = missingAggregation(name = "missingAggregation1", field = Document.stringField).withAgg(missingAggregation(name = "missingAggregation2", field = Document.stringField))
```

You can find more information about `Missing` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-missing-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,38 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using missing aggregations") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument)
)
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument)
.refreshTrue
)
aggregation = multipleAggregations.aggregations(
missingAggregation(
name = "aggregationString",
field = TestDocument.stringField.keyword
),
missingAggregation(name = "aggregationString", field = "stringField.keyword")
)
aggsRes <- Executor
.execute(
ElasticRequest
.aggregate(index = firstSearchIndex, aggregation = aggregation)
)
.aggregations
} yield assert(aggsRes)(isNonEmpty)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using multiple terms aggregations") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,35 @@ object ElasticAggregation {
final def minAggregation(name: String, field: String): MinAggregation =
Min(name = name, field = field, missing = None)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified
* parameters.
*
* @param name
* aggregation name
* @param field
* the type-safe field for which missing aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MissingAggregation]] that represents missing aggregation to be
* performed.
*/
final def missingAggregation(name: String, field: Field[_, String]): MissingAggregation =
Missing(name = name, field = field.toString)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which missing aggregation will be executed
* @return
* an instance of [[zio.elasticsearch.aggregation.MissingAggregation]] that represents missing aggregation to be
* performed.
*/
final def missingAggregation(name: String, field: String): MissingAggregation =
Missing(name = name, field = field)

/**
* Constructs an empty instance of the [[zio.elasticsearch.aggregation.MultipleAggregations]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ private[elasticsearch] final case class Min(name: String, field: String, missing
}
}

sealed trait MissingAggregation extends SingleElasticAggregation with WithAgg

private[elasticsearch] final case class Missing(name: String, field: String) extends MissingAggregation { self =>

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json =
Obj(name -> Obj("missing" -> Obj("field" -> field.toJson)))
}

sealed trait MultipleAggregations extends ElasticAggregation with WithAgg {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ object AggregationResponse {
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
MinAggregationResult(value)
case MissingAggregationResponse(value) =>
MissingAggregationResult(value)
case SumAggregationResponse(value) =>
SumAggregationResult(value)
case TermsAggregationResponse(docErrorCount, sumOtherDocCount, buckets) =>
Expand Down Expand Up @@ -79,6 +81,13 @@ private[elasticsearch] object MinAggregationResponse {
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
}

private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int)
extends AggregationResponse

private[elasticsearch] object MissingAggregationResponse {
implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
}

private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse

private[elasticsearch] object SumAggregationResponse {
Expand Down Expand Up @@ -127,6 +136,8 @@ private[elasticsearch] object TermsAggregationBucket {
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("min#") =>
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("missing#") =>
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
case str if str.contains("sum#") =>
Some(field -> SumAggregationResponse(value = objFields("value").unsafeAs[Double]))
case str if str.contains("terms#") =>
Expand Down Expand Up @@ -156,6 +167,8 @@ private[elasticsearch] object TermsAggregationBucket {
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
case str if str.contains("min#") =>
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
case str if str.contains("missing#") =>
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
case str if str.contains("sum#") =>
(field.split("#")(1), data.asInstanceOf[SumAggregationResponse])
case str if str.contains("terms#") =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("min#") =>
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("missing#") =>
MissingAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("cardinality#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ package object elasticsearch extends IndexNameNewtype with RoutingNewtype {
def asMinAggregation(name: String): RIO[R, Option[MinAggregationResult]] =
aggregationAs[MinAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.MissingAggregationResult]].
*/
def asMissingAggregation(name: String): RIO[R, Option[MissingAggregationResult]] =
aggregationAs[MissingAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ final case class MaxAggregationResult private[elasticsearch] (value: Double) ext

final case class MinAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult

final case class SumAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

final case class TermsAggregationResult private[elasticsearch] (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
equalTo(Min(name = "aggregation", field = "intField", missing = Some(20.0)))
)
},
test("missing") {
val aggregation = missingAggregation("aggregation", "testField")
val aggregationTs = missingAggregation("aggregation", TestSubDocument.stringField)
val aggregationTsRaw = missingAggregation("aggregation", TestSubDocument.stringField.raw)

assert(aggregation)(equalTo(Missing(name = "aggregation", field = "testField"))) &&
assert(aggregationTs)(equalTo(Missing(name = "aggregation", field = "stringField"))) &&
assert(aggregationTsRaw)(equalTo(Missing(name = "aggregation", field = "stringField.raw")))
},
test("multiple") {
val aggregation =
multipleAggregations.aggregations(
Expand Down Expand Up @@ -650,6 +659,35 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
},
test("missing") {
val aggregation = missingAggregation("aggregation", "testField")
val aggregationTs = missingAggregation("aggregation", TestDocument.stringField)

val expected =
"""
|{
| "aggregation": {
| "missing": {
| "field": "testField"
| }
| }
|}
|""".stripMargin

val expectedTs =
"""
|{
| "aggregation": {
| "missing": {
| "field": "stringField"
| }
| }
|}
|""".stripMargin

assert(aggregation.toJson)(equalTo(expected.toJson)) &&
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson))
},
test("multiple") {
val aggregation =
multipleAggregations.aggregations(
Expand Down
1 change: 1 addition & 0 deletions website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module.exports = {
'overview/aggregations/elastic_aggregation_cardinality',
'overview/aggregations/elastic_aggregation_max',
'overview/aggregations/elastic_aggregation_min',
'overview/aggregations/elastic_aggregation_missing',
'overview/aggregations/elastic_aggregation_terms',
'overview/aggregations/elastic_aggregation_sum',
],
Expand Down