Skip to content

Commit

Permalink
Support search with ZIO schema (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
markaya authored Jan 12, 2023
1 parent d311861 commit 9a4fff3
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 42 deletions.
107 changes: 89 additions & 18 deletions modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package zio.elasticsearch

import zio.elasticsearch.CreationOutcome.{AlreadyExists, Created}
import zio.elasticsearch.DeletionOutcome.{Deleted, NotFound}
import zio.elasticsearch.ElasticQuery._
import zio.test.Assertion._
import zio.test.TestAspect.nondeterministic
import zio.test.TestAspect._
import zio.test._

object HttpExecutorSpec extends IntegrationSpec {
Expand Down Expand Up @@ -39,21 +40,17 @@ object HttpExecutorSpec extends IntegrationSpec {
),
suite("creating index")(
test("successfully create index") {
checkOnce(genIndexName) { name =>
assertZIO(ElasticRequest.createIndex(name, None).execute)(equalTo(Created))
}
assertZIO(ElasticRequest.createIndex(createIndexTestName, None).execute)(equalTo(Created))
},
test("return 'AlreadyExists' if index already exists") {
checkOnce(genIndexName) { name =>
val result = for {
_ <- ElasticRequest.createIndex(name, None).execute
res <- ElasticRequest.createIndex(name, None).execute
} yield res
val result = for {
_ <- ElasticRequest.createIndex(createIndexTestName, None).execute
res <- ElasticRequest.createIndex(createIndexTestName, None).execute
} yield res

assertZIO(result)(equalTo(AlreadyExists))
}
assertZIO(result)(equalTo(AlreadyExists))
}
),
) @@ after(ElasticRequest.deleteIndex(createIndexTestName).execute.orDie),
suite("creating or updating document")(
test("successfully create document") {
checkOnce(genDocumentId, genCustomer) { (documentId, customer) =>
Expand All @@ -66,14 +63,14 @@ object HttpExecutorSpec extends IntegrationSpec {
}
},
test("successfully update document") {
checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, customer1, customer2) =>
checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, firstCustomer, secondCustomer) =>
val result = for {
_ <- ElasticRequest.create[CustomerDocument](index, documentId, customer1).execute
_ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer2).execute
_ <- ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer).execute
_ <- ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer).execute
doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute
} yield doc

assertZIO(result)(isSome(equalTo(customer2)))
assertZIO(result)(isSome(equalTo(secondCustomer)))
}
}
),
Expand Down Expand Up @@ -151,9 +148,83 @@ object HttpExecutorSpec extends IntegrationSpec {
res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute
} yield res

assertZIO(result.exit)(fails(isSubtype[Exception](assertException("Decoding error: .address(missing)"))))
assertZIO(result.exit)(
fails(isSubtype[Exception](assertException("Could not parse the document: .address(missing)")))
)
}
}
),
suite("searching documents")(
test("search for document using range query") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) {
(firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) =>
val result = for {
_ <- ElasticRequest.deleteByQuery(index, matchAll()).execute
_ <- ElasticRequest.upsert[CustomerDocument](index, firstDocumentId, firstCustomer).execute
_ <-
ElasticRequest.upsert[CustomerDocument](index, secondDocumentId, secondCustomer).refreshTrue.execute
query = range("balance").gte(100)
res <- ElasticRequest.search[CustomerDocument](index, query).execute
} yield res

assertZIO(result)(isNonEmpty)
}
},
test("fail if any of results cannot be decoded") {
checkOnce(genDocumentId, genDocumentId, genEmployee, genCustomer) {
(employeeDocumentId, customerDocumentId, employee, customer) =>
val result = for {
_ <- ElasticRequest.deleteByQuery(index, matchAll()).execute
_ <- ElasticRequest.upsert[CustomerDocument](index, customerDocumentId, customer).execute
_ <- ElasticRequest.upsert[EmployeeDocument](index, employeeDocumentId, employee).refreshTrue.execute
query = range("age").gte(0)
res <- ElasticRequest.search[CustomerDocument](index, query).execute
} yield res

assertZIO(result.exit)(
fails(
isSubtype[Exception](
assertException("Could not parse all documents successfully: .address(missing))")
)
)
)
}
}
) @@ shrinks(0) @@ sequential @@ afterAll(
ElasticRequest.deleteByQuery(index, matchAll()).refreshTrue.execute.orDie
),
suite("deleting by query")(
test("successfully deleted all matched documents") {
checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) {
(firstDocumentId, firstCustomer, secondDocumentId, secondCustomer, thirdDocumentId, thirdCustomer) =>
val result =
for {
_ <- ElasticRequest
.upsert[CustomerDocument](index, firstDocumentId, firstCustomer.copy(balance = 150))
.execute
_ <-
ElasticRequest
.upsert[CustomerDocument](index, secondDocumentId, secondCustomer.copy(balance = 350))
.execute
_ <-
ElasticRequest
.upsert[CustomerDocument](index, thirdDocumentId, thirdCustomer.copy(balance = 400))
.refreshTrue
.execute
deleteQuery = range("balance").gte(300)
_ <- ElasticRequest.deleteByQuery(index, deleteQuery).refreshTrue.execute
res <- ElasticRequest.search[CustomerDocument](index, matchAll()).execute
} yield res

assertZIO(result)(hasSameElements(List(firstCustomer.copy(balance = 150))))
}
},
test("returns Not Found when provided index is missing") {
checkOnce(genIndexName) { missingIndex =>
assertZIO(ElasticRequest.deleteByQuery(missingIndex, matchAll()).execute)(equalTo(NotFound))
}
}
)
).provideShared(elasticsearchLayer) @@ nondeterministic
).provideShared(elasticsearchLayer) @@ nondeterministic @@ sequential @@ prepareElasticsearchIndexForTests

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package zio.elasticsearch

import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.ZLayer
import zio._
import zio.elasticsearch.ElasticQuery.matchAll
import zio.prelude.Newtype.unsafeWrap
import zio.test.Assertion.{containsString, hasMessage}
import zio.test.CheckVariants.CheckN
import zio.test.{Assertion, Gen, ZIOSpecDefault, checkN}
import zio.test.TestAspect.beforeAll
import zio.test.{Assertion, Gen, TestAspect, ZIOSpecDefault, checkN}

trait IntegrationSpec extends ZIOSpecDefault {
val elasticsearchLayer: ZLayer[Any, Throwable, ElasticExecutor] =
HttpClientZioBackend.layer() >>> ElasticExecutor.local

val index: IndexName = IndexName("users")

val createIndexTestName: IndexName = IndexName("create-index-test-name")

val prepareElasticsearchIndexForTests: TestAspect[Nothing, Any, Throwable, Any] = beforeAll((for {
_ <- ElasticRequest.createIndex(index, None).execute
_ <- ElasticRequest.deleteByQuery(index, matchAll()).refreshTrue.execute
} yield ()).provide(elasticsearchLayer))

def genIndexName: Gen[Any, IndexName] =
Gen.stringBounded(10, 40)(Gen.alphaChar).map(name => unsafeWrap(IndexName)(name.toLowerCase))

Expand All @@ -23,13 +32,15 @@ trait IntegrationSpec extends ZIOSpecDefault {
name <- Gen.stringBounded(5, 10)(Gen.alphaChar)
address <- Gen.stringBounded(5, 10)(Gen.alphaNumericChar)
balance <- Gen.bigDecimal(100, 10000)
} yield CustomerDocument(id = id, name = name, address = address, balance = balance)
age <- Gen.int(18, 75)
} yield CustomerDocument(id = id, name = name, address = address, balance = balance, age = age)

def genEmployee: Gen[Any, EmployeeDocument] = for {
id <- Gen.stringBounded(5, 10)(Gen.alphaNumericChar)
name <- Gen.stringBounded(5, 10)(Gen.alphaChar)
degree <- Gen.stringBounded(5, 10)(Gen.alphaChar)
} yield EmployeeDocument(id = id, name = name, degree = degree)
age <- Gen.int(18, 75)
} yield EmployeeDocument(id = id, name = name, degree = degree, age = age)

def checkOnce: CheckN = checkN(1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package zio.elasticsearch

import zio.schema.{DeriveSchema, Schema}

final case class CustomerDocument(id: String, name: String, address: String, balance: BigDecimal)
final case class CustomerDocument(id: String, name: String, address: String, balance: BigDecimal, age: Int)

final case class EmployeeDocument(id: String, name: String, degree: String)
final case class EmployeeDocument(id: String, name: String, degree: String, age: Int)

object CustomerDocument {
implicit val schema: Schema[CustomerDocument] = DeriveSchema.gen[CustomerDocument]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ private[elasticsearch] final case class ElasticQueryResponse(
@jsonField("_shards")
shards: Shards,
hits: Hits
)
) {

lazy val results: List[Json] = hits.hits.map(_.source)
}

private[elasticsearch] object ElasticQueryResponse {
implicit val decoder: JsonDecoder[ElasticQueryResponse] = DeriveJsonDecoder.gen[ElasticQueryResponse]
Expand All @@ -30,7 +33,7 @@ private[elasticsearch] object Shards {
private[elasticsearch] final case class Hits(
total: Total,
@jsonField("max_score")
maxScore: Double,
maxScore: Option[Double] = None,
hits: List[Item]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package zio.elasticsearch

import zio.elasticsearch.Refresh.WithRefresh
import zio.elasticsearch.Routing.{Routing, WithRouting}
import zio.prelude._
import zio.schema.Schema
import zio.schema.codec.JsonCodec.JsonDecoder
import zio.{RIO, ZIO}

sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self =>
Expand Down Expand Up @@ -41,6 +43,9 @@ object ElasticRequest {
def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] =
DeleteByIdRequest(index, id)

def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] =
DeleteByQueryRequest(index, query)

def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] =
DeleteIndexRequest(name)

Expand All @@ -51,14 +56,25 @@ object ElasticRequest {
GetByIdRequest(index, id).map {
case Some(document) =>
document.decode match {
case Left(e) => Left(DecodingException(s"Decoding error: ${e.message}"))
case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}"))
case Right(doc) => Right(Some(doc))
}
case None => Right(None)
case None =>
Right(None)
}

def search(index: IndexName, query: ElasticQuery[_]): ElasticRequest[ElasticQueryResponse, GetByQuery] =
GetByQueryRequest(index, query)
def search[A](index: IndexName, query: ElasticQuery[_])(implicit
schema: Schema[A]
): ElasticRequest[List[A], GetByQuery] =
GetByQueryRequest(index, query).map { response =>
Validation
.validateAll(response.results.map { json =>
ZValidation.fromEither(JsonDecoder.decode(schema, json.toString))
})
.toEitherWith { errors =>
DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})")
}
}

def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] =
CreateOrUpdateRequest(index, id, Document.from(doc))
Expand Down Expand Up @@ -98,6 +114,13 @@ object ElasticRequest {
routing: Option[Routing] = None
) extends ElasticRequest[DeletionOutcome, DeleteById]

private[elasticsearch] final case class DeleteByQueryRequest(
index: IndexName,
query: ElasticQuery[_],
refresh: Boolean = false,
routing: Option[Routing] = None
) extends ElasticRequest[DeletionOutcome, DeleteByQuery]

private[elasticsearch] final case class DeleteIndexRequest(name: IndexName)
extends ElasticRequest[DeletionOutcome, DeleteIndex]

Expand Down Expand Up @@ -128,15 +151,16 @@ object ElasticRequest {
sealed trait ElasticRequestType

object ElasticRequestType {
trait CreateIndex extends ElasticRequestType
trait Create extends ElasticRequestType
trait CreateWithId extends ElasticRequestType
trait DeleteById extends ElasticRequestType
trait DeleteIndex extends ElasticRequestType
trait Exists extends ElasticRequestType
trait GetById extends ElasticRequestType
trait GetByQuery extends ElasticRequestType
trait Upsert extends ElasticRequestType
trait CreateIndex extends ElasticRequestType
trait Create extends ElasticRequestType
trait CreateWithId extends ElasticRequestType
trait DeleteById extends ElasticRequestType
trait DeleteByQuery extends ElasticRequestType
trait DeleteIndex extends ElasticRequestType
trait Exists extends ElasticRequestType
trait GetById extends ElasticRequestType
trait GetByQuery extends ElasticRequestType
trait Upsert extends ElasticRequestType
}

sealed abstract class CreationOutcome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
case r: CreateIndexRequest => executeCreateIndex(r)
case r: CreateOrUpdateRequest => executeCreateOrUpdate(r)
case r: DeleteByIdRequest => executeDeleteById(r)
case r: DeleteByQueryRequest => executeDeleteByQuery(r)
case r: DeleteIndexRequest => executeDeleteIndex(r)
case r: ExistsRequest => executeExists(r)
case r: GetByIdRequest => executeGetById(r)
Expand Down Expand Up @@ -122,6 +123,23 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
}
}

def executeDeleteByQuery(r: DeleteByQueryRequest): Task[DeletionOutcome] = {
val uri = uri"${config.uri}/${IndexName.unwrap(r.index)}/_delete_by_query".withParam("refresh", r.refresh.toString)

sendRequest(
request
.post(uri)
.contentType(ApplicationJson)
.body(r.query.toJsonBody)
).flatMap { response =>
response.code match {
case HttpOk => ZIO.succeed(Deleted)
case HttpNotFound => ZIO.succeed(NotFound)
case _ => ZIO.fail(createElasticException(response))
}
}
}

private def executeDeleteIndex(r: DeleteIndexRequest): Task[DeletionOutcome] =
sendRequest(request.delete(uri"${config.uri}/${r.name}")).flatMap { response =>
response.code match {
Expand Down Expand Up @@ -162,7 +180,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
private def executeGetByQuery(r: GetByQueryRequest): Task[ElasticQueryResponse] =
sendRequestWithCustomResponse(
request
.post(uri"${config.uri}/${IndexName.unwrap(r.index)}/_search")
.post(uri"${config.uri}/${r.index}/_search")
.response(asJson[ElasticQueryResponse])
.contentType(ApplicationJson)
.body(r.query.toJsonBody)
Expand Down
11 changes: 10 additions & 1 deletion modules/library/src/main/scala/zio/elasticsearch/Refresh.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import zio.elasticsearch.ElasticRequest.{
CreateRequest,
CreateWithIdRequest,
DeleteByIdRequest,
DeleteByQueryRequest,
Map
}
import zio.elasticsearch.ElasticRequestType.{Create, CreateWithId, DeleteById, Upsert}
import zio.elasticsearch.ElasticRequestType.{Create, CreateWithId, DeleteById, DeleteByQuery, Upsert}

object Refresh {

Expand Down Expand Up @@ -40,6 +41,14 @@ object Refresh {
}
}

implicit val deleteByQueryWithRefresh: WithRefresh[DeleteByQuery] = new WithRefresh[DeleteByQuery] {
def withRefresh[A](request: ElasticRequest[A, DeleteByQuery], value: Boolean): ElasticRequest[A, DeleteByQuery] =
request match {
case Map(r, mapper) => Map(withRefresh(r, value), mapper)
case r: DeleteByQueryRequest => r.copy(refresh = value)
}
}

implicit val upsertWithRefresh: WithRefresh[Upsert] = new WithRefresh[Upsert] {
def withRefresh[A](request: ElasticRequest[A, Upsert], value: Boolean): ElasticRequest[A, Upsert] =
request match {
Expand Down
Loading

0 comments on commit 9a4fff3

Please sign in to comment.