Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support last sort value and search after request #147

Merged
merged 3 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,49 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
)
) @@ shrinks(0),
suite("searching for documents using SearchAfter Query")(
test("search for document sorted by ascending age while using search after query") {
checkOnce(genTestDocument) { firstDocument =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
reqs = (0 to 100).map { i =>
ElasticRequest.create[TestDocument](
firstSearchIndex,
firstDocument.copy(stringField = Random.alphanumeric.take(5).mkString, intField = i)
)
}
_ <- Executor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range(TestDocument.intField).gte(10)
res <- Executor
.execute(
ElasticRequest
.search(firstSearchIndex, query)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
sa <- res.lastSortValue
res2 <- Executor
.execute(
ElasticRequest
.search(firstSearchIndex, query)
.searchAfter(sa.get)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
.documentAs[TestDocument]
} yield assert(res2.map(_.intField))(
equalTo((20 to 29).toList)
)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
) @@ shrinks(0)
),
suite("deleting by query")(
test("successfully delete all matched documents") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object ElasticRequest {
from = None,
highlights = None,
routing = None,
searchAfter = None,
size = None
)

Expand All @@ -92,6 +93,7 @@ object ElasticRequest {
from = None,
highlights = None,
routing = None,
searchAfter = None,
size = None
)

Expand Down Expand Up @@ -335,7 +337,9 @@ object ElasticRequest {
with HasSize[SearchRequest] {
def aggregate(aggregation: ElasticAggregation): SearchAndAggregateRequest

def highlights(value: Highlights): Search
def highlights(value: Highlights): SearchRequest

def searchAfter(value: Json): SearchRequest
}

private[elasticsearch] final case class Search(
Expand All @@ -345,6 +349,7 @@ object ElasticRequest {
from: Option[Int],
highlights: Option[Highlights],
routing: Option[Routing],
searchAfter: Option[Json],
size: Option[Int]
) extends SearchRequest { self =>

Expand All @@ -357,35 +362,41 @@ object ElasticRequest {
from = from,
highlights = highlights,
routing = routing,
searchAfter = None,
size = size
)

def from(value: Int): SearchRequest =
self.copy(from = Some(value))

def highlights(value: Highlights): Search =
def highlights(value: Highlights): SearchRequest =
self.copy(highlights = Some(value))

def routing(value: Routing): SearchRequest =
self.copy(routing = Some(value))

def searchAfter(value: Json): SearchRequest =
self.copy(searchAfter = Some(value))

def size(value: Int): SearchRequest =
self.copy(size = Some(value))

def sortBy(sorts: Sort*): SearchRequest =
self.copy(sortBy = sortBy ++ sorts.toSet)

def toJson: Json = {
val fromJson: Json = self.from.map(f => Obj("from" -> f.toJson)).getOrElse(Obj())
val fromJson: Json = self.from.fold(Obj())(f => Obj("from" -> f.toJson))

val sizeJson: Json = self.size.map(s => Obj("size" -> s.toJson)).getOrElse(Obj())
val sizeJson: Json = self.size.fold(Obj())(s => Obj("size" -> s.toJson))

val highlightsJson: Json = highlights.map(_.toJson).getOrElse(Obj())

val searchAfterJson: Json = searchAfter.fold(Obj())(sa => Obj("search_after" -> sa))

val sortJson: Json =
if (self.sortBy.nonEmpty) Obj("sort" -> Arr(self.sortBy.toList.map(_.paramsToJson): _*)) else Obj()

fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson
fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson merge searchAfterJson
}
}

Expand All @@ -404,6 +415,7 @@ object ElasticRequest {
from: Option[Int],
highlights: Option[Highlights],
routing: Option[Routing],
searchAfter: Option[Json],
size: Option[Int]
) extends SearchAndAggregateRequest { self =>
def from(value: Int): SearchAndAggregateRequest =
Copy link
Collaborator

@dbulaja98 dbulaja98 Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 def highlights(value: Highlights): SearchAndAggregateRequest =
      self.copy(highlights = Some(value))

Please provide this method in sealed trait SearchAndAggregateRequest:
def highlights(value: Highlights): SearchAndAggregateRequest

Expand All @@ -422,16 +434,24 @@ object ElasticRequest {
self.copy(sortBy = sortBy ++ sorts.toSet)

def toJson: Json = {
val fromJson: Json = self.from.map(f => Obj("from" -> f.toJson)).getOrElse(Obj())
val fromJson: Json = self.from.fold(Obj())(f => Obj("from" -> f.toJson))

val sizeJson: Json = self.size.map(s => Obj("size" -> s.toJson)).getOrElse(Obj())
val sizeJson: Json = self.size.fold(Obj())(s => Obj("size" -> s.toJson))

val highlightsJson: Json = highlights.map(_.toJson).getOrElse(Obj())

val highlightsJson: Json = self.highlights.map(_.toJson).getOrElse(Obj())
val searchAfterJson: Json = searchAfter.fold(Obj())(sa => Obj("search_after" -> sa))

val sortJson: Json =
if (self.sortBy.nonEmpty) Obj("sort" -> Arr(self.sortBy.toList.map(_.paramsToJson): _*)) else Obj()

fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson merge aggregation.toJson
fromJson merge
sizeJson merge
highlightsJson merge
sortJson merge
self.query.toJson merge
aggregation.toJson merge
searchAfterJson
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,10 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
case HttpOk =>
response.body.fold(
e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
value => ZIO.succeed(new SearchResult(itemFromResultsWithHighlights(value.resultsWithHighlights).toList))
value =>
ZIO.succeed(
new SearchResult(itemFromResultsWithHighlights(value.resultsWithHighlights).toList, value.lastSortField)
)
)
case _ =>
ZIO.fail(handleFailuresFromCustomResponse(response))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ package object request {
def routing(value: Routing): R
}

private[elasticsearch] trait HasFrom[R <: HasSize[R]] {
private[elasticsearch] trait HasFrom[R <: HasFrom[R]] {
def from(value: Int): R
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.elasticsearch.result

import zio.elasticsearch.executor.response.AggregationResponse
import zio.json.ast.Json
import zio.prelude.ZValidation
import zio.schema.Schema
import zio.{IO, Task, UIO, ZIO}
Expand Down Expand Up @@ -56,15 +57,18 @@ final class GetResult private[elasticsearch] (private val doc: Option[Item]) ext
.mapError(e => DecodingException(s"Could not parse the document: ${e.message}"))
}

final class SearchResult private[elasticsearch] (private val hits: List[Item]) extends DocumentResult[List] {
final class SearchResult private[elasticsearch] (private val hits: List[Item], private val lastSort: Option[Json])
extends DocumentResult[List] {
def documentAs[A: Schema]: IO[DecodingException, List[A]] =
ZIO.fromEither {
ZValidation.validateAll(hits.map(item => ZValidation.fromEither(item.documentAs))).toEitherWith { errors =>
DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})")
}
}

def items: UIO[List[Item]] = ZIO.succeed(hits)
lazy val items: UIO[List[Item]] = ZIO.succeed(hits)

lazy val lastSortValue: UIO[Option[Json]] = ZIO.succeed(lastSort)
}

final class SearchAndAggregateResult private[elasticsearch] (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package zio.elasticsearch

import zio.elasticsearch.ElasticAggregation.termsAggregation
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery.range
import zio.elasticsearch.ElasticRequest.search
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.domain.TestDocument
import zio.elasticsearch.query.sort.Missing.First
import zio.elasticsearch.utils.RichString
import zio.json.ast.Json
import zio.json.ast.Json.{Arr, Str}
import zio.test.Assertion.equalTo
import zio.test._

object ElasticRequestsDSLSpec extends ZIOSpecDefault {

override def spec: Spec[TestEnvironment, Any] =
suite("Elastic Requests JSON encoding")(
test("successfully encode search request to JSON") {
val jsonRequest: Json = search(IndexName("indexName"), Query) match {
case r: ElasticRequest.Search => r.toJson
}
val expected =
"""
|{
| "query" : {
| "range" : {
| "intField" : {
| "gte" : 10
| }
| }
| }
|}
|""".stripMargin

assert(jsonRequest)(equalTo(expected.toJson))
},
test("successfully encode search request to JSON with search after parameter") {
val jsonRequest: Json = search(IndexName("indexName"), Query).searchAfter(Arr(Str("12345"))) match {
case r: ElasticRequest.Search => r.toJson
}
val expected =
"""
|{
| "query" : {
| "range" : {
| "intField" : {
| "gte" : 10
| }
| }
| },
| "search_after" : [
| "12345"
| ]
|}
|""".stripMargin

assert(jsonRequest)(equalTo(expected.toJson))
},
test("successfully encode search request to JSON with size parameter") {
val jsonRequest: Json = search(IndexName("indexName"), Query).size(20) match {
case r: ElasticRequest.Search => r.toJson
}
val expected =
"""
|{
| "query" : {
| "range" : {
| "intField" : {
| "gte" : 10
| }
| }
| },
| "size" : 20
|}
|""".stripMargin

assert(jsonRequest)(equalTo(expected.toJson))
},
test("successfully encode search request to JSON with multiple parameters") {
val jsonRequest = search(IndexName("indexName"), Query)
.size(20)
.sortBy(sortBy(TestDocument.intField).missing(First))
.from(10) match {
case r: ElasticRequest.Search => r.toJson
}
val expected =
"""
|{
| "query" : {
| "range" : {
| "intField" : {
| "gte" : 10
| }
| }
| },
| "size" : 20,
| "from" : 10,
| "sort": [
| {
| "intField": {
| "missing": "_first"
| }
| }
| ]
|}
|""".stripMargin

assert(jsonRequest)(equalTo(expected.toJson))
},
test("successfully encode search request to JSON with all parameters") {
val jsonRequest = search(IndexName("indexName"), Query)
.size(20)
.highlights(highlight(TestDocument.intField))
.sortBy(sortBy(TestDocument.intField).missing(First))
.from(10)
.aggregate(termsAggregation(name = "aggregation", field = "day_of_week")) match {
case r: ElasticRequest.SearchAndAggregate => r.toJson
}
val expected =
"""
|{
| "query" : {
| "range" : {
| "intField" : {
| "gte" : 10
| }
| }
| },
| "size" : 20,
| "from" : 10,
| "sort": [
| {
| "intField": {
| "missing": "_first"
| }
| }
| ],
| "highlight" : {
| "fields" : {
| "intField" : {}
| }
| },
| "aggs": {
| "aggregation" : {
| "terms" : {
| "field" : "day_of_week"
| }
| }
| }
|}
|""".stripMargin

assert(jsonRequest)(equalTo(expected.toJson))
}
)

private val Query = range(TestDocument.intField).gte(10)
}