From 9a4fff3625edb252a250e8f1a871909e23bf3f62 Mon Sep 17 00:00:00 2001 From: Marko Ristic <44512213+markaya@users.noreply.github.com> Date: Thu, 12 Jan 2023 12:10:34 +0100 Subject: [PATCH] Support search with ZIO schema (#32) --- .../zio/elasticsearch/HttpExecutorSpec.scala | 107 +++++++++++++++--- .../zio/elasticsearch/IntegrationSpec.scala | 19 +++- .../zio/elasticsearch/UserDocument.scala | 4 +- .../elasticsearch/ElasticQueryResponse.scala | 7 +- .../zio/elasticsearch/ElasticRequest.scala | 50 +++++--- .../elasticsearch/HttpElasticExecutor.scala | 20 +++- .../scala/zio/elasticsearch/Refresh.scala | 11 +- .../zio/elasticsearch/TestExecutor.scala | 10 +- 8 files changed, 186 insertions(+), 42 deletions(-) diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index f3b527bac..59cddb18e 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -2,8 +2,9 @@ package zio.elasticsearch import zio.elasticsearch.CreationOutcome.{AlreadyExists, Created} import zio.elasticsearch.DeletionOutcome.{Deleted, NotFound} +import zio.elasticsearch.ElasticQuery._ import zio.test.Assertion._ -import zio.test.TestAspect.nondeterministic +import zio.test.TestAspect._ import zio.test._ object HttpExecutorSpec extends IntegrationSpec { @@ -39,21 +40,17 @@ object HttpExecutorSpec extends IntegrationSpec { ), suite("creating index")( test("successfully create index") { - checkOnce(genIndexName) { name => - assertZIO(ElasticRequest.createIndex(name, None).execute)(equalTo(Created)) - } + assertZIO(ElasticRequest.createIndex(createIndexTestName, None).execute)(equalTo(Created)) }, test("return 'AlreadyExists' if index already exists") { - checkOnce(genIndexName) { name => - val result = for { - _ <- ElasticRequest.createIndex(name, None).execute - res <- ElasticRequest.createIndex(name, None).execute - } yield res + val result = for { + _ <- ElasticRequest.createIndex(createIndexTestName, None).execute + res <- ElasticRequest.createIndex(createIndexTestName, None).execute + } yield res - assertZIO(result)(equalTo(AlreadyExists)) - } + assertZIO(result)(equalTo(AlreadyExists)) } - ), + ) @@ after(ElasticRequest.deleteIndex(createIndexTestName).execute.orDie), suite("creating or updating document")( test("successfully create document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => @@ -66,14 +63,14 @@ object HttpExecutorSpec extends IntegrationSpec { } }, test("successfully update document") { - checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, customer1, customer2) => + checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, firstCustomer, secondCustomer) => val result = for { - _ <- ElasticRequest.create[CustomerDocument](index, documentId, customer1).execute - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer2).execute + _ <- ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer).execute + _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer).execute doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute } yield doc - assertZIO(result)(isSome(equalTo(customer2))) + assertZIO(result)(isSome(equalTo(secondCustomer))) } } ), @@ -151,9 +148,83 @@ object HttpExecutorSpec extends IntegrationSpec { res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute } yield res - assertZIO(result.exit)(fails(isSubtype[Exception](assertException("Decoding error: .address(missing)")))) + assertZIO(result.exit)( + fails(isSubtype[Exception](assertException("Could not parse the document: .address(missing)"))) + ) + } + } + ), + suite("searching documents")( + test("search for document using range query") { + checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { + (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => + val result = for { + _ <- ElasticRequest.deleteByQuery(index, matchAll()).execute + _ <- ElasticRequest.upsert[CustomerDocument](index, firstDocumentId, firstCustomer).execute + _ <- + ElasticRequest.upsert[CustomerDocument](index, secondDocumentId, secondCustomer).refreshTrue.execute + query = range("balance").gte(100) + res <- ElasticRequest.search[CustomerDocument](index, query).execute + } yield res + + assertZIO(result)(isNonEmpty) + } + }, + test("fail if any of results cannot be decoded") { + checkOnce(genDocumentId, genDocumentId, genEmployee, genCustomer) { + (employeeDocumentId, customerDocumentId, employee, customer) => + val result = for { + _ <- ElasticRequest.deleteByQuery(index, matchAll()).execute + _ <- ElasticRequest.upsert[CustomerDocument](index, customerDocumentId, customer).execute + _ <- ElasticRequest.upsert[EmployeeDocument](index, employeeDocumentId, employee).refreshTrue.execute + query = range("age").gte(0) + res <- ElasticRequest.search[CustomerDocument](index, query).execute + } yield res + + assertZIO(result.exit)( + fails( + isSubtype[Exception]( + assertException("Could not parse all documents successfully: .address(missing))") + ) + ) + ) + } + } + ) @@ shrinks(0) @@ sequential @@ afterAll( + ElasticRequest.deleteByQuery(index, matchAll()).refreshTrue.execute.orDie + ), + suite("deleting by query")( + test("successfully deleted all matched documents") { + checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) { + (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer, thirdDocumentId, thirdCustomer) => + val result = + for { + _ <- ElasticRequest + .upsert[CustomerDocument](index, firstDocumentId, firstCustomer.copy(balance = 150)) + .execute + _ <- + ElasticRequest + .upsert[CustomerDocument](index, secondDocumentId, secondCustomer.copy(balance = 350)) + .execute + _ <- + ElasticRequest + .upsert[CustomerDocument](index, thirdDocumentId, thirdCustomer.copy(balance = 400)) + .refreshTrue + .execute + deleteQuery = range("balance").gte(300) + _ <- ElasticRequest.deleteByQuery(index, deleteQuery).refreshTrue.execute + res <- ElasticRequest.search[CustomerDocument](index, matchAll()).execute + } yield res + + assertZIO(result)(hasSameElements(List(firstCustomer.copy(balance = 150)))) + } + }, + test("returns Not Found when provided index is missing") { + checkOnce(genIndexName) { missingIndex => + assertZIO(ElasticRequest.deleteByQuery(missingIndex, matchAll()).execute)(equalTo(NotFound)) } } ) - ).provideShared(elasticsearchLayer) @@ nondeterministic + ).provideShared(elasticsearchLayer) @@ nondeterministic @@ sequential @@ prepareElasticsearchIndexForTests + } diff --git a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala index a727529ab..4ed1dae1f 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala @@ -1,11 +1,13 @@ package zio.elasticsearch import sttp.client3.httpclient.zio.HttpClientZioBackend -import zio.ZLayer +import zio._ +import zio.elasticsearch.ElasticQuery.matchAll import zio.prelude.Newtype.unsafeWrap import zio.test.Assertion.{containsString, hasMessage} import zio.test.CheckVariants.CheckN -import zio.test.{Assertion, Gen, ZIOSpecDefault, checkN} +import zio.test.TestAspect.beforeAll +import zio.test.{Assertion, Gen, TestAspect, ZIOSpecDefault, checkN} trait IntegrationSpec extends ZIOSpecDefault { val elasticsearchLayer: ZLayer[Any, Throwable, ElasticExecutor] = @@ -13,6 +15,13 @@ trait IntegrationSpec extends ZIOSpecDefault { val index: IndexName = IndexName("users") + val createIndexTestName: IndexName = IndexName("create-index-test-name") + + val prepareElasticsearchIndexForTests: TestAspect[Nothing, Any, Throwable, Any] = beforeAll((for { + _ <- ElasticRequest.createIndex(index, None).execute + _ <- ElasticRequest.deleteByQuery(index, matchAll()).refreshTrue.execute + } yield ()).provide(elasticsearchLayer)) + def genIndexName: Gen[Any, IndexName] = Gen.stringBounded(10, 40)(Gen.alphaChar).map(name => unsafeWrap(IndexName)(name.toLowerCase)) @@ -23,13 +32,15 @@ trait IntegrationSpec extends ZIOSpecDefault { name <- Gen.stringBounded(5, 10)(Gen.alphaChar) address <- Gen.stringBounded(5, 10)(Gen.alphaNumericChar) balance <- Gen.bigDecimal(100, 10000) - } yield CustomerDocument(id = id, name = name, address = address, balance = balance) + age <- Gen.int(18, 75) + } yield CustomerDocument(id = id, name = name, address = address, balance = balance, age = age) def genEmployee: Gen[Any, EmployeeDocument] = for { id <- Gen.stringBounded(5, 10)(Gen.alphaNumericChar) name <- Gen.stringBounded(5, 10)(Gen.alphaChar) degree <- Gen.stringBounded(5, 10)(Gen.alphaChar) - } yield EmployeeDocument(id = id, name = name, degree = degree) + age <- Gen.int(18, 75) + } yield EmployeeDocument(id = id, name = name, degree = degree, age = age) def checkOnce: CheckN = checkN(1) diff --git a/modules/library/src/it/scala/zio/elasticsearch/UserDocument.scala b/modules/library/src/it/scala/zio/elasticsearch/UserDocument.scala index 3dbd69478..c62e3322c 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/UserDocument.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/UserDocument.scala @@ -2,9 +2,9 @@ package zio.elasticsearch import zio.schema.{DeriveSchema, Schema} -final case class CustomerDocument(id: String, name: String, address: String, balance: BigDecimal) +final case class CustomerDocument(id: String, name: String, address: String, balance: BigDecimal, age: Int) -final case class EmployeeDocument(id: String, name: String, degree: String) +final case class EmployeeDocument(id: String, name: String, degree: String, age: Int) object CustomerDocument { implicit val schema: Schema[CustomerDocument] = DeriveSchema.gen[CustomerDocument] diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala index 90f039347..df78525bd 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala @@ -10,7 +10,10 @@ private[elasticsearch] final case class ElasticQueryResponse( @jsonField("_shards") shards: Shards, hits: Hits -) +) { + + lazy val results: List[Json] = hits.hits.map(_.source) +} private[elasticsearch] object ElasticQueryResponse { implicit val decoder: JsonDecoder[ElasticQueryResponse] = DeriveJsonDecoder.gen[ElasticQueryResponse] @@ -30,7 +33,7 @@ private[elasticsearch] object Shards { private[elasticsearch] final case class Hits( total: Total, @jsonField("max_score") - maxScore: Double, + maxScore: Option[Double] = None, hits: List[Item] ) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index 3237a7d1b..8b478c9a8 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -2,7 +2,9 @@ package zio.elasticsearch import zio.elasticsearch.Refresh.WithRefresh import zio.elasticsearch.Routing.{Routing, WithRouting} +import zio.prelude._ import zio.schema.Schema +import zio.schema.codec.JsonCodec.JsonDecoder import zio.{RIO, ZIO} sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self => @@ -41,6 +43,9 @@ object ElasticRequest { def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] = DeleteByIdRequest(index, id) + def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] = + DeleteByQueryRequest(index, query) + def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] = DeleteIndexRequest(name) @@ -51,14 +56,25 @@ object ElasticRequest { GetByIdRequest(index, id).map { case Some(document) => document.decode match { - case Left(e) => Left(DecodingException(s"Decoding error: ${e.message}")) + case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}")) case Right(doc) => Right(Some(doc)) } - case None => Right(None) + case None => + Right(None) } - def search(index: IndexName, query: ElasticQuery[_]): ElasticRequest[ElasticQueryResponse, GetByQuery] = - GetByQueryRequest(index, query) + def search[A](index: IndexName, query: ElasticQuery[_])(implicit + schema: Schema[A] + ): ElasticRequest[List[A], GetByQuery] = + GetByQueryRequest(index, query).map { response => + Validation + .validateAll(response.results.map { json => + ZValidation.fromEither(JsonDecoder.decode(schema, json.toString)) + }) + .toEitherWith { errors => + DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})") + } + } def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] = CreateOrUpdateRequest(index, id, Document.from(doc)) @@ -98,6 +114,13 @@ object ElasticRequest { routing: Option[Routing] = None ) extends ElasticRequest[DeletionOutcome, DeleteById] + private[elasticsearch] final case class DeleteByQueryRequest( + index: IndexName, + query: ElasticQuery[_], + refresh: Boolean = false, + routing: Option[Routing] = None + ) extends ElasticRequest[DeletionOutcome, DeleteByQuery] + private[elasticsearch] final case class DeleteIndexRequest(name: IndexName) extends ElasticRequest[DeletionOutcome, DeleteIndex] @@ -128,15 +151,16 @@ object ElasticRequest { sealed trait ElasticRequestType object ElasticRequestType { - trait CreateIndex extends ElasticRequestType - trait Create extends ElasticRequestType - trait CreateWithId extends ElasticRequestType - trait DeleteById extends ElasticRequestType - trait DeleteIndex extends ElasticRequestType - trait Exists extends ElasticRequestType - trait GetById extends ElasticRequestType - trait GetByQuery extends ElasticRequestType - trait Upsert extends ElasticRequestType + trait CreateIndex extends ElasticRequestType + trait Create extends ElasticRequestType + trait CreateWithId extends ElasticRequestType + trait DeleteById extends ElasticRequestType + trait DeleteByQuery extends ElasticRequestType + trait DeleteIndex extends ElasticRequestType + trait Exists extends ElasticRequestType + trait GetById extends ElasticRequestType + trait GetByQuery extends ElasticRequestType + trait Upsert extends ElasticRequestType } sealed abstract class CreationOutcome diff --git a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala index 71b084d48..328a0801b 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala @@ -28,6 +28,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC case r: CreateIndexRequest => executeCreateIndex(r) case r: CreateOrUpdateRequest => executeCreateOrUpdate(r) case r: DeleteByIdRequest => executeDeleteById(r) + case r: DeleteByQueryRequest => executeDeleteByQuery(r) case r: DeleteIndexRequest => executeDeleteIndex(r) case r: ExistsRequest => executeExists(r) case r: GetByIdRequest => executeGetById(r) @@ -122,6 +123,23 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } + def executeDeleteByQuery(r: DeleteByQueryRequest): Task[DeletionOutcome] = { + val uri = uri"${config.uri}/${IndexName.unwrap(r.index)}/_delete_by_query".withParam("refresh", r.refresh.toString) + + sendRequest( + request + .post(uri) + .contentType(ApplicationJson) + .body(r.query.toJsonBody) + ).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(Deleted) + case HttpNotFound => ZIO.succeed(NotFound) + case _ => ZIO.fail(createElasticException(response)) + } + } + } + private def executeDeleteIndex(r: DeleteIndexRequest): Task[DeletionOutcome] = sendRequest(request.delete(uri"${config.uri}/${r.name}")).flatMap { response => response.code match { @@ -162,7 +180,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC private def executeGetByQuery(r: GetByQueryRequest): Task[ElasticQueryResponse] = sendRequestWithCustomResponse( request - .post(uri"${config.uri}/${IndexName.unwrap(r.index)}/_search") + .post(uri"${config.uri}/${r.index}/_search") .response(asJson[ElasticQueryResponse]) .contentType(ApplicationJson) .body(r.query.toJsonBody) diff --git a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala b/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala index 9888e6a81..dd99924cc 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala @@ -5,9 +5,10 @@ import zio.elasticsearch.ElasticRequest.{ CreateRequest, CreateWithIdRequest, DeleteByIdRequest, + DeleteByQueryRequest, Map } -import zio.elasticsearch.ElasticRequestType.{Create, CreateWithId, DeleteById, Upsert} +import zio.elasticsearch.ElasticRequestType.{Create, CreateWithId, DeleteById, DeleteByQuery, Upsert} object Refresh { @@ -40,6 +41,14 @@ object Refresh { } } + implicit val deleteByQueryWithRefresh: WithRefresh[DeleteByQuery] = new WithRefresh[DeleteByQuery] { + def withRefresh[A](request: ElasticRequest[A, DeleteByQuery], value: Boolean): ElasticRequest[A, DeleteByQuery] = + request match { + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: DeleteByQueryRequest => r.copy(refresh = value) + } + } + implicit val upsertWithRefresh: WithRefresh[Upsert] = new WithRefresh[Upsert] { def withRefresh[A](request: ElasticRequest[A, Upsert], value: Boolean): ElasticRequest[A, Upsert] = request match { diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala index b4d08ce72..1fc2e1960 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala @@ -24,6 +24,8 @@ private[elasticsearch] final case class TestExecutor private (data: TMap[IndexNa fakeCreateOrUpdate(index, id, document) case DeleteByIdRequest(index, id, _, _) => fakeDeleteById(index, id) + case DeleteByQueryRequest(index, _, _, _) => + fakeDeleteByQuery(index) case DeleteIndexRequest(name) => fakeDeleteIndex(name) case ExistsRequest(index, id, _) => @@ -71,6 +73,12 @@ private[elasticsearch] final case class TestExecutor private (data: TMap[IndexNa _ <- documents.delete(documentId) } yield if (exists) Deleted else NotFound).commit + private def fakeDeleteByQuery(index: IndexName): Task[DeletionOutcome] = + (for { + exists <- self.data.contains(index) + } yield if (exists) Deleted else NotFound).commit + // until we have a way of using query to delete we can either delete all or delete none documents + private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] = (for { exists <- self.data.contains(index) @@ -110,7 +118,7 @@ private[elasticsearch] final case class TestExecutor private (data: TMap[IndexNa } ) hitsSize <- documents.size - hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = 1, hits = items) + hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = Some(1), hits = items) } yield ElasticQueryResponse(took = 1, timedOut = false, shards = shards, hits = hits) }