Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support last sort value and search after request #147

Merged
merged 3 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.elasticsearch

import zio.{Chunk, ZIO}
import zio.Chunk
import zio.elasticsearch.ElasticAggregation.{multipleAggregations, termsAggregation}
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery._
Expand Down Expand Up @@ -177,6 +177,53 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test(
"search for first result using match all query with multiple terms aggregations and search after parameter"
) {
checkOnce(genTestDocument) { firstDocument =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
reqs = (0 to 20).map { i =>
ElasticRequest.create[TestDocument](
firstSearchIndex,
firstDocument.copy(stringField = Random.alphanumeric.take(5).mkString, intField = i)
)
}
_ <- Executor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = matchAll
aggregation = termsAggregation(
name = "aggregationString",
field = TestDocument.stringField,
multiField = Some("keyword")
).withAgg(termsAggregation("aggregationInt", "intField"))
res <- Executor
.execute(
ElasticRequest
.search(index = firstSearchIndex, query = query, aggregation = aggregation)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
sa <- res.lastSortValue
res2 <- Executor
.execute(
ElasticRequest
.search(index = firstSearchIndex, query = query, aggregation = aggregation)
.searchAfter(sa.get)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
docs <- res2.documentAs[TestDocument]
aggs <- res2.aggregations
} yield assert(docs.length)(equalTo(10)) && assert(aggs)(isNonEmpty)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("search using match all query with multiple terms aggregations with descending sort on one field") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
Expand Down Expand Up @@ -1013,6 +1060,49 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
)
) @@ shrinks(0),
suite("searching for documents using SearchAfter Query")(
test("search for document sorted by ascending age while using search after query") {
checkOnce(genTestDocument) { firstDocument =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
reqs = (0 to 100).map { i =>
ElasticRequest.create[TestDocument](
firstSearchIndex,
firstDocument.copy(stringField = Random.alphanumeric.take(5).mkString, intField = i)
)
}
_ <- Executor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range(TestDocument.intField).gte(10)
res <- Executor
.execute(
ElasticRequest
.search(firstSearchIndex, query)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
sa <- res.lastSortValue
res2 <- Executor
.execute(
ElasticRequest
.search(firstSearchIndex, query)
.searchAfter(sa.get)
.size(10)
.sortBy(
sortBy(TestDocument.intField).order(Asc)
)
)
.documentAs[TestDocument]
} yield assert(res2.map(_.intField))(
equalTo((20 to 29).toList)
)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
) @@ shrinks(0)
),
suite("deleting by query")(
test("successfully delete all matched documents") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object ElasticRequest {
from = None,
highlights = None,
routing = None,
searchAfter = None,
size = None
)

Expand All @@ -92,6 +93,7 @@ object ElasticRequest {
from = None,
highlights = None,
routing = None,
searchAfter = None,
size = None
)

Expand Down Expand Up @@ -335,7 +337,9 @@ object ElasticRequest {
with HasSize[SearchRequest] {
def aggregate(aggregation: ElasticAggregation): SearchAndAggregateRequest

def highlights(value: Highlights): Search
def highlights(value: Highlights): SearchRequest

def searchAfter(value: Json): SearchRequest
}

private[elasticsearch] final case class Search(
Expand All @@ -345,6 +349,7 @@ object ElasticRequest {
from: Option[Int],
highlights: Option[Highlights],
routing: Option[Routing],
searchAfter: Option[Json],
size: Option[Int]
) extends SearchRequest { self =>

Expand All @@ -357,35 +362,41 @@ object ElasticRequest {
from = from,
highlights = highlights,
routing = routing,
searchAfter = None,
size = size
)

def from(value: Int): SearchRequest =
self.copy(from = Some(value))

def highlights(value: Highlights): Search =
def highlights(value: Highlights): SearchRequest =
self.copy(highlights = Some(value))

def routing(value: Routing): SearchRequest =
self.copy(routing = Some(value))

def searchAfter(value: Json): SearchRequest =
self.copy(searchAfter = Some(value))

def size(value: Int): SearchRequest =
self.copy(size = Some(value))

def sortBy(sorts: Sort*): SearchRequest =
self.copy(sortBy = sortBy ++ sorts.toSet)

def toJson: Json = {
val fromJson: Json = self.from.map(f => Obj("from" -> f.toJson)).getOrElse(Obj())
val fromJson: Json = self.from.fold(Obj())(f => Obj("from" -> f.toJson))

val sizeJson: Json = self.size.map(s => Obj("size" -> s.toJson)).getOrElse(Obj())
val sizeJson: Json = self.size.fold(Obj())(s => Obj("size" -> s.toJson))

val highlightsJson: Json = highlights.map(_.toJson).getOrElse(Obj())

val searchAfterJson: Json = searchAfter.fold(Obj())(sa => Obj("search_after" -> sa))

val sortJson: Json =
if (self.sortBy.nonEmpty) Obj("sort" -> Arr(self.sortBy.toList.map(_.paramsToJson): _*)) else Obj()

fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson
fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson merge searchAfterJson
}
}

Expand All @@ -394,7 +405,11 @@ object ElasticRequest {
with HasFrom[SearchAndAggregateRequest]
with HasRouting[SearchAndAggregateRequest]
with HasSize[SearchAndAggregateRequest]
with WithSort[SearchAndAggregateRequest]
with WithSort[SearchAndAggregateRequest] {
def highlights(value: Highlights): SearchAndAggregateRequest

def searchAfter(value: Json): SearchAndAggregateRequest
}

private[elasticsearch] final case class SearchAndAggregate(
index: IndexName,
Expand All @@ -404,6 +419,7 @@ object ElasticRequest {
from: Option[Int],
highlights: Option[Highlights],
routing: Option[Routing],
searchAfter: Option[Json],
size: Option[Int]
) extends SearchAndAggregateRequest { self =>
def from(value: Int): SearchAndAggregateRequest =
Copy link
Collaborator

@dbulaja98 dbulaja98 Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 def highlights(value: Highlights): SearchAndAggregateRequest =
      self.copy(highlights = Some(value))

Please provide this method in sealed trait SearchAndAggregateRequest:
def highlights(value: Highlights): SearchAndAggregateRequest

Expand All @@ -418,20 +434,31 @@ object ElasticRequest {
def size(value: Int): SearchAndAggregateRequest =
self.copy(size = Some(value))

def searchAfter(value: Json): SearchAndAggregateRequest =
self.copy(searchAfter = Some(value))

def sortBy(sorts: Sort*): SearchAndAggregateRequest =
self.copy(sortBy = sortBy ++ sorts.toSet)

def toJson: Json = {
val fromJson: Json = self.from.map(f => Obj("from" -> f.toJson)).getOrElse(Obj())
val fromJson: Json = self.from.fold(Obj())(f => Obj("from" -> f.toJson))

val sizeJson: Json = self.size.map(s => Obj("size" -> s.toJson)).getOrElse(Obj())
val sizeJson: Json = self.size.fold(Obj())(s => Obj("size" -> s.toJson))

val highlightsJson: Json = highlights.map(_.toJson).getOrElse(Obj())

val highlightsJson: Json = self.highlights.map(_.toJson).getOrElse(Obj())
val searchAfterJson: Json = searchAfter.fold(Obj())(sa => Obj("search_after" -> sa))

val sortJson: Json =
if (self.sortBy.nonEmpty) Obj("sort" -> Arr(self.sortBy.toList.map(_.paramsToJson): _*)) else Obj()

fromJson merge sizeJson merge highlightsJson merge sortJson merge self.query.toJson merge aggregation.toJson
fromJson merge
sizeJson merge
highlightsJson merge
sortJson merge
self.query.toJson merge
aggregation.toJson merge
searchAfterJson
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,10 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
case HttpOk =>
response.body.fold(
e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
value => ZIO.succeed(new SearchResult(itemFromResultsWithHighlights(value.resultsWithHighlights).toList))
value =>
ZIO.succeed(
new SearchResult(itemFromResultsWithHighlights(value.resultsWithHighlights).toList, value.lastSortField)
)
)
case _ =>
ZIO.fail(handleFailuresFromCustomResponse(response))
Expand Down Expand Up @@ -455,7 +458,8 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
value.resultsWithHighlights.map { case (source, highlight) =>
Item(source, highlight)
},
value.aggs
value.aggs,
value.lastSortField
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ package object request {
def routing(value: Routing): R
}

private[elasticsearch] trait HasFrom[R <: HasSize[R]] {
private[elasticsearch] trait HasFrom[R <: HasFrom[R]] {
def from(value: Int): R
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.elasticsearch.result

import zio.elasticsearch.executor.response.AggregationResponse
import zio.json.ast.Json
import zio.prelude.ZValidation
import zio.schema.Schema
import zio.{IO, Task, UIO, ZIO}
Expand Down Expand Up @@ -56,20 +57,24 @@ final class GetResult private[elasticsearch] (private val doc: Option[Item]) ext
.mapError(e => DecodingException(s"Could not parse the document: ${e.message}"))
}

final class SearchResult private[elasticsearch] (private val hits: List[Item]) extends DocumentResult[List] {
final class SearchResult private[elasticsearch] (private val hits: List[Item], private val lastSort: Option[Json])
extends DocumentResult[List] {
def documentAs[A: Schema]: IO[DecodingException, List[A]] =
ZIO.fromEither {
ZValidation.validateAll(hits.map(item => ZValidation.fromEither(item.documentAs))).toEitherWith { errors =>
DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})")
}
}

def items: UIO[List[Item]] = ZIO.succeed(hits)
lazy val items: UIO[List[Item]] = ZIO.succeed(hits)

lazy val lastSortValue: UIO[Option[Json]] = ZIO.succeed(lastSort)
}

final class SearchAndAggregateResult private[elasticsearch] (
private val hits: List[Item],
private val aggs: Map[String, AggregationResponse]
private val aggs: Map[String, AggregationResponse],
private val lastSort: Option[Json]
) extends DocumentResult[List]
with AggregationsResult {
def aggregation(name: String): Task[Option[AggregationResponse]] =
Expand All @@ -86,4 +91,8 @@ final class SearchAndAggregateResult private[elasticsearch] (
)
}
}

lazy val items: UIO[List[Item]] = ZIO.succeed(hits)

lazy val lastSortValue: UIO[Option[Json]] = ZIO.succeed(lastSort)
}
Loading