From 8a02b84b0b84d5827d751cbfd4851443c590dcdc Mon Sep 17 00:00:00 2001 From: Dimitrije Bulaja Date: Fri, 6 Jan 2023 01:14:20 +0100 Subject: [PATCH 1/4] Implement test executor --- .../zio/elasticsearch/ElasticExecutor.scala | 6 + .../zio/elasticsearch/TestExecutor.scala | 124 ++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index 44f104a3d..a3ef4fe9e 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -1,6 +1,7 @@ package zio.elasticsearch import sttp.client3.SttpBackend +import zio.stm.TMap import zio.{Task, ZIO, ZLayer} trait ElasticExecutor { @@ -16,6 +17,11 @@ object ElasticExecutor { } yield HttpElasticExecutor(conf, sttp) } + lazy val test: ZLayer[Any, Throwable, ElasticExecutor] = + ZLayer { + TMap.empty[IndexName, TMap[DocumentId, Document]].map(esMap => TestExecutor(esMap)).commit + } + lazy val local: ZLayer[SttpBackend[Task, Any], Throwable, ElasticExecutor] = ZLayer.succeed(ElasticConfig.Default) >>> live } diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala new file mode 100644 index 000000000..920ffdd98 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala @@ -0,0 +1,124 @@ +package zio.elasticsearch + +import zio.Random.nextUUID +import zio.elasticsearch.CreationOutcome.{AlreadyExists, Created} +import zio.elasticsearch.DeletionOutcome.{Deleted, NotFound} +import zio.{Task, ZIO} +import zio.elasticsearch.ElasticRequest._ +import zio.json.ast.Json +import zio.stm.{STM, TMap, ZSTM} + +private[elasticsearch] final case class TestExecutor private (esMap: TMap[IndexName, TMap[DocumentId, Document]]) + extends ElasticExecutor { + self => + + override def execute[A](request: ElasticRequest[A, _]): Task[A] = + request match { + case CreateRequest(index, document, _, _) => + fakeCreate(index, document) + case CreateWithIdRequest(index, id, document, _, _) => + fakeCreateWithId(index, id, document) + case CreateIndexRequest(name, _) => + fakeCreateIndex(name) + case CreateOrUpdateRequest(index, id, document, _, _) => + fakeCreateOrUpdate(index, id, document) + case DeleteByIdRequest(index, id, _, _) => + fakeDeleteById(index, id) + case DeleteIndexRequest(name) => + fakeDeleteIndex(name) + case ExistsRequest(index, id, _) => + fakeExists(index, id) + case GetByIdRequest(index, id, _) => + fakeGetById(index, id) + case GetByQueryRequest(index, _, _) => + fakeGetByQuery(index) + case map @ Map(_, _) => execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) + } + + private def fakeCreate(index: IndexName, document: Document): Task[DocumentId] = + (for { + documents <- getDocumentsFromIndex(index) + documentId = DocumentId(nextUUID.toString) + _ <- documents.put(documentId, document) + } yield documentId).commit + + private def fakeCreateWithId(index: IndexName, documentId: DocumentId, document: Document): Task[CreationOutcome] = + (for { + documents <- getDocumentsFromIndex(index) + alreadyExists <- documents.contains(documentId) + _ <- documents.putIfAbsent(documentId, document) + } yield if (alreadyExists) AlreadyExists else Created).commit + + private def fakeCreateIndex(index: IndexName): Task[CreationOutcome] = + (for { + alreadyExists <- self.esMap.contains(index) + emptyDocuments <- TMap.empty[DocumentId, Document] + _ <- self.esMap.putIfAbsent(index, emptyDocuments) + } yield if (alreadyExists) AlreadyExists else Created).commit + + private def fakeCreateOrUpdate(index: IndexName, documentId: DocumentId, document: Document): Task[Unit] = + (for { + documents <- getDocumentsFromIndex(index) + _ <- documents.put(documentId, document) + } yield ()).commit + + private def fakeDeleteById(index: IndexName, documentId: DocumentId): Task[DeletionOutcome] = + (for { + documents <- getDocumentsFromIndex(index) + notExists <- documents.contains(documentId) + _ <- documents.delete(documentId) + } yield if (notExists) NotFound else Deleted).commit + + private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] = + (for { + notExists <- self.esMap.contains(index) + _ <- self.esMap.delete(index) + } yield if (notExists) NotFound else Deleted).commit + + private def fakeExists(index: IndexName, documentId: DocumentId): Task[Boolean] = + (for { + documents <- getDocumentsFromIndex(index) + exists <- documents.contains(documentId) + } yield exists).commit + + private def fakeGetById(index: IndexName, documentId: DocumentId): Task[Option[Document]] = + (for { + documents <- getDocumentsFromIndex(index) + maybeDocument <- documents.get(documentId) + } yield maybeDocument).commit + + private def fakeGetByQuery(index: IndexName): Task[ElasticQueryResponse] = + getDocumentsFromIndex(index).flatMap(documents => createElasticQueryResponse(index, documents)).commit + + private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] = + for { + maybeDocuments <- self.esMap.get(index) + documents <- maybeDocuments.fold[STM[ElasticException, TMap[DocumentId, Document]]]( + STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!")) + )(STM.succeed(_)) + } yield documents + + private def createElasticQueryResponse( + index: IndexName, + documents: TMap[DocumentId, Document] + ): ZSTM[Any, Nothing, ElasticQueryResponse] = { + val shards = Shards(total = 1, successful = 1, skipped = 0, failed = 0) + + for { + items <- + documents.toList.map( + _.map(pair => + Item( + index = index.toString, + `type` = "type", + id = pair._1.toString, + score = 1, + source = Json.Str(pair._2.json) + ) + ) + ) + hitsSize <- documents.size + hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = 1, hits = items) + } yield ElasticQueryResponse(took = 1, timedOut = false, shards = shards, hits = hits) + } +} From 560800480211dc00c8ca247e5921c916cbb7336c Mon Sep 17 00:00:00 2001 From: Dimitrije Bulaja Date: Fri, 6 Jan 2023 12:22:45 +0100 Subject: [PATCH 2/4] Refactor code --- .../zio/elasticsearch/ElasticExecutor.scala | 12 +++++----- .../zio/elasticsearch/TestExecutor.scala | 22 +++++++++++-------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index a3ef4fe9e..f38afed46 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -2,7 +2,7 @@ package zio.elasticsearch import sttp.client3.SttpBackend import zio.stm.TMap -import zio.{Task, ZIO, ZLayer} +import zio.{Task, ULayer, ZIO, ZLayer} trait ElasticExecutor { def execute[A](request: ElasticRequest[A, _]): Task[A] @@ -17,11 +17,11 @@ object ElasticExecutor { } yield HttpElasticExecutor(conf, sttp) } - lazy val test: ZLayer[Any, Throwable, ElasticExecutor] = - ZLayer { - TMap.empty[IndexName, TMap[DocumentId, Document]].map(esMap => TestExecutor(esMap)).commit - } - lazy val local: ZLayer[SttpBackend[Task, Any], Throwable, ElasticExecutor] = ZLayer.succeed(ElasticConfig.Default) >>> live + + lazy val test: ULayer[TestExecutor] = + ZLayer( + TMap.empty[IndexName, TMap[DocumentId, Document]].map(TestExecutor).commit + ) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala index 920ffdd98..317f8f9ec 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala @@ -3,13 +3,14 @@ package zio.elasticsearch import zio.Random.nextUUID import zio.elasticsearch.CreationOutcome.{AlreadyExists, Created} import zio.elasticsearch.DeletionOutcome.{Deleted, NotFound} -import zio.{Task, ZIO} import zio.elasticsearch.ElasticRequest._ import zio.json.ast.Json import zio.stm.{STM, TMap, ZSTM} +import zio.{Task, ZIO} -private[elasticsearch] final case class TestExecutor private (esMap: TMap[IndexName, TMap[DocumentId, Document]]) - extends ElasticExecutor { +private[elasticsearch] final case class TestExecutor private ( + indexWithDocumentsMap: TMap[IndexName, TMap[DocumentId, Document]] +) extends ElasticExecutor { self => override def execute[A](request: ElasticRequest[A, _]): Task[A] = @@ -51,9 +52,9 @@ private[elasticsearch] final case class TestExecutor private (esMap: TMap[IndexN private def fakeCreateIndex(index: IndexName): Task[CreationOutcome] = (for { - alreadyExists <- self.esMap.contains(index) + alreadyExists <- self.indexWithDocumentsMap.contains(index) emptyDocuments <- TMap.empty[DocumentId, Document] - _ <- self.esMap.putIfAbsent(index, emptyDocuments) + _ <- self.indexWithDocumentsMap.putIfAbsent(index, emptyDocuments) } yield if (alreadyExists) AlreadyExists else Created).commit private def fakeCreateOrUpdate(index: IndexName, documentId: DocumentId, document: Document): Task[Unit] = @@ -71,8 +72,8 @@ private[elasticsearch] final case class TestExecutor private (esMap: TMap[IndexN private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] = (for { - notExists <- self.esMap.contains(index) - _ <- self.esMap.delete(index) + notExists <- self.indexWithDocumentsMap.contains(index) + _ <- self.indexWithDocumentsMap.delete(index) } yield if (notExists) NotFound else Deleted).commit private def fakeExists(index: IndexName, documentId: DocumentId): Task[Boolean] = @@ -88,11 +89,14 @@ private[elasticsearch] final case class TestExecutor private (esMap: TMap[IndexN } yield maybeDocument).commit private def fakeGetByQuery(index: IndexName): Task[ElasticQueryResponse] = - getDocumentsFromIndex(index).flatMap(documents => createElasticQueryResponse(index, documents)).commit + (for { + documents <- getDocumentsFromIndex(index) + elasticQueryResponse <- createElasticQueryResponse(index, documents) + } yield elasticQueryResponse).commit private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] = for { - maybeDocuments <- self.esMap.get(index) + maybeDocuments <- self.indexWithDocumentsMap.get(index) documents <- maybeDocuments.fold[STM[ElasticException, TMap[DocumentId, Document]]]( STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!")) )(STM.succeed(_)) From e1a1dd4720d71e825dc2c3a622275b67429147cd Mon Sep 17 00:00:00 2001 From: Dimitrije Bulaja Date: Sun, 8 Jan 2023 16:09:14 +0100 Subject: [PATCH 3/4] Fix code remarks --- .../zio/elasticsearch/ElasticExecutor.scala | 4 +--- .../scala/zio/elasticsearch/TestExecutor.scala | 18 +++++++++--------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index f38afed46..52544d8ed 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -21,7 +21,5 @@ object ElasticExecutor { ZLayer.succeed(ElasticConfig.Default) >>> live lazy val test: ULayer[TestExecutor] = - ZLayer( - TMap.empty[IndexName, TMap[DocumentId, Document]].map(TestExecutor).commit - ) + ZLayer(TMap.empty[IndexName, TMap[DocumentId, Document]].map(TestExecutor).commit) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala index 317f8f9ec..8219b27a7 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala @@ -8,9 +8,8 @@ import zio.json.ast.Json import zio.stm.{STM, TMap, ZSTM} import zio.{Task, ZIO} -private[elasticsearch] final case class TestExecutor private ( - indexWithDocumentsMap: TMap[IndexName, TMap[DocumentId, Document]] -) extends ElasticExecutor { +private[elasticsearch] final case class TestExecutor private (data: TMap[IndexName, TMap[DocumentId, Document]]) + extends ElasticExecutor { self => override def execute[A](request: ElasticRequest[A, _]): Task[A] = @@ -33,7 +32,8 @@ private[elasticsearch] final case class TestExecutor private ( fakeGetById(index, id) case GetByQueryRequest(index, _, _) => fakeGetByQuery(index) - case map @ Map(_, _) => execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) + case map @ Map(_, _) => + execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) } private def fakeCreate(index: IndexName, document: Document): Task[DocumentId] = @@ -52,9 +52,9 @@ private[elasticsearch] final case class TestExecutor private ( private def fakeCreateIndex(index: IndexName): Task[CreationOutcome] = (for { - alreadyExists <- self.indexWithDocumentsMap.contains(index) + alreadyExists <- self.data.contains(index) emptyDocuments <- TMap.empty[DocumentId, Document] - _ <- self.indexWithDocumentsMap.putIfAbsent(index, emptyDocuments) + _ <- self.data.putIfAbsent(index, emptyDocuments) } yield if (alreadyExists) AlreadyExists else Created).commit private def fakeCreateOrUpdate(index: IndexName, documentId: DocumentId, document: Document): Task[Unit] = @@ -72,8 +72,8 @@ private[elasticsearch] final case class TestExecutor private ( private def fakeDeleteIndex(index: IndexName): Task[DeletionOutcome] = (for { - notExists <- self.indexWithDocumentsMap.contains(index) - _ <- self.indexWithDocumentsMap.delete(index) + notExists <- self.data.contains(index) + _ <- self.data.delete(index) } yield if (notExists) NotFound else Deleted).commit private def fakeExists(index: IndexName, documentId: DocumentId): Task[Boolean] = @@ -96,7 +96,7 @@ private[elasticsearch] final case class TestExecutor private ( private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] = for { - maybeDocuments <- self.indexWithDocumentsMap.get(index) + maybeDocuments <- self.data.get(index) documents <- maybeDocuments.fold[STM[ElasticException, TMap[DocumentId, Document]]]( STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!")) )(STM.succeed(_)) From 055db686ad0eca74ec62c7d3453bd330ba026d09 Mon Sep 17 00:00:00 2001 From: Dimitrije Bulaja Date: Sun, 8 Jan 2023 20:12:27 +0100 Subject: [PATCH 4/4] Fix code remarks --- .../zio/elasticsearch/TestExecutor.scala | 57 ++++++++++--------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala index 8219b27a7..b8e54dac2 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/TestExecutor.scala @@ -88,11 +88,36 @@ private[elasticsearch] final case class TestExecutor private (data: TMap[IndexNa maybeDocument <- documents.get(documentId) } yield maybeDocument).commit - private def fakeGetByQuery(index: IndexName): Task[ElasticQueryResponse] = + private def fakeGetByQuery(index: IndexName): Task[ElasticQueryResponse] = { + def createElasticQueryResponse( + index: IndexName, + documents: TMap[DocumentId, Document] + ): ZSTM[Any, Nothing, ElasticQueryResponse] = { + val shards = Shards(total = 1, successful = 1, skipped = 0, failed = 0) + + for { + items <- + documents.toList.map( + _.map { case (id, document) => + Item( + index = index.toString, + `type` = "type", + id = id.toString, + score = 1, + source = Json.Str(document.json) + ) + } + ) + hitsSize <- documents.size + hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = 1, hits = items) + } yield ElasticQueryResponse(took = 1, timedOut = false, shards = shards, hits = hits) + } + (for { - documents <- getDocumentsFromIndex(index) - elasticQueryResponse <- createElasticQueryResponse(index, documents) - } yield elasticQueryResponse).commit + documents <- getDocumentsFromIndex(index) + response <- createElasticQueryResponse(index, documents) + } yield response).commit + } private def getDocumentsFromIndex(index: IndexName): ZSTM[Any, ElasticException, TMap[DocumentId, Document]] = for { @@ -101,28 +126,4 @@ private[elasticsearch] final case class TestExecutor private (data: TMap[IndexNa STM.fail[ElasticException](new ElasticException(s"Index $index does not exists!")) )(STM.succeed(_)) } yield documents - - private def createElasticQueryResponse( - index: IndexName, - documents: TMap[DocumentId, Document] - ): ZSTM[Any, Nothing, ElasticQueryResponse] = { - val shards = Shards(total = 1, successful = 1, skipped = 0, failed = 0) - - for { - items <- - documents.toList.map( - _.map(pair => - Item( - index = index.toString, - `type` = "type", - id = pair._1.toString, - score = 1, - source = Json.Str(pair._2.json) - ) - ) - ) - hitsSize <- documents.size - hits = Hits(total = Total(value = hitsSize, relation = ""), maxScore = 1, hits = items) - } yield ElasticQueryResponse(took = 1, timedOut = false, shards = shards, hits = hits) - } }