diff --git a/modules/example/src/main/scala/example/Main.scala b/modules/example/src/main/scala/example/Main.scala index de3a3e7fd..b35e79309 100644 --- a/modules/example/src/main/scala/example/Main.scala +++ b/modules/example/src/main/scala/example/Main.scala @@ -23,7 +23,7 @@ import sttp.client3.SttpBackend import sttp.client3.httpclient.zio.HttpClientZioBackend import zio._ import zio.config.getConfig -import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest} +import zio.elasticsearch.{ElasticConfig, ElasticExecutor} import zio.http.{Server, ServerConfig} import scala.io.Source @@ -46,14 +46,14 @@ object Main extends ZIOAppDefault { val deleteIndex: RIO[ElasticExecutor, Unit] = for { _ <- ZIO.logInfo(s"Deleting index '$Index'...") - _ <- ElasticRequest.deleteIndex(Index).execute + _ <- ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(Index).execute) } yield () val createIndex: RIO[ElasticExecutor, Unit] = for { _ <- ZIO.logInfo(s"Creating index '$Index'...") mapping <- ZIO.fromTry(Using(Source.fromURL(getClass.getResource("/mapping.json")))(_.mkString)) - _ <- ElasticRequest.createIndex(Index, Some(mapping)).execute + _ <- ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(Index, Some(mapping)).execute) } yield () val populate: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = diff --git a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala index f5294357b..e41b9e25e 100644 --- a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala +++ b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala @@ -18,62 +18,49 @@ package example import zio._ import zio.elasticsearch.ElasticQuery.matchAll -import zio.elasticsearch.{ - CreationOutcome, - DeletionOutcome, - DocumentId, - ElasticExecutor, - ElasticQuery, - ElasticRequest, - Routing -} +import zio.elasticsearch.{CreationOutcome, DeletionOutcome, DocumentId, ElasticExecutor, ElasticQuery, Routing} import zio.prelude.Newtype.unsafeWrap final case class RepositoriesElasticsearch(executor: ElasticExecutor) { def findAll(): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) + executor.search[GitHubRepo](Index, matchAll).execute def findById(organization: String, id: String): Task[Option[GitHubRepo]] = for { routing <- routingOf(organization) - req = ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing) - res <- executor.execute(req) + res <- executor.getById[GitHubRepo](Index, DocumentId(id)).routing(routing).execute } yield res def create(repository: GitHubRepo): Task[CreationOutcome] = for { routing <- routingOf(repository.organization) - req = ElasticRequest.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue - res <- executor.execute(req) + res <- executor.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue.execute } yield res def createAll(repositories: List[GitHubRepo]): Task[Unit] = for { routing <- routingOf(organization) reqs = repositories.map { repository => - ElasticRequest.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository) + executor.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository) } - bulkReq = ElasticRequest.bulk(reqs: _*).routing(routing) - _ <- executor.execute(bulkReq) + _ <- executor.bulk(reqs: _*).routing(routing).execute } yield () def upsert(id: String, repository: GitHubRepo): Task[Unit] = for { routing <- routingOf(repository.organization) - req = ElasticRequest.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true) - _ <- executor.execute(req) + _ <- executor.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true).execute } yield () def remove(organization: String, id: String): Task[DeletionOutcome] = for { routing <- routingOf(organization) - req = ElasticRequest.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse - res <- executor.execute(req) + res <- executor.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse.execute } yield res def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, query)) + executor.search[GitHubRepo](Index, query).execute private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] = Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e)) diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index f380cdeba..a654368fb 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -16,6 +16,7 @@ package zio.elasticsearch +import zio.ZIO import zio.elasticsearch.ElasticQuery._ import zio.test.Assertion._ import zio.test.TestAspect._ @@ -30,51 +31,60 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully create document") { checkOnce(genCustomer) { customer => for { - docId <- ElasticRequest.create[CustomerDocument](index, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, docId).execute + executor <- ZIO.service[ElasticExecutor] + docId <- executor.create[CustomerDocument](index, customer).execute + res <- executor.getById[CustomerDocument](index, docId).execute } yield assert(res)(isSome(equalTo(customer))) } }, test("successfully create document with ID given") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => - assertZIO(ElasticRequest.create[CustomerDocument](index, documentId, customer).execute)(equalTo(Created)) + assertZIO( + ZIO.serviceWithZIO[ElasticExecutor](_.create[CustomerDocument](index, documentId, customer).execute) + )(equalTo(Created)) } }, test("return 'AlreadyExists' if document with given ID already exists") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, customer1, customer2) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer1).execute - res <- ElasticRequest.create[CustomerDocument](index, documentId, customer2).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[CustomerDocument](index, documentId, customer1).execute + res <- executor.create[CustomerDocument](index, documentId, customer2).execute } yield assert(res)(equalTo(AlreadyExists)) } } ), suite("creating index")( test("successfully create index") { - assertZIO(ElasticRequest.createIndex(createIndexTestName, None).execute)(equalTo(Created)) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(createIndexTestName, None).execute))( + equalTo(Created) + ) }, test("return 'AlreadyExists' if index already exists") { for { - _ <- ElasticRequest.createIndex(createIndexTestName, None).execute - res <- ElasticRequest.createIndex(createIndexTestName, None).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.createIndex(createIndexTestName, None).execute + res <- executor.createIndex(createIndexTestName, None).execute } yield assert(res)(equalTo(AlreadyExists)) } - ) @@ after(ElasticRequest.deleteIndex(createIndexTestName).execute.orDie), + ) @@ after(ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(createIndexTestName).execute.orDie)), suite("creating or updating document")( test("successfully create document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[CustomerDocument](index, documentId, customer).execute + doc <- executor.getById[CustomerDocument](index, documentId).execute } yield assert(doc)(isSome(equalTo(customer))) } }, test("successfully update document") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, firstCustomer, secondCustomer) => for { - _ <- ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer).execute - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.create[CustomerDocument](index, documentId, firstCustomer).execute + _ <- executor.upsert[CustomerDocument](index, documentId, secondCustomer).execute + doc <- executor.getById[CustomerDocument](index, documentId).execute } yield assert(doc)(isSome(equalTo(secondCustomer))) } } @@ -83,14 +93,15 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.deleteById(index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[CustomerDocument](index, documentId, customer).execute + res <- executor.deleteById(index, documentId).execute } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.deleteById(index, documentId).execute)(equalTo(NotFound)) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.deleteById(index, documentId).execute))(equalTo(NotFound)) } } ), @@ -98,14 +109,15 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing index") { checkOnce(genIndexName) { name => for { - _ <- ElasticRequest.createIndex(name, None).execute - res <- ElasticRequest.deleteIndex(name).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.createIndex(name, None).execute + res <- executor.deleteIndex(name).execute } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if index does not exists") { checkOnce(genIndexName) { name => - assertZIO(ElasticRequest.deleteIndex(name).execute)(equalTo(NotFound)) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(name).execute))(equalTo(NotFound)) } } ), @@ -113,14 +125,15 @@ object HttpExecutorSpec extends IntegrationSpec { test("return true if the document exists") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.exists(index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[CustomerDocument](index, documentId, customer).execute + res <- executor.exists(index, documentId).execute } yield assert(res)(isTrue) } }, test("return false if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.exists(index, documentId).execute)(isFalse) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.exists(index, documentId).execute))(isFalse) } } ), @@ -128,21 +141,25 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully return document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[CustomerDocument](index, documentId, customer).execute + res <- executor.getById[CustomerDocument](index, documentId).execute } yield assert(res)(isSome(equalTo(customer))) } }, test("return None if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.getById[CustomerDocument](index, documentId).execute)(isNone) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.getById[CustomerDocument](index, documentId).execute))( + isNone + ) } }, test("fail with throwable if decoding fails") { checkOnce(genDocumentId, genEmployee) { (documentId, employee) => val result = for { - _ <- ElasticRequest.upsert[EmployeeDocument](index, documentId, employee).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.upsert[EmployeeDocument](index, documentId, employee).execute + res <- executor.getById[CustomerDocument](index, documentId).execute } yield res assertZIO(result.exit)( @@ -156,36 +173,38 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.deleteByQuery(firstSearchIndex, matchAll).execute _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + executor.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute _ <- - ElasticRequest + executor .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) .refreshTrue .execute query = range("balance").gte(100) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- executor.search[CustomerDocument](firstSearchIndex, query).execute } yield assert(res)(isNonEmpty) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(firstSearchIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(firstSearchIndex).execute.orDie) ), test("fail if any of results cannot be decoded") { checkOnce(genDocumentId, genDocumentId, genEmployee, genCustomer) { (employeeDocumentId, customerDocumentId, employee, customer) => val result = for { - _ <- ElasticRequest.deleteByQuery(secondSearchIndex, matchAll).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.deleteByQuery(secondSearchIndex, matchAll).execute _ <- - ElasticRequest.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer).execute - _ <- ElasticRequest + executor.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer).execute + _ <- executor .upsert[EmployeeDocument](secondSearchIndex, employeeDocumentId, employee) .refreshTrue .execute query = range("age").gte(0) - res <- ElasticRequest.search[CustomerDocument](secondSearchIndex, query).execute + res <- executor.search[CustomerDocument](secondSearchIndex, query).execute } yield res assertZIO(result.exit)( @@ -197,69 +216,72 @@ object HttpExecutorSpec extends IntegrationSpec { ) } } @@ around( - ElasticRequest.createIndex(secondSearchIndex, None).execute, - ElasticRequest.deleteIndex(secondSearchIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(secondSearchIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(secondSearchIndex).execute.orDie) ), test("search for a document which contains a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.deleteByQuery(firstSearchIndex, matchAll).execute _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + executor.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute _ <- - ElasticRequest + executor .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) .refreshTrue .execute query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- executor.search[CustomerDocument](firstSearchIndex, query).execute } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(firstSearchIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(firstSearchIndex).execute.orDie) ), test("search for a document which starts with a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.deleteByQuery(firstSearchIndex, matchAll).execute _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + executor.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute _ <- - ElasticRequest + executor .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) .refreshTrue .execute query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- executor.search[CustomerDocument](firstSearchIndex, query).execute } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(firstSearchIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(firstSearchIndex).execute.orDie) ), test("search for a document which conforms to a pattern using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.deleteByQuery(firstSearchIndex, matchAll).execute _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + executor.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute _ <- - ElasticRequest + executor .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) .refreshTrue .execute query = wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}") - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- executor.search[CustomerDocument](firstSearchIndex, query).execute } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(firstSearchIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(firstSearchIndex).execute.orDie) ) ) @@ shrinks(0), suite("deleting by query")( @@ -267,7 +289,8 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer, thirdDocumentId, thirdCustomer) => for { - _ <- ElasticRequest + executor <- ZIO.service[ElasticExecutor] + _ <- executor .upsert[CustomerDocument]( deleteByQueryIndex, firstDocumentId, @@ -275,7 +298,7 @@ object HttpExecutorSpec extends IntegrationSpec { ) .execute _ <- - ElasticRequest + executor .upsert[CustomerDocument]( deleteByQueryIndex, secondDocumentId, @@ -283,7 +306,7 @@ object HttpExecutorSpec extends IntegrationSpec { ) .execute _ <- - ElasticRequest + executor .upsert[CustomerDocument]( deleteByQueryIndex, thirdDocumentId, @@ -292,17 +315,19 @@ object HttpExecutorSpec extends IntegrationSpec { .refreshTrue .execute deleteQuery = range("balance").gte(300) - _ <- ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue.execute - res <- ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll).execute + _ <- executor.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue.execute + res <- executor.search[CustomerDocument](deleteByQueryIndex, matchAll).execute } yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150)))) } } @@ around( - ElasticRequest.createIndex(deleteByQueryIndex, None).execute, - ElasticRequest.deleteIndex(deleteByQueryIndex).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(deleteByQueryIndex, None).execute), + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(deleteByQueryIndex).execute.orDie) ), test("returns NotFound when provided index is missing") { checkOnce(genIndexName) { missingIndex => - assertZIO(ElasticRequest.deleteByQuery(missingIndex, matchAll).execute)(equalTo(NotFound)) + assertZIO(ZIO.serviceWithZIO[ElasticExecutor](_.deleteByQuery(missingIndex, matchAll).execute))( + equalTo(NotFound) + ) } } ), @@ -311,24 +336,25 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genDocumentId, genDocumentId, genCustomer) { (firstDocId, secondDocId, thirdDocId, customer) => for { - _ <- ElasticRequest + executor <- ZIO.service[ElasticExecutor] + _ <- executor .create[CustomerDocument](index, firstDocId, customer.copy(id = "randomIdString")) .execute - _ <- ElasticRequest + _ <- executor .create[CustomerDocument](index, secondDocId, customer.copy(id = "randomIdString2")) .refreshTrue .execute - req1 = ElasticRequest.create[CustomerDocument](index, thirdDocId, customer) - req2 = ElasticRequest.create[CustomerDocument](index, customer.copy(id = "randomIdString3")) - req3 = ElasticRequest.upsert[CustomerDocument](index, firstDocId, customer.copy(balance = 3000)) - req4 = ElasticRequest.deleteById(index, secondDocId) - res <- ElasticRequest.bulk(req1, req2, req3, req4).execute + req1 = executor.create[CustomerDocument](index, thirdDocId, customer) + req2 = executor.create[CustomerDocument](index, customer.copy(id = "randomIdString3")) + req3 = executor.upsert[CustomerDocument](index, firstDocId, customer.copy(balance = 3000)) + req4 = executor.deleteById(index, secondDocId) + res <- executor.bulk(req1, req2, req3, req4).execute } yield assert(res)(isUnit) } } ) ) @@ nondeterministic @@ sequential @@ prepareElasticsearchIndexForTests @@ afterAll( - ElasticRequest.deleteIndex(index).execute.orDie + ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(index).execute.orDie) ) ).provideShared( elasticsearchLayer diff --git a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala index f94c89552..867028766 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala @@ -41,8 +41,9 @@ trait IntegrationSpec extends ZIOSpecDefault { val createIndexTestName: IndexName = IndexName("create-index-test-name") val prepareElasticsearchIndexForTests: TestAspect[Nothing, Any, Throwable, Any] = beforeAll((for { - _ <- ElasticRequest.createIndex(index, None).execute - _ <- ElasticRequest.deleteByQuery(index, matchAll).refreshTrue.execute + executor <- ZIO.service[ElasticExecutor] + _ <- executor.createIndex(index, None).execute + _ <- executor.deleteByQuery(index, matchAll).refreshTrue.execute } yield ()).provide(elasticsearchLayer)) def genIndexName: Gen[Any, IndexName] = diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index 7ccaffb17..fec75ecba 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -17,11 +17,36 @@ package zio.elasticsearch import sttp.client3.SttpBackend -import zio.stm.TMap -import zio.{Task, ULayer, ZLayer} +import zio.elasticsearch.ElasticRequest.BulkableRequest +import zio.elasticsearch.ElasticRequestType._ +import zio.schema.Schema +import zio.{Task, ZLayer} trait ElasticExecutor { - def execute[A](request: ElasticRequest[A, _]): Task[A] + + def bulk(requests: BulkableRequest*): ElasticRequest[Unit, Bulk] + + def create[A: Schema](index: IndexName, doc: A): ElasticRequest[DocumentId, Create] + + def create[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[CreationOutcome, CreateWithId] + + def createIndex(name: IndexName, definition: Option[String]): ElasticRequest[CreationOutcome, CreateIndex] + + def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] + + def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] + + def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] + + def exists(index: IndexName, id: DocumentId): ElasticRequest[Boolean, Exists] + + def getById[A: Schema](index: IndexName, id: DocumentId): ElasticRequest[Option[A], GetById] + + def search[A](index: IndexName, query: ElasticQuery[_])(implicit + schema: Schema[A] + ): ElasticRequest[List[A], GetByQuery] + + def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] } object ElasticExecutor { @@ -30,7 +55,4 @@ object ElasticExecutor { lazy val local: ZLayer[SttpBackend[Task, Any], Throwable, ElasticExecutor] = ZLayer.succeed(ElasticConfig.Default) >>> live - - lazy val test: ULayer[TestExecutor] = - ZLayer(TMap.empty[IndexName, TMap[DocumentId, Document]].map(TestExecutor).commit) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index abaa01136..ee50a8a44 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -18,18 +18,14 @@ package zio.elasticsearch import zio.elasticsearch.Refresh.WithRefresh import zio.elasticsearch.Routing.{Routing, WithRouting} -import zio.prelude._ -import zio.schema.Schema -import zio.schema.codec.JsonCodec.JsonDecoder -import zio.{RIO, ZIO} +import zio.{Task, ZIO} import scala.annotation.unused import scala.language.implicitConversions sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self => - final def execute: RIO[ElasticExecutor, A] = - ZIO.serviceWithZIO[ElasticExecutor](_.execute(self)) + def execute: Task[A] final def map[B](f: A => Either[DecodingException, B]): ElasticRequest[B, ERT] = ElasticRequest.Map(self, f) @@ -49,58 +45,6 @@ sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self => object ElasticRequest { import ElasticRequestType._ - - def bulk(requests: BulkableRequest*): ElasticRequest[Unit, Bulk] = - BulkRequest.of(requests: _*) - - def create[A: Schema](index: IndexName, doc: A): ElasticRequest[DocumentId, Create] = - CreateRequest(index = index, document = Document.from(doc), refresh = false, routing = None) - - def create[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[CreationOutcome, CreateWithId] = - CreateWithIdRequest(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) - - def createIndex(name: IndexName, definition: Option[String]): ElasticRequest[CreationOutcome, CreateIndex] = - CreateIndexRequest(name, definition) - - def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] = - DeleteByIdRequest(index = index, id = id, refresh = false, routing = None) - - def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] = - DeleteByQueryRequest(index = index, query = query, refresh = false, routing = None) - - def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] = - DeleteIndexRequest(name) - - def exists(index: IndexName, id: DocumentId): ElasticRequest[Boolean, Exists] = - ExistsRequest(index = index, id = id, routing = None) - - def getById[A: Schema](index: IndexName, id: DocumentId): ElasticRequest[Option[A], GetById] = - GetByIdRequest(index = index, id = id, routing = None).map { - case Some(document) => - document.decode match { - case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}")) - case Right(doc) => Right(Some(doc)) - } - case None => - Right(None) - } - - def search[A](index: IndexName, query: ElasticQuery[_])(implicit - schema: Schema[A] - ): ElasticRequest[List[A], GetByQuery] = - GetByQueryRequest(index = index, query = query, routing = None).map { response => - Validation - .validateAll(response.results.map { json => - ZValidation.fromEither(JsonDecoder.decode(schema, json.toString)) - }) - .toEitherWith { errors => - DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})") - } - } - - def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] = - CreateOrUpdateRequest(index, id, Document.from(doc), refresh = false, routing = None) - private[elasticsearch] final case class BulkableRequest private (request: ElasticRequest[_, _]) object BulkableRequest { @@ -115,111 +59,190 @@ object ElasticRequest { requests.map(BulkableRequest(_)) } - private[elasticsearch] final case class BulkRequest( - requests: List[BulkableRequest], - index: Option[IndexName], - refresh: Boolean, - routing: Option[Routing] + private[elasticsearch] abstract class BulkRequest( + val requests: List[BulkableRequest], + val index: Option[IndexName], + val refresh: Boolean, + val routing: Option[Routing] ) extends ElasticRequest[Unit, Bulk] { lazy val body: String = requests.flatMap { r => // We use @unchecked to ignore 'pattern match not exhaustive' error since we guarantee that it will not happen // because these are only Bulkable Requests and other matches will not occur. (r.request: @unchecked) match { - case CreateRequest(index, document, _, maybeRouting) => - List(getActionAndMeta("create", List(("_index", Some(index)), ("routing", maybeRouting))), document.json) - case CreateWithIdRequest(index, id, document, _, maybeRouting) => + case r: CreateRequest => + List(getActionAndMeta("create", List(("_index", Some(r.index)), ("routing", r.routing))), r.document.json) + case r: CreateWithIdRequest => List( - getActionAndMeta("create", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), - document.json + getActionAndMeta("create", List(("_index", Some(r.index)), ("_id", Some(r.id)), ("routing", r.routing))), + r.document.json ) - case CreateOrUpdateRequest(index, id, document, _, maybeRouting) => + case r: CreateOrUpdateRequest => List( - getActionAndMeta("index", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), - document.json + getActionAndMeta("index", List(("_index", Some(r.index)), ("_id", Some(r.id)), ("routing", r.routing))), + r.document.json ) - case DeleteByIdRequest(index, id, _, maybeRouting) => - List(getActionAndMeta("delete", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting)))) + case r: DeleteByIdRequest => + List(getActionAndMeta("delete", List(("_index", Some(r.index)), ("_id", Some(r.id)), ("routing", r.routing)))) } }.mkString(start = "", sep = "\n", end = "\n") + + def execute: Task[Unit] = execute(requests, index, refresh, routing) + + private[elasticsearch] def execute( + requests: List[BulkableRequest], + index: Option[IndexName], + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] + } + + private[elasticsearch] abstract class CreateRequest( + val index: IndexName, + val document: Document, + val refresh: Boolean, + val routing: Option[Routing] + ) extends ElasticRequest[DocumentId, Create] { + def execute: Task[DocumentId] = execute(index, document, refresh, routing) + + private[elasticsearch] def execute( + index: IndexName, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[DocumentId] + } + + private[elasticsearch] abstract class CreateWithIdRequest( + val index: IndexName, + val id: DocumentId, + val document: Document, + val refresh: Boolean, + val routing: Option[Routing] + ) extends ElasticRequest[CreationOutcome, CreateWithId] { + def execute: Task[CreationOutcome] = execute(index, id, document, refresh, routing) + + private[elasticsearch] def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[CreationOutcome] + } + + private[elasticsearch] abstract class CreateIndexRequest( + val index: IndexName, + val definition: Option[String] + ) extends ElasticRequest[CreationOutcome, CreateIndex] { + def execute: Task[CreationOutcome] = execute(index, definition) + + private[elasticsearch] def execute(index: IndexName, definition: Option[String]): Task[CreationOutcome] + } + + private[elasticsearch] abstract class CreateOrUpdateRequest( + val index: IndexName, + val id: DocumentId, + val document: Document, + val refresh: Boolean, + val routing: Option[Routing] + ) extends ElasticRequest[Unit, Upsert] { + def execute: Task[Unit] = execute(index, id, document, refresh, routing) + + private[elasticsearch] def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] } - object BulkRequest { - def of(requests: BulkableRequest*): BulkRequest = - BulkRequest(requests = requests.toList, index = None, refresh = false, routing = None) + private[elasticsearch] abstract class DeleteByIdRequest( + val index: IndexName, + val id: DocumentId, + val refresh: Boolean, + val routing: Option[Routing] + ) extends ElasticRequest[DeletionOutcome, DeleteById] { + def execute: Task[DeletionOutcome] = execute(index, id, refresh, routing) + + private[elasticsearch] def execute( + index: IndexName, + id: DocumentId, + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] } - private[elasticsearch] final case class CreateRequest( - index: IndexName, - document: Document, - refresh: Boolean, - routing: Option[Routing] - ) extends ElasticRequest[DocumentId, Create] - - private[elasticsearch] final case class CreateWithIdRequest( - index: IndexName, - id: DocumentId, - document: Document, - refresh: Boolean, - routing: Option[Routing] - ) extends ElasticRequest[CreationOutcome, CreateWithId] - - private[elasticsearch] final case class CreateIndexRequest( - name: IndexName, - definition: Option[String] - ) extends ElasticRequest[CreationOutcome, CreateIndex] - - private[elasticsearch] final case class CreateOrUpdateRequest( - index: IndexName, - id: DocumentId, - document: Document, - refresh: Boolean, - routing: Option[Routing] - ) extends ElasticRequest[Unit, Upsert] - - private[elasticsearch] final case class DeleteByIdRequest( - index: IndexName, - id: DocumentId, - refresh: Boolean, - routing: Option[Routing] - ) extends ElasticRequest[DeletionOutcome, DeleteById] - - private[elasticsearch] final case class DeleteByQueryRequest( - index: IndexName, - query: ElasticQuery[_], - refresh: Boolean, - routing: Option[Routing] - ) extends ElasticRequest[DeletionOutcome, DeleteByQuery] - - private[elasticsearch] final case class DeleteIndexRequest(name: IndexName) - extends ElasticRequest[DeletionOutcome, DeleteIndex] - - private[elasticsearch] final case class ExistsRequest( - index: IndexName, - id: DocumentId, - routing: Option[Routing] - ) extends ElasticRequest[Boolean, Exists] - - private[elasticsearch] final case class GetByIdRequest( - index: IndexName, - id: DocumentId, - routing: Option[Routing] - ) extends ElasticRequest[Option[Document], GetById] - - private[elasticsearch] final case class GetByQueryRequest( - index: IndexName, - query: ElasticQuery[_], - routing: Option[Routing] - ) extends ElasticRequest[ElasticQueryResponse, GetByQuery] + private[elasticsearch] abstract class DeleteByQueryRequest( + val index: IndexName, + val query: ElasticQuery[_], + val refresh: Boolean, + val routing: Option[Routing] + ) extends ElasticRequest[DeletionOutcome, DeleteByQuery] { + def execute: Task[DeletionOutcome] = execute(index, query, refresh, routing) + + private[elasticsearch] def execute( + index: IndexName, + query: ElasticQuery[_], + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] + } + + private[elasticsearch] abstract class DeleteIndexRequest(val index: IndexName) + extends ElasticRequest[DeletionOutcome, DeleteIndex] { + def execute: Task[DeletionOutcome] = execute(index) + + private[elasticsearch] def execute(index: IndexName): Task[DeletionOutcome] + } + + private[elasticsearch] abstract class ExistsRequest( + val index: IndexName, + val id: DocumentId, + val routing: Option[Routing] + ) extends ElasticRequest[Boolean, Exists] { + def execute: Task[Boolean] = execute(index, id, routing) + + private[elasticsearch] def execute(index: IndexName, id: DocumentId, routing: Option[Routing]): Task[Boolean] + } + + private[elasticsearch] abstract class GetByIdRequest( + val index: IndexName, + val id: DocumentId, + val routing: Option[Routing] + ) extends ElasticRequest[Option[Document], GetById] { + def execute: Task[Option[Document]] = execute(index, id, routing) + + private[elasticsearch] def execute( + index: IndexName, + id: DocumentId, + routing: Option[Routing] + ): Task[Option[Document]] + } + + private[elasticsearch] abstract class GetByQueryRequest( + val index: IndexName, + val query: ElasticQuery[_], + val routing: Option[Routing] + ) extends ElasticRequest[ElasticQueryResponse, GetByQuery] { + def execute: Task[ElasticQueryResponse] = execute(index, query, routing) + private[elasticsearch] def execute( + index: IndexName, + query: ElasticQuery[_], + routing: Option[Routing] + ): Task[ElasticQueryResponse] + } private[elasticsearch] final case class Map[A, B, ERT <: ElasticRequestType]( request: ElasticRequest[A, ERT], mapper: A => Either[DecodingException, B] - ) extends ElasticRequest[B, ERT] + ) extends ElasticRequest[B, ERT] { + def execute: Task[B] = request.execute.flatMap(a => ZIO.fromEither(mapper(a))) + } private def getActionAndMeta(requestType: String, parameters: List[(String, Any)]): String = parameters.collect { case (name, Some(value)) => s""""$name" : "${value.toString}"""" } .mkString(s"""{ "$requestType" : { """, ", ", " } }") - } sealed trait ElasticRequestType @@ -240,12 +263,12 @@ object ElasticRequestType { sealed trait Upsert extends BulkableRequestType } -sealed abstract class CreationOutcome +sealed trait CreationOutcome case object Created extends CreationOutcome case object AlreadyExists extends CreationOutcome -sealed abstract class DeletionOutcome +sealed trait DeletionOutcome case object Deleted extends DeletionOutcome case object NotFound extends DeletionOutcome diff --git a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala index 8be6123f8..31cd000b8 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala @@ -28,6 +28,23 @@ import sttp.model.StatusCode.{ } import zio.ZIO.logDebug import zio.elasticsearch.ElasticRequest._ +import zio.elasticsearch.ElasticRequestType.{ + Bulk, + Create, + CreateIndex, + CreateWithId, + DeleteById, + DeleteByQuery, + DeleteIndex, + Exists, + GetById, + GetByQuery, + Upsert +} +import zio.elasticsearch.Routing.Routing +import zio.prelude.{Validation, ZValidation} +import zio.schema.Schema +import zio.schema.codec.JsonCodec.JsonDecoder import zio.{Task, ZIO} import scala.collection.immutable.{Map => ScalaMap} @@ -37,192 +54,258 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC import HttpElasticExecutor._ - def execute[A](request: ElasticRequest[A, _]): Task[A] = - request match { - case r: BulkRequest => executeBulk(r) - case r: CreateRequest => executeCreate(r) - case r: CreateWithIdRequest => executeCreateWithId(r) - case r: CreateIndexRequest => executeCreateIndex(r) - case r: CreateOrUpdateRequest => executeCreateOrUpdate(r) - case r: DeleteByIdRequest => executeDeleteById(r) - case r: DeleteByQueryRequest => executeDeleteByQuery(r) - case r: DeleteIndexRequest => executeDeleteIndex(r) - case r: ExistsRequest => executeExists(r) - case r: GetByIdRequest => executeGetById(r) - case r: GetByQueryRequest => executeGetByQuery(r) - case map @ Map(_, _) => execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) - } - - private def executeBulk(r: BulkRequest): Task[Unit] = { - val uri = (r.index match { - case Some(index) => uri"${config.uri}/$index/$Bulk" - case None => uri"${config.uri}/$Bulk" - }).withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) - - sendRequest( - request.post(uri).contentType(ApplicationJson).body(r.body) - ).flatMap { response => - response.code match { - case HttpOk => ZIO.unit - case _ => ZIO.fail(createElasticException(response)) - } - } - } - - private def executeCreate(r: CreateRequest): Task[DocumentId] = { - val uri = uri"${config.uri}/${r.index}/$Doc" - .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) - - sendRequestWithCustomResponse[ElasticCreateResponse]( - request - .post(uri) - .contentType(ApplicationJson) - .body(r.document.json) - .response(asJson[ElasticCreateResponse]) - ).flatMap { response => - response.code match { - case HttpCreated => - response.body - .map(res => DocumentId(res.id)) - .fold( - e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), - value => ZIO.succeed(value) - ) - case _ => - ZIO.fail(createElasticExceptionFromCustomResponse(response)) + def bulk(requests: BulkableRequest*): ElasticRequest[Unit, Bulk] = + new BulkRequest(requests = requests.toList, index = None, refresh = false, routing = None) { + def execute( + requests: List[BulkableRequest], + index: Option[IndexName], + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = { + val uri = (index match { + case Some(index) => uri"${config.uri}/$index/$Bulk" + case None => uri"${config.uri}/$Bulk" + }).withParams(getQueryParams(List(("refresh", Some(refresh)), ("routing", routing)))) + + sendRequest( + request.post(uri).contentType(ApplicationJson).body(body) + ).flatMap { response => + response.code match { + case HttpOk => ZIO.unit + case _ => ZIO.fail(createElasticException(response)) + } + } } } - } - - private def executeCreateWithId(r: CreateWithIdRequest): Task[CreationOutcome] = { - val uri = uri"${config.uri}/${r.index}/$Create/${r.id}" - .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) - - sendRequest( - request - .post(uri) - .contentType(ApplicationJson) - .body(r.document.json) - ).flatMap { response => - response.code match { - case HttpCreated => ZIO.succeed(Created) - case HttpConflict => ZIO.succeed(AlreadyExists) - case _ => ZIO.fail(createElasticException(response)) + def create[A: Schema](index: IndexName, doc: A): ElasticRequest[DocumentId, Create] = + new CreateRequest(index = index, document = Document.from(doc), refresh = false, routing = None) { + def execute( + index: IndexName, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[DocumentId] = { + val uri = uri"${config.uri}/$index/$Doc" + .withParams(getQueryParams(List(("refresh", Some(refresh)), ("routing", routing)))) + + sendRequestWithCustomResponse[ElasticCreateResponse]( + request + .post(uri) + .contentType(ApplicationJson) + .body(document.json) + .response(asJson[ElasticCreateResponse]) + ).flatMap { response => + response.code match { + case HttpCreated => + response.body + .map(res => DocumentId(res.id)) + .fold( + e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), + value => ZIO.succeed(value) + ) + case _ => + ZIO.fail(createElasticExceptionFromCustomResponse(response)) + } + } } } - } - - private def executeCreateIndex(createIndex: CreateIndexRequest): Task[CreationOutcome] = - sendRequest( - request - .put(uri"${config.uri}/${createIndex.name}") - .contentType(ApplicationJson) - .body(createIndex.definition.getOrElse("")) - ).flatMap { response => - response.code match { - case HttpOk => ZIO.succeed(Created) - case HttpBadRequest => ZIO.succeed(AlreadyExists) - case _ => ZIO.fail(createElasticException(response)) + + def create[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[CreationOutcome, CreateWithId] = + new CreateWithIdRequest(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[CreationOutcome] = { + val uri = uri"${config.uri}/$index/$Create/$id" + .withParams(getQueryParams(List(("refresh", Some(refresh)), ("routing", routing)))) + + sendRequest( + request + .post(uri) + .contentType(ApplicationJson) + .body(document.json) + ).flatMap { response => + response.code match { + case HttpCreated => ZIO.succeed(Created) + case HttpConflict => ZIO.succeed(AlreadyExists) + case _ => ZIO.fail(createElasticException(response)) + } + } } } - private def executeCreateOrUpdate(r: CreateOrUpdateRequest): Task[Unit] = { - val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}" - .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) + def createIndex(name: IndexName, definition: Option[String]): ElasticRequest[CreationOutcome, CreateIndex] = + new CreateIndexRequest(name, definition) { + def execute(index: IndexName, definition: Option[String]): Task[CreationOutcome] = + sendRequest( + request + .put(uri"${config.uri}/$name") + .contentType(ApplicationJson) + .body(definition.getOrElse("")) + ).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(Created) + case HttpBadRequest => ZIO.succeed(AlreadyExists) + case _ => ZIO.fail(createElasticException(response)) + } + } + } - sendRequest(request.put(uri).contentType(ApplicationJson).body(r.document.json)).flatMap { response => - response.code match { - case HttpOk | HttpCreated => ZIO.unit - case _ => ZIO.fail(createElasticException(response)) + def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] = + new DeleteByIdRequest(index = index, id = id, refresh = false, routing = None) { + def execute( + index: IndexName, + id: DocumentId, + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] = { + val uri = uri"${config.uri}/$index/$Doc/$id" + .withParams(getQueryParams(List(("refresh", Some(refresh)), ("routing", routing)))) + + sendRequest(request.delete(uri)).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(Deleted) + case HttpNotFound => ZIO.succeed(NotFound) + case _ => ZIO.fail(createElasticException(response)) + } + } } } - } - - private def executeDeleteById(r: DeleteByIdRequest): Task[DeletionOutcome] = { - val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}" - .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) - sendRequest(request.delete(uri)).flatMap { response => - response.code match { - case HttpOk => ZIO.succeed(Deleted) - case HttpNotFound => ZIO.succeed(NotFound) - case _ => ZIO.fail(createElasticException(response)) + def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] = + new DeleteByQueryRequest(index = index, query = query, refresh = false, routing = None) { + def execute( + index: IndexName, + query: ElasticQuery[_], + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] = { + val uri = + uri"${config.uri}/$index/$DeleteByQuery".withParams(getQueryParams(List(("refresh", Some(refresh))))) + + sendRequest( + request + .post(uri) + .contentType(ApplicationJson) + .body(query.toJson) + ).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(Deleted) + case HttpNotFound => ZIO.succeed(NotFound) + case _ => ZIO.fail(createElasticException(response)) + } + } } } - } - - def executeDeleteByQuery(r: DeleteByQueryRequest): Task[DeletionOutcome] = { - val uri = - uri"${config.uri}/${r.index}/$DeleteByQuery".withParams(getQueryParams(List(("refresh", Some(r.refresh))))) - - sendRequest( - request - .post(uri) - .contentType(ApplicationJson) - .body(r.query.toJson) - ).flatMap { response => - response.code match { - case HttpOk => ZIO.succeed(Deleted) - case HttpNotFound => ZIO.succeed(NotFound) - case _ => ZIO.fail(createElasticException(response)) - } + + def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] = + new DeleteIndexRequest(name) { + def execute(index: IndexName): Task[DeletionOutcome] = + sendRequest(request.delete(uri"${config.uri}/$name")).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(Deleted) + case HttpNotFound => ZIO.succeed(NotFound) + case _ => ZIO.fail(createElasticException(response)) + } + } } - } - - private def executeDeleteIndex(r: DeleteIndexRequest): Task[DeletionOutcome] = - sendRequest(request.delete(uri"${config.uri}/${r.name}")).flatMap { response => - response.code match { - case HttpOk => ZIO.succeed(Deleted) - case HttpNotFound => ZIO.succeed(NotFound) - case _ => ZIO.fail(createElasticException(response)) + + def exists(index: IndexName, id: DocumentId): ElasticRequest[Boolean, Exists] = + new ExistsRequest(index = index, id = id, routing = None) { + def execute(index: IndexName, id: DocumentId, routing: Option[Routing]): Task[Boolean] = { + val uri = uri"${config.uri}/$index/$Doc/$id".withParams(getQueryParams(List(("routing", routing)))) + + sendRequest(request.head(uri)).flatMap { response => + response.code match { + case HttpOk => ZIO.succeed(true) + case HttpNotFound => ZIO.succeed(false) + case _ => ZIO.fail(createElasticException(response)) + } + } } } - private def executeExists(r: ExistsRequest): Task[Boolean] = { - val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}".withParams(getQueryParams(List(("routing", r.routing)))) - - sendRequest(request.head(uri)).flatMap { response => - response.code match { - case HttpOk => ZIO.succeed(true) - case HttpNotFound => ZIO.succeed(false) - case _ => ZIO.fail(createElasticException(response)) + def getById[A: Schema](index: IndexName, id: DocumentId): ElasticRequest[Option[A], GetById] = + new GetByIdRequest(index = index, id = id, routing = None) { + def execute(index: IndexName, id: DocumentId, routing: Option[Routing]): Task[Option[Document]] = { + val uri = uri"${config.uri}/$index/$Doc/$id".withParams(getQueryParams(List(("routing", routing)))) + + sendRequestWithCustomResponse[ElasticGetResponse]( + request + .get(uri) + .response(asJson[ElasticGetResponse]) + ).flatMap { response => + response.code match { + case HttpOk => ZIO.attempt(response.body.toOption.map(d => Document.from(d.source))) + case HttpNotFound => ZIO.succeed(None) + case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response)) + } + } } + }.map { + case Some(document) => + document.decode match { + case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}")) + case Right(doc) => Right(Some(doc)) + } + case None => + Right(None) } - } - - private def executeGetById(r: GetByIdRequest): Task[Option[Document]] = { - val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}".withParams(getQueryParams(List(("routing", r.routing)))) - - sendRequestWithCustomResponse[ElasticGetResponse]( - request - .get(uri) - .response(asJson[ElasticGetResponse]) - ).flatMap { response => - response.code match { - case HttpOk => ZIO.attempt(response.body.toOption.map(d => Document.from(d.source))) - case HttpNotFound => ZIO.succeed(None) - case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response)) - } + + def search[A](index: IndexName, query: ElasticQuery[_])(implicit + schema: Schema[A] + ): ElasticRequest[List[A], GetByQuery] = + new GetByQueryRequest(index = index, query = query, routing = None) { + def execute(index: IndexName, query: ElasticQuery[_], routing: Option[Routing]): Task[ElasticQueryResponse] = + sendRequestWithCustomResponse( + request + .post(uri"${config.uri}/$index/$Search") + .response(asJson[ElasticQueryResponse]) + .contentType(ApplicationJson) + .body(query.toJson) + ).flatMap { response => + response.code match { + case HttpOk => + response.body.fold( + e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), + value => ZIO.succeed(value) + ) + case _ => + ZIO.fail(createElasticExceptionFromCustomResponse(response)) + } + } + }.map { response => + Validation + .validateAll(response.results.map { json => + ZValidation.fromEither(JsonDecoder.decode(schema, json.toString)) + }) + .toEitherWith { errors => + DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})") + } } - } - - private def executeGetByQuery(r: GetByQueryRequest): Task[ElasticQueryResponse] = - sendRequestWithCustomResponse( - request - .post(uri"${config.uri}/${r.index}/$Search") - .response(asJson[ElasticQueryResponse]) - .contentType(ApplicationJson) - .body(r.query.toJson) - ).flatMap { response => - response.code match { - case HttpOk => - response.body.fold( - e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), - value => ZIO.succeed(value) - ) - case _ => - ZIO.fail(createElasticExceptionFromCustomResponse(response)) + + def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] = + new CreateOrUpdateRequest(index, id, Document.from(doc), refresh = false, routing = None) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = { + val uri = uri"${config.uri}/$index/$Doc/$id" + .withParams(getQueryParams(List(("refresh", Some(refresh)), ("routing", routing)))) + + sendRequest(request.put(uri).contentType(ApplicationJson).body(document.json)).flatMap { response => + response.code match { + case HttpOk | HttpCreated => ZIO.unit + case _ => ZIO.fail(createElasticException(response)) + } + } } } @@ -258,7 +341,6 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC private def getQueryParams(parameters: List[(String, Any)]): ScalaMap[String, String] = parameters.collect { case (name, Some(value)) => (name, value.toString) }.toMap - } private[elasticsearch] object HttpElasticExecutor { diff --git a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala b/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala index 095947511..a35a8e8bb 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala @@ -16,8 +16,10 @@ package zio.elasticsearch +import zio.Task import zio.elasticsearch.ElasticRequest._ import zio.elasticsearch.ElasticRequestType._ +import zio.elasticsearch.Routing.Routing object Refresh { @@ -30,47 +32,103 @@ object Refresh { def withRefresh[A](request: ElasticRequest[A, Bulk], value: Boolean): ElasticRequest[A, Bulk] = request match { case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: BulkRequest => r.copy(refresh = value) + case r: BulkRequest => + new BulkRequest(r.requests, r.index, value, r.routing) { + def execute( + requests: List[BulkableRequest], + index: Option[IndexName], + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = + r.execute(requests, index, refresh, routing) + } } } implicit val createWithRefresh: WithRefresh[Create] = new WithRefresh[Create] { def withRefresh[A](request: ElasticRequest[A, Create], value: Boolean): ElasticRequest[A, Create] = request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateRequest => r.copy(refresh = value) + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: CreateRequest => + new CreateRequest(r.index, r.document, value, r.routing) { + def execute( + index: IndexName, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[DocumentId] = + r.execute(index, document, refresh, routing) + } } } implicit val createWithIdWithRefresh: WithRefresh[CreateWithId] = new WithRefresh[CreateWithId] { def withRefresh[A](request: ElasticRequest[A, CreateWithId], value: Boolean): ElasticRequest[A, CreateWithId] = request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateWithIdRequest => r.copy(refresh = value) + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: CreateWithIdRequest => + new CreateWithIdRequest(r.index, r.id, r.document, value, r.routing) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[CreationOutcome] = + r.execute(index, id, document, refresh, routing) + } } } implicit val deleteByIdWithRefresh: WithRefresh[DeleteById] = new WithRefresh[DeleteById] { def withRefresh[A](request: ElasticRequest[A, DeleteById], value: Boolean): ElasticRequest[A, DeleteById] = request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: DeleteByIdRequest => r.copy(refresh = value) + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: DeleteByIdRequest => + new DeleteByIdRequest(r.index, r.id, value, r.routing) { + def execute( + index: IndexName, + id: DocumentId, + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] = + r.execute(index, id, refresh, routing) + } } } implicit val deleteByQueryWithRefresh: WithRefresh[DeleteByQuery] = new WithRefresh[DeleteByQuery] { def withRefresh[A](request: ElasticRequest[A, DeleteByQuery], value: Boolean): ElasticRequest[A, DeleteByQuery] = request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: DeleteByQueryRequest => r.copy(refresh = value) + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: DeleteByQueryRequest => + new DeleteByQueryRequest(r.index, r.query, value, r.routing) { + def execute( + index: IndexName, + query: ElasticQuery[_], + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] = + r.execute(index, query, refresh, routing) + } } } implicit val upsertWithRefresh: WithRefresh[Upsert] = new WithRefresh[Upsert] { def withRefresh[A](request: ElasticRequest[A, Upsert], value: Boolean): ElasticRequest[A, Upsert] = request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateOrUpdateRequest => r.copy(refresh = value) + case Map(r, mapper) => Map(withRefresh(r, value), mapper) + case r: CreateOrUpdateRequest => + new CreateOrUpdateRequest(r.index, r.id, r.document, value, r.routing) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = + r.execute(index, id, document, refresh, routing) + } } } } diff --git a/modules/library/src/main/scala/zio/elasticsearch/Routing.scala b/modules/library/src/main/scala/zio/elasticsearch/Routing.scala index e90a4a1ed..dccff0e3b 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Routing.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Routing.scala @@ -16,6 +16,7 @@ package zio.elasticsearch +import zio.Task import zio.elasticsearch.ElasticRequest._ import zio.elasticsearch.ElasticRequestType.{Bulk, Create, CreateWithId, DeleteById, Exists, GetById, Upsert} import zio.prelude.Assertion.isEmptyString @@ -35,55 +36,110 @@ object Routing extends Newtype[String] { def withRouting[A](request: ElasticRequest[A, Bulk], routing: Routing): ElasticRequest[A, Bulk] = request match { case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: BulkRequest => r.copy(routing = Some(routing)) + case r: BulkRequest => + new BulkRequest(r.requests, r.index, r.refresh, Some(routing)) { + def execute( + requests: List[BulkableRequest], + index: Option[IndexName], + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = + r.execute(requests, index, refresh, routing) + } } } implicit val createWithRouting: WithRouting[Create] = new WithRouting[Create] { def withRouting[A](request: ElasticRequest[A, Create], routing: Routing): ElasticRequest[A, Create] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: CreateRequest => + new CreateRequest(r.index, r.document, r.refresh, Some(routing)) { + def execute( + index: IndexName, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[DocumentId] = + r.execute(index, document, refresh, routing) + } } } implicit val createWithIdWithRouting: WithRouting[CreateWithId] = new WithRouting[CreateWithId] { def withRouting[A](request: ElasticRequest[A, CreateWithId], routing: Routing): ElasticRequest[A, CreateWithId] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateWithIdRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: CreateWithIdRequest => + new CreateWithIdRequest(r.index, r.id, r.document, r.refresh, Some(routing)) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[CreationOutcome] = + r.execute(index, id, document, refresh, routing) + } } } implicit val deleteByIdWithRouting: WithRouting[DeleteById] = new WithRouting[DeleteById] { def withRouting[A](request: ElasticRequest[A, DeleteById], routing: Routing): ElasticRequest[A, DeleteById] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: DeleteByIdRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: DeleteByIdRequest => + new DeleteByIdRequest(r.index, r.id, r.refresh, Some(routing)) { + def execute( + index: IndexName, + id: DocumentId, + refresh: Boolean, + routing: Option[Routing] + ): Task[DeletionOutcome] = + r.execute(index, id, refresh, routing) + } } } implicit val existsWithRouting: WithRouting[Exists] = new WithRouting[Exists] { def withRouting[A](request: ElasticRequest[A, Exists], routing: Routing): ElasticRequest[A, Exists] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: ExistsRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: ExistsRequest => + new ExistsRequest(r.index, r.id, Some(routing)) { + def execute(index: IndexName, id: DocumentId, routing: Option[Routing]): Task[Boolean] = + r.execute(index, id, routing) + } } } implicit val getByIdWithRouting: WithRouting[GetById] = new WithRouting[GetById] { def withRouting[A](request: ElasticRequest[A, GetById], routing: Routing): ElasticRequest[A, GetById] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: GetByIdRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: GetByIdRequest => + new GetByIdRequest(r.index, r.id, Some(routing)) { + def execute(index: IndexName, id: DocumentId, routing: Option[Routing]): Task[Option[Document]] = + r.execute(index, id, routing) + } } } implicit val upsertWithRouting: WithRouting[Upsert] = new WithRouting[Upsert] { def withRouting[A](request: ElasticRequest[A, Upsert], routing: Routing): ElasticRequest[A, Upsert] = request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateOrUpdateRequest => r.copy(routing = Some(routing)) + case Map(r, mapper) => Map(withRouting(r, routing), mapper) + case r: CreateOrUpdateRequest => + new CreateOrUpdateRequest(r.index, r.id, r.document, r.refresh, Some(routing)) { + def execute( + index: IndexName, + id: DocumentId, + document: Document, + refresh: Boolean, + routing: Option[Routing] + ): Task[Unit] = + r.execute(index, id, document, refresh, routing) + } } } } diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala deleted file mode 100644 index 0edc825ba..000000000 --- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright 2022 LambdaWorks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.elasticsearch - -import zio.Random.nextUUID -import zio.elasticsearch.ElasticRequest._ -import zio.json.ast.Json -import zio.stm.{STM, TMap, ZSTM} -import zio.{Task, ZIO} - -private[elasticsearch] final case class TestExecutor private (data: TMap[IndexName, TMap[DocumentId, Document]]) - extends ElasticExecutor { - self => - - def execute[A](request: ElasticRequest[A, _]): Task[A] = - request match { - case BulkRequest(requests, _, _, _) => - fakeBulk(requests) - case CreateRequest(index, document, _, _) => - fakeCreate(index, document) - case CreateWithIdRequest(index, id, document, _, _) => - fakeCreateWithId(index, id, document) - case CreateIndexRequest(name, _) => - fakeCreateIndex(name) - case CreateOrUpdateRequest(index, id, document, _, _) => - fakeCreateOrUpdate(index, id, document) - case DeleteByIdRequest(index, id, _, _) => - fakeDeleteById(index, id) - case DeleteByQueryRequest(index, _, _, _) => - fakeDeleteByQuery(index) - case DeleteIndexRequest(name) => - fakeDeleteIndex(name) - case ExistsRequest(index, id, _) => - fakeExists(index, id) - case GetByIdRequest(index, id, _) => - fakeGetById(index, id) - case GetByQueryRequest(index, _, _) => - fakeGetByQuery(index) - case map @ Map(_, _) => - execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) - } - - private def fakeBulk(requests: List[BulkableRequest]): Task[Unit] = - ZIO.attempt { - requests.map(_.request).map { r => - execute(r) - } - }.unit - - private def fakeCreate(index: IndexName, document: Document): Task[DocumentId] = - for { - uuid <- nextUUID - documents <- getDocumentsFromIndex(index).commit - documentId = DocumentId(uuid.toString) - _ <- documents.put(documentId, document).commit - } yield documentId - - private def fakeCreateWithId(index: IndexName, documentId: DocumentId, document: Document): Task[CreationOutcome] = - (for { - documents <- getDocumentsFromIndex(index) - alreadyExists <- documents.contains(documentId) - _ <- documents.putIfAbsent(documentId, document) - } yield if (alreadyExists) AlreadyExists else Created).commit - - private def fakeCreateIndex(index: IndexName): Task[CreationOutcome] = - (for { - alreadyExists <- self.data.contains(index) - emptyDocuments <- TMap.empty[DocumentId, Document] - _ <- self.data.putIfAbsent(index, emptyDocuments) - } yield if (alreadyExists) AlreadyExists else Created).commit - - private def fakeCreateOrUpdate(index: IndexName, documentId: DocumentId, document: Document): Task[Unit] = - (for { - documents <- getDocumentsFromIndex(index) - _ <- documents.put(documentId, document) - } yield ()).commit - - private def fakeDeleteById(index: IndexName, documentId: DocumentId): Task[DeletionOutcome] = - (for { - documents <- getDocumentsFromIndex(index) - exists <- documents.contains(documentId) - _ <- documents.delete(documentId) - } yield if (exists) Deleted else NotFound).commit - - private def fakeDeleteByQuery(index: IndexName): Task[DeletionOutcome] = - (for { - exists <- self.data.contains(index) - } yield if (exists) Deleted else NotFound).commit - // until we have a way of using query to delete we can either delete all or delete none documents - - private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] = - (for { - exists <- self.data.contains(index) - _ <- self.data.delete(index) - } yield if (exists) Deleted else NotFound).commit - - private def fakeExists(index: IndexName, documentId: DocumentId): Task[Boolean] = - (for { - documents <- getDocumentsFromIndex(index) - exists <- documents.contains(documentId) - } yield exists).commit - - private def fakeGetById(index: IndexName, documentId: DocumentId): Task[Option[Document]] = - (for { - documents <- getDocumentsFromIndex(index) - maybeDocument <- documents.get(documentId) - } yield maybeDocument).commit - - private def fakeGetByQuery(index: IndexName): Task[ElasticQueryResponse] = { - def createElasticQueryResponse( - index: IndexName, - documents: TMap[DocumentId, Document] - ): ZSTM[Any, Nothing, ElasticQueryResponse] = { - val shards = Shards(total = 1, successful = 1, skipped = 0, failed = 0) - - for { - items <- - documents.toList.map( - _.map { case (id, document) => - Item( - index = index.toString, - `type` = "type", - id = id.toString, - score = 1, - source = Json.Str(document.json) - ) - } - ) - hitsSize <- documents.size - hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = Some(1), hits = items) - } yield ElasticQueryResponse(took = 1, timedOut = false, shards = shards, hits = hits) - } - - (for { - documents <- getDocumentsFromIndex(index) - response <- createElasticQueryResponse(index, documents) - } yield response).commit - } - - private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] = - for { - maybeDocuments <- self.data.get(index) - documents <- maybeDocuments.fold[STM[ElasticException, TMap[DocumentId, Document]]]( - STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!")) - )(STM.succeed(_)) - } yield documents -} diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 1dbf694ca..7f9cffafb 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -66,7 +66,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue.execute)( + assertZIO( + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor](executor => + executor.bulk(executor.create(index, repo)).refreshTrue.execute + ) + )( isUnit ) }, @@ -89,11 +93,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.create[GitHubRepo](index = index, doc = repo) + .routing(Routing("routing")) + .refreshTrue + .execute + ) )(equalTo(DocumentId("V4x8q4UB3agN0z75fv5r"))) }, test("creating request with given ID") { @@ -106,11 +111,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + .execute + ) )(equalTo(Created)) }, test("creating index request") { @@ -120,7 +126,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.createIndex(name = index, definition = None).execute)( + assertZIO( + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor](_.createIndex(name = index, definition = None).execute) + )( equalTo(Created) ) }, @@ -134,11 +142,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + .execute + ) )(isUnit) }, test("deleting by ID request") { @@ -151,11 +160,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + .refreshTrue + .execute + ) )(equalTo(Deleted)) }, test("deleting by query request") { @@ -168,7 +178,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest.deleteByQuery(index = index, query = matchAll).refreshTrue.execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.deleteByQuery(index = index, query = matchAll).refreshTrue.execute + ) )( equalTo(Deleted) ) @@ -182,7 +194,7 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.deleteIndex(name = index).execute)( + assertZIO(addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor](_.deleteIndex(name = index).execute))( equalTo(Deleted) ) }, @@ -196,10 +208,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .exists(index = index, id = DocumentId("example-id")) - .routing(Routing("routing")) - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.exists(index = index, id = DocumentId("example-id")) + .routing(Routing("routing")) + .execute + ) )(isTrue) }, test("getting by ID request") { @@ -227,10 +240,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .execute + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + .execute + ) )(isSome(equalTo(repo))) }, test("getting by query request") { @@ -280,7 +294,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.search[GitHubRepo](index = index, query = matchAll).execute)( + assertZIO( + addStubMapping *> ZIO.serviceWithZIO[ElasticExecutor]( + _.search[GitHubRepo](index = index, query = matchAll).execute + ) + )( equalTo(List(repo)) ) } diff --git a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala index 7f177dc1d..887382e9c 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala @@ -16,7 +16,7 @@ package zio.elasticsearch -import zio.Scope +import sttp.client3.httpclient.zio.HttpClientZioBackend import zio.elasticsearch.ElasticQuery._ import zio.elasticsearch.ElasticRequest.BulkRequest import zio.elasticsearch.utils._ @@ -25,6 +25,7 @@ import zio.prelude.Validation import zio.schema.{DeriveSchema, Schema} import zio.test.Assertion.equalTo import zio.test._ +import zio.{Scope, ZIO} object QueryDSLSpec extends ZIOSpecDefault { @@ -1058,37 +1059,41 @@ object QueryDSLSpec extends ZIOSpecDefault { assert(query3.toJson)(equalTo(expected23.toJson)) }, test("properly encode Bulk request body") { - val bulkQuery = IndexName.make("users").map { index => - val user = - UserDocument(id = "WeeMwR5d5", name = "Name", address = "Address", balance = 1000, age = 24) - val req1 = - ElasticRequest - .create[UserDocument](index, DocumentId("ETux1srpww2ObCx"), user.copy(age = 39)) - .routing(unsafeWrap(Routing)(user.id)) - val req2 = ElasticRequest.create[UserDocument](index, user).routing(unsafeWrap(Routing)(user.id)) - val req3 = - ElasticRequest - .upsert[UserDocument](index, DocumentId("yMyEG8iFL5qx"), user.copy(balance = 3000)) - .routing(unsafeWrap(Routing)(user.id)) - val req4 = - ElasticRequest.deleteById(index, DocumentId("1VNzFt2XUFZfXZheDc")).routing(unsafeWrap(Routing)(user.id)) - ElasticRequest.bulk(req1, req2, req3, req4) match { - case r: BulkRequest => Some(r.body) - case _ => None + ZIO + .serviceWith[ElasticExecutor] { executor => + val bulkQuery = IndexName.make("users").map { index => + val user = + UserDocument(id = "WeeMwR5d5", name = "Name", address = "Address", balance = 1000, age = 24) + val req1 = + executor + .create[UserDocument](index, DocumentId("ETux1srpww2ObCx"), user.copy(age = 39)) + .routing(unsafeWrap(Routing)(user.id)) + val req2 = executor.create[UserDocument](index, user).routing(unsafeWrap(Routing)(user.id)) + val req3 = + executor + .upsert[UserDocument](index, DocumentId("yMyEG8iFL5qx"), user.copy(balance = 3000)) + .routing(unsafeWrap(Routing)(user.id)) + val req4 = + executor.deleteById(index, DocumentId("1VNzFt2XUFZfXZheDc")).routing(unsafeWrap(Routing)(user.id)) + executor.bulk(req1, req2, req3, req4) match { + case r: BulkRequest => Some(r.body) + case _ => None + } + } + + val expectedBody = + """|{ "create" : { "_index" : "users", "_id" : "ETux1srpww2ObCx", "routing" : "WeeMwR5d5" } } + |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":39} + |{ "create" : { "_index" : "users", "routing" : "WeeMwR5d5" } } + |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":24} + |{ "index" : { "_index" : "users", "_id" : "yMyEG8iFL5qx", "routing" : "WeeMwR5d5" } } + |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":3000.0,"age":24} + |{ "delete" : { "_index" : "users", "_id" : "1VNzFt2XUFZfXZheDc", "routing" : "WeeMwR5d5" } } + |""".stripMargin + + assert(bulkQuery)(equalTo(Validation.succeed(Some(expectedBody)))) } - } - - val expectedBody = - """|{ "create" : { "_index" : "users", "_id" : "ETux1srpww2ObCx", "routing" : "WeeMwR5d5" } } - |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":39} - |{ "create" : { "_index" : "users", "routing" : "WeeMwR5d5" } } - |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":24} - |{ "index" : { "_index" : "users", "_id" : "yMyEG8iFL5qx", "routing" : "WeeMwR5d5" } } - |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":3000.0,"age":24} - |{ "delete" : { "_index" : "users", "_id" : "1VNzFt2XUFZfXZheDc", "routing" : "WeeMwR5d5" } } - |""".stripMargin - - assert(bulkQuery)(equalTo(Validation.succeed(Some(expectedBody)))) + .provide(ElasticExecutor.local, HttpClientZioBackend.layer()) } ) )