From e21e5b611ac331a8efd598eb44cb492f0000a0c4 Mon Sep 17 00:00:00 2001 From: mvelimir Date: Wed, 22 Feb 2023 15:26:56 +0100 Subject: [PATCH] Expose execute through Elasticsearch trait --- .../example/src/main/scala/example/Main.scala | 17 +- .../example/RepositoriesElasticsearch.scala | 20 +- .../zio/elasticsearch/HttpExecutorSpec.scala | 242 ++++++++++-------- .../zio/elasticsearch/IntegrationSpec.scala | 8 +- .../zio/elasticsearch/ElasticExecutor.scala | 2 +- .../zio/elasticsearch/Elasticsearch.scala | 35 +++ .../HttpElasticExecutorSpec.scala | 78 +++--- .../zio/elasticsearch/WireMockSpec.scala | 6 +- 8 files changed, 239 insertions(+), 169 deletions(-) create mode 100644 modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala diff --git a/modules/example/src/main/scala/example/Main.scala b/modules/example/src/main/scala/example/Main.scala index de3a3e7fd..32de83ea2 100644 --- a/modules/example/src/main/scala/example/Main.scala +++ b/modules/example/src/main/scala/example/Main.scala @@ -23,7 +23,7 @@ import sttp.client3.SttpBackend import sttp.client3.httpclient.zio.HttpClientZioBackend import zio._ import zio.config.getConfig -import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest} +import zio.elasticsearch.{ElasticConfig, ElasticExecutor, ElasticRequest, Elasticsearch} import zio.http.{Server, ServerConfig} import scala.io.Source @@ -38,25 +38,26 @@ object Main extends ZIOAppDefault { AppConfig.live, elasticConfigLive, ElasticExecutor.live, + Elasticsearch.layer, HttpClientZioBackend.layer() ) } - private[this] def prepare: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = { - val deleteIndex: RIO[ElasticExecutor, Unit] = + private[this] def prepare: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] = { + val deleteIndex: RIO[Elasticsearch, Unit] = for { _ <- ZIO.logInfo(s"Deleting index '$Index'...") - _ <- ElasticRequest.deleteIndex(Index).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteIndex(Index)) } yield () - val createIndex: RIO[ElasticExecutor, Unit] = + val createIndex: RIO[Elasticsearch, Unit] = for { _ <- ZIO.logInfo(s"Creating index '$Index'...") mapping <- ZIO.fromTry(Using(Source.fromURL(getClass.getResource("/mapping.json")))(_.mkString)) - _ <- ElasticRequest.createIndex(Index, Some(mapping)).execute + _ <- Elasticsearch.execute(ElasticRequest.createIndex(Index, Some(mapping))) } yield () - val populate: RIO[SttpBackend[Task, Any] with ElasticExecutor, Unit] = + val populate: RIO[SttpBackend[Task, Any] with Elasticsearch, Unit] = (for { repositories <- RepoFetcher.fetchAllByOrganization(organization) _ <- ZIO.logInfo("Adding GitHub repositories...") @@ -66,7 +67,7 @@ object Main extends ZIOAppDefault { deleteIndex *> createIndex *> populate } - private[this] def runServer: RIO[HttpConfig with ElasticExecutor, ExitCode] = { + private[this] def runServer: RIO[HttpConfig with Elasticsearch, ExitCode] = { val serverConfigLive = ZLayer.fromFunction((http: HttpConfig) => ServerConfig.default.port(http.port)) (for { diff --git a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala index f5294357b..c385aedf8 100644 --- a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala +++ b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala @@ -22,30 +22,30 @@ import zio.elasticsearch.{ CreationOutcome, DeletionOutcome, DocumentId, - ElasticExecutor, ElasticQuery, ElasticRequest, + Elasticsearch, Routing } import zio.prelude.Newtype.unsafeWrap -final case class RepositoriesElasticsearch(executor: ElasticExecutor) { +final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) { def findAll(): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) + elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) def findById(organization: String, id: String): Task[Option[GitHubRepo]] = for { routing <- routingOf(organization) req = ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing) - res <- executor.execute(req) + res <- elasticsearch.execute(req) } yield res def create(repository: GitHubRepo): Task[CreationOutcome] = for { routing <- routingOf(repository.organization) req = ElasticRequest.create(Index, DocumentId(repository.id), repository).routing(routing).refreshTrue - res <- executor.execute(req) + res <- elasticsearch.execute(req) } yield res def createAll(repositories: List[GitHubRepo]): Task[Unit] = @@ -55,25 +55,25 @@ final case class RepositoriesElasticsearch(executor: ElasticExecutor) { ElasticRequest.create[GitHubRepo](Index, unsafeWrap(DocumentId)(repository.id), repository) } bulkReq = ElasticRequest.bulk(reqs: _*).routing(routing) - _ <- executor.execute(bulkReq) + _ <- elasticsearch.execute(bulkReq) } yield () def upsert(id: String, repository: GitHubRepo): Task[Unit] = for { routing <- routingOf(repository.organization) req = ElasticRequest.upsert(Index, DocumentId(id), repository).routing(routing).refresh(value = true) - _ <- executor.execute(req) + _ <- elasticsearch.execute(req) } yield () def remove(organization: String, id: String): Task[DeletionOutcome] = for { routing <- routingOf(organization) req = ElasticRequest.deleteById(Index, DocumentId(id)).routing(routing).refreshFalse - res <- executor.execute(req) + res <- elasticsearch.execute(req) } yield res def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] = - executor.execute(ElasticRequest.search[GitHubRepo](Index, query)) + elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query)) private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] = Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e)) @@ -103,6 +103,6 @@ object RepositoriesElasticsearch { def search(query: ElasticQuery[_]): RIO[RepositoriesElasticsearch, List[GitHubRepo]] = ZIO.serviceWithZIO[RepositoriesElasticsearch](_.search(query)) - lazy val live: URLayer[ElasticExecutor, RepositoriesElasticsearch] = + lazy val live: URLayer[Elasticsearch, RepositoriesElasticsearch] = ZLayer.fromFunction(RepositoriesElasticsearch(_)) } diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index f380cdeba..2b49febc3 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -30,51 +30,53 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully create document") { checkOnce(genCustomer) { customer => for { - docId <- ElasticRequest.create[CustomerDocument](index, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, docId).execute + docId <- Elasticsearch.execute(ElasticRequest.create[CustomerDocument](index, customer)) + res <- Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, docId)) } yield assert(res)(isSome(equalTo(customer))) } }, test("successfully create document with ID given") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => - assertZIO(ElasticRequest.create[CustomerDocument](index, documentId, customer).execute)(equalTo(Created)) + assertZIO(Elasticsearch.execute(ElasticRequest.create[CustomerDocument](index, documentId, customer)))( + equalTo(Created) + ) } }, test("return 'AlreadyExists' if document with given ID already exists") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, customer1, customer2) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer1).execute - res <- ElasticRequest.create[CustomerDocument](index, documentId, customer2).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer1)) + res <- Elasticsearch.execute(ElasticRequest.create[CustomerDocument](index, documentId, customer2)) } yield assert(res)(equalTo(AlreadyExists)) } } ), suite("creating index")( test("successfully create index") { - assertZIO(ElasticRequest.createIndex(createIndexTestName, None).execute)(equalTo(Created)) + assertZIO(Elasticsearch.execute(ElasticRequest.createIndex(createIndexTestName, None)))(equalTo(Created)) }, test("return 'AlreadyExists' if index already exists") { for { - _ <- ElasticRequest.createIndex(createIndexTestName, None).execute - res <- ElasticRequest.createIndex(createIndexTestName, None).execute + _ <- Elasticsearch.execute(ElasticRequest.createIndex(createIndexTestName, None)) + res <- Elasticsearch.execute(ElasticRequest.createIndex(createIndexTestName, None)) } yield assert(res)(equalTo(AlreadyExists)) } - ) @@ after(ElasticRequest.deleteIndex(createIndexTestName).execute.orDie), + ) @@ after(Elasticsearch.execute(ElasticRequest.deleteIndex(createIndexTestName)).orDie), suite("creating or updating document")( test("successfully create document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + doc <- Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(doc)(isSome(equalTo(customer))) } }, test("successfully update document") { checkOnce(genDocumentId, genCustomer, genCustomer) { (documentId, firstCustomer, secondCustomer) => for { - _ <- ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer).execute - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer).execute - doc <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer)) + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer)) + doc <- Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(doc)(isSome(equalTo(secondCustomer))) } } @@ -83,14 +85,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.deleteById(index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- Elasticsearch.execute(ElasticRequest.deleteById(index, documentId)) } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.deleteById(index, documentId).execute)(equalTo(NotFound)) + assertZIO(Elasticsearch.execute(ElasticRequest.deleteById(index, documentId)))(equalTo(NotFound)) } } ), @@ -98,14 +100,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully delete existing index") { checkOnce(genIndexName) { name => for { - _ <- ElasticRequest.createIndex(name, None).execute - res <- ElasticRequest.deleteIndex(name).execute + _ <- Elasticsearch.execute(ElasticRequest.createIndex(name, None)) + res <- Elasticsearch.execute(ElasticRequest.deleteIndex(name)) } yield assert(res)(equalTo(Deleted)) } }, test("return 'NotFound' if index does not exists") { checkOnce(genIndexName) { name => - assertZIO(ElasticRequest.deleteIndex(name).execute)(equalTo(NotFound)) + assertZIO(Elasticsearch.execute(ElasticRequest.deleteIndex(name)))(equalTo(NotFound)) } } ), @@ -113,14 +115,14 @@ object HttpExecutorSpec extends IntegrationSpec { test("return true if the document exists") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.exists(index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- Elasticsearch.execute(ElasticRequest.exists(index, documentId)) } yield assert(res)(isTrue) } }, test("return false if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.exists(index, documentId).execute)(isFalse) + assertZIO(Elasticsearch.execute(ElasticRequest.exists(index, documentId)))(isFalse) } } ), @@ -128,21 +130,21 @@ object HttpExecutorSpec extends IntegrationSpec { test("successfully return document") { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { - _ <- ElasticRequest.upsert[CustomerDocument](index, documentId, customer).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) + res <- Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield assert(res)(isSome(equalTo(customer))) } }, test("return None if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticRequest.getById[CustomerDocument](index, documentId).execute)(isNone) + assertZIO(Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, documentId)))(isNone) } }, test("fail with throwable if decoding fails") { checkOnce(genDocumentId, genEmployee) { (documentId, employee) => val result = for { - _ <- ElasticRequest.upsert[EmployeeDocument](index, documentId, employee).execute - res <- ElasticRequest.getById[CustomerDocument](index, documentId).execute + _ <- Elasticsearch.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee)) + res <- Elasticsearch.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) } yield res assertZIO(result.exit)( @@ -156,36 +158,42 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + Elasticsearch.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = range("balance").gte(100) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(isNonEmpty) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("fail if any of results cannot be decoded") { checkOnce(genDocumentId, genDocumentId, genEmployee, genCustomer) { (employeeDocumentId, customerDocumentId, employee, customer) => val result = for { - _ <- ElasticRequest.deleteByQuery(secondSearchIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer).execute - _ <- ElasticRequest - .upsert[EmployeeDocument](secondSearchIndex, employeeDocumentId, employee) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest.upsert[CustomerDocument](secondSearchIndex, customerDocumentId, customer) + ) + _ <- Elasticsearch.execute( + ElasticRequest + .upsert[EmployeeDocument](secondSearchIndex, employeeDocumentId, employee) + .refreshTrue + ) query = range("age").gte(0) - res <- ElasticRequest.search[CustomerDocument](secondSearchIndex, query).execute + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](secondSearchIndex, query)) } yield res assertZIO(result.exit)( @@ -197,69 +205,78 @@ object HttpExecutorSpec extends IntegrationSpec { ) } } @@ around( - ElasticRequest.createIndex(secondSearchIndex, None).execute, - ElasticRequest.deleteIndex(secondSearchIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(secondSearchIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie ), test("search for a document which contains a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + Elasticsearch.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("search for a document which starts with a term using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + Elasticsearch.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3)) - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), test("search for a document which conforms to a pattern using a wildcard query") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => for { - _ <- ElasticRequest.deleteByQuery(firstSearchIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) _ <- - ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer).execute + Elasticsearch.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) _ <- - ElasticRequest - .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) query = wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}") - res <- ElasticRequest.search[CustomerDocument](firstSearchIndex, query).execute + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( - ElasticRequest.createIndex(firstSearchIndex, None).execute, - ElasticRequest.deleteIndex(firstSearchIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(firstSearchIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ) ) @@ shrinks(0), suite("deleting by query")( @@ -267,42 +284,45 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) { (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer, thirdDocumentId, thirdCustomer) => for { - _ <- ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - firstDocumentId, - firstCustomer.copy(balance = 150) - ) - .execute + _ <- Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + firstDocumentId, + firstCustomer.copy(balance = 150) + ) + ) _ <- - ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - secondDocumentId, - secondCustomer.copy(balance = 350) - ) - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + secondDocumentId, + secondCustomer.copy(balance = 350) + ) + ) _ <- - ElasticRequest - .upsert[CustomerDocument]( - deleteByQueryIndex, - thirdDocumentId, - thirdCustomer.copy(balance = 400) - ) - .refreshTrue - .execute + Elasticsearch.execute( + ElasticRequest + .upsert[CustomerDocument]( + deleteByQueryIndex, + thirdDocumentId, + thirdCustomer.copy(balance = 400) + ) + .refreshTrue + ) deleteQuery = range("balance").gte(300) - _ <- ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue.execute - res <- ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll).execute + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue) + res <- Elasticsearch.execute(ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll)) } yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150)))) } } @@ around( - ElasticRequest.createIndex(deleteByQueryIndex, None).execute, - ElasticRequest.deleteIndex(deleteByQueryIndex).execute.orDie + Elasticsearch.execute(ElasticRequest.createIndex(deleteByQueryIndex, None)), + Elasticsearch.execute(ElasticRequest.deleteIndex(deleteByQueryIndex)).orDie ), test("returns NotFound when provided index is missing") { checkOnce(genIndexName) { missingIndex => - assertZIO(ElasticRequest.deleteByQuery(missingIndex, matchAll).execute)(equalTo(NotFound)) + assertZIO(Elasticsearch.execute(ElasticRequest.deleteByQuery(missingIndex, matchAll)))(equalTo(NotFound)) } } ), @@ -311,29 +331,29 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genDocumentId, genDocumentId, genCustomer) { (firstDocId, secondDocId, thirdDocId, customer) => for { - _ <- ElasticRequest - .create[CustomerDocument](index, firstDocId, customer.copy(id = "randomIdString")) - .execute - _ <- ElasticRequest - .create[CustomerDocument](index, secondDocId, customer.copy(id = "randomIdString2")) - .refreshTrue - .execute + _ <- Elasticsearch.execute( + ElasticRequest + .create[CustomerDocument](index, firstDocId, customer.copy(id = "randomIdString")) + ) + _ <- Elasticsearch.execute( + ElasticRequest + .create[CustomerDocument](index, secondDocId, customer.copy(id = "randomIdString2")) + .refreshTrue + ) req1 = ElasticRequest.create[CustomerDocument](index, thirdDocId, customer) req2 = ElasticRequest.create[CustomerDocument](index, customer.copy(id = "randomIdString3")) req3 = ElasticRequest.upsert[CustomerDocument](index, firstDocId, customer.copy(balance = 3000)) req4 = ElasticRequest.deleteById(index, secondDocId) - res <- ElasticRequest.bulk(req1, req2, req3, req4).execute + res <- Elasticsearch.execute(ElasticRequest.bulk(req1, req2, req3, req4)) } yield assert(res)(isUnit) } } ) ) @@ nondeterministic @@ sequential @@ prepareElasticsearchIndexForTests @@ afterAll( - ElasticRequest.deleteIndex(index).execute.orDie + Elasticsearch.execute(ElasticRequest.deleteIndex(index)).orDie ) ).provideShared( elasticsearchLayer ) - } - } diff --git a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala index f94c89552..5c03ad35f 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/IntegrationSpec.scala @@ -27,8 +27,8 @@ import zio.test.{Assertion, Gen, TestAspect, ZIOSpecDefault, checkN} trait IntegrationSpec extends ZIOSpecDefault { - val elasticsearchLayer: ZLayer[Any, Throwable, ElasticExecutor] = - HttpClientZioBackend.layer() >>> ElasticExecutor.local + val elasticsearchLayer: ZLayer[Any, Throwable, Elasticsearch] = + HttpClientZioBackend.layer() >>> ElasticExecutor.local >>> Elasticsearch.layer val index: IndexName = IndexName("users") @@ -41,8 +41,8 @@ trait IntegrationSpec extends ZIOSpecDefault { val createIndexTestName: IndexName = IndexName("create-index-test-name") val prepareElasticsearchIndexForTests: TestAspect[Nothing, Any, Throwable, Any] = beforeAll((for { - _ <- ElasticRequest.createIndex(index, None).execute - _ <- ElasticRequest.deleteByQuery(index, matchAll).refreshTrue.execute + _ <- Elasticsearch.execute(ElasticRequest.createIndex(index, None)) + _ <- Elasticsearch.execute(ElasticRequest.deleteByQuery(index, matchAll).refreshTrue) } yield ()).provide(elasticsearchLayer)) def genIndexName: Gen[Any, IndexName] = diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index 7ccaffb17..509ace7d0 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -20,7 +20,7 @@ import sttp.client3.SttpBackend import zio.stm.TMap import zio.{Task, ULayer, ZLayer} -trait ElasticExecutor { +private[elasticsearch] trait ElasticExecutor { def execute[A](request: ElasticRequest[A, _]): Task[A] } diff --git a/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala new file mode 100644 index 000000000..5e1f647a8 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch + +import zio.{RIO, Task, URLayer, ZIO, ZLayer} + +trait Elasticsearch { + def execute[A](request: ElasticRequest[A, _]): Task[A] +} + +object Elasticsearch { + def execute[A](request: ElasticRequest[A, _]): RIO[Elasticsearch, A] = + ZIO.serviceWithZIO[Elasticsearch](_.execute(request)) + + lazy val layer: URLayer[ElasticExecutor, Elasticsearch] = + ZLayer.fromFunction { executor: ElasticExecutor => + new Elasticsearch { + override def execute[A](request: ElasticRequest[A, _]): Task[A] = executor.execute(request) + } + } +} diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 1dbf694ca..dcb48d76b 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -66,7 +66,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue.execute)( + assertZIO( + addStubMapping *> Elasticsearch.execute(ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue) + )( isUnit ) }, @@ -89,11 +91,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .create[GitHubRepo](index = index, doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(DocumentId("V4x8q4UB3agN0z75fv5r"))) }, test("creating request with given ID") { @@ -106,11 +109,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(Created)) }, test("creating index request") { @@ -120,7 +124,7 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.createIndex(name = index, definition = None).execute)( + assertZIO(addStubMapping *> Elasticsearch.execute(ElasticRequest.createIndex(name = index, definition = None)))( equalTo(Created) ) }, @@ -134,11 +138,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) + .routing(Routing("routing")) + .refreshTrue + ) )(isUnit) }, test("deleting by ID request") { @@ -151,11 +156,12 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .refreshTrue - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + .refreshTrue + ) )(equalTo(Deleted)) }, test("deleting by query request") { @@ -168,7 +174,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest.deleteByQuery(index = index, query = matchAll).refreshTrue.execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest.deleteByQuery(index = index, query = matchAll).refreshTrue + ) )( equalTo(Deleted) ) @@ -182,7 +190,7 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.deleteIndex(name = index).execute)( + assertZIO(addStubMapping *> Elasticsearch.execute(ElasticRequest.deleteIndex(name = index)))( equalTo(Deleted) ) }, @@ -196,10 +204,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .exists(index = index, id = DocumentId("example-id")) - .routing(Routing("routing")) - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .exists(index = index, id = DocumentId("example-id")) + .routing(Routing("routing")) + ) )(isTrue) }, test("getting by ID request") { @@ -227,10 +236,11 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticRequest - .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - .execute + addStubMapping *> Elasticsearch.execute( + ElasticRequest + .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + ) )(isSome(equalTo(repo))) }, test("getting by query request") { @@ -280,7 +290,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) ) - assertZIO(addStubMapping *> ElasticRequest.search[GitHubRepo](index = index, query = matchAll).execute)( + assertZIO( + addStubMapping *> Elasticsearch.execute(ElasticRequest.search[GitHubRepo](index = index, query = matchAll)) + )( equalTo(List(repo)) ) } diff --git a/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala index 449050b0c..be3a5ae5b 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/WireMockSpec.scala @@ -30,9 +30,11 @@ trait WireMockSpec extends ZIOSpecDefault { val port: Int = 9300 - val elasticsearchWireMockLayer: TaskLayer[ElasticExecutor] = + val elasticsearchWireMockLayer: TaskLayer[Elasticsearch] = HttpClientZioBackend - .layer() >>> (ZLayer.succeed(ElasticConfig.apply("localhost", port)) >>> ElasticExecutor.live) + .layer() >>> (ZLayer.succeed( + ElasticConfig.apply("localhost", port) + ) >>> ElasticExecutor.live) >>> Elasticsearch.layer val wireMockServerLayer: TaskLayer[WireMockServer] = { val server = ZIO.acquireRelease(