diff --git a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala index 8a84df0d1..e2ee2f4eb 100644 --- a/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala +++ b/modules/example/src/main/scala/example/RepositoriesElasticsearch.scala @@ -32,12 +32,13 @@ import zio.prelude.Newtype.unsafeWrap final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) { def findAll(): Task[List[GitHubRepo]] = - elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, matchAll)) + elasticsearch.execute(ElasticRequest.search(Index, matchAll)).documentAs[GitHubRepo] def findById(organization: String, id: String): Task[Option[GitHubRepo]] = for { routing <- routingOf(organization) - res <- elasticsearch.execute(ElasticRequest.getById[GitHubRepo](Index, DocumentId(id)).routing(routing)) + res <- + elasticsearch.execute(ElasticRequest.getById(Index, DocumentId(id)).routing(routing)).documentAs[GitHubRepo] } yield res def create(repository: GitHubRepo): Task[CreationOutcome] = @@ -75,7 +76,7 @@ final case class RepositoriesElasticsearch(elasticsearch: Elasticsearch) { } yield res def search(query: ElasticQuery[_]): Task[List[GitHubRepo]] = - elasticsearch.execute(ElasticRequest.search[GitHubRepo](Index, query)) + elasticsearch.execute(ElasticRequest.search(Index, query)).documentAs[GitHubRepo] private def routingOf(value: String): IO[IllegalArgumentException, Routing.Type] = Routing.make(value).toZIO.mapError(e => new IllegalArgumentException(e)) diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index a6db87fca..8b9750aa9 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -16,11 +16,15 @@ package zio.elasticsearch +import zio.Chunk import zio.elasticsearch.ElasticQuery._ +import zio.stream.{Sink, ZSink} import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import scala.util.Random + object HttpExecutorSpec extends IntegrationSpec { def spec: Spec[TestEnvironment, Any] = { @@ -31,7 +35,7 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genCustomer) { customer => for { docId <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, customer)) - res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, docId)) + res <- ElasticExecutor.execute(ElasticRequest.getById(index, docId)).documentAs[CustomerDocument] } yield assert(res)(isSome(equalTo(customer))) } }, @@ -67,7 +71,7 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) - doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) + doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument] } yield assert(doc)(isSome(equalTo(customer))) } }, @@ -76,7 +80,7 @@ object HttpExecutorSpec extends IntegrationSpec { for { _ <- ElasticExecutor.execute(ElasticRequest.create[CustomerDocument](index, documentId, firstCustomer)) _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, secondCustomer)) - doc <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) + doc <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument] } yield assert(doc)(isSome(equalTo(secondCustomer))) } } @@ -131,20 +135,22 @@ object HttpExecutorSpec extends IntegrationSpec { checkOnce(genDocumentId, genCustomer) { (documentId, customer) => for { _ <- ElasticExecutor.execute(ElasticRequest.upsert[CustomerDocument](index, documentId, customer)) - res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) + res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument] } yield assert(res)(isSome(equalTo(customer))) } }, test("return None if the document does not exist") { checkOnce(genDocumentId) { documentId => - assertZIO(ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)))(isNone) + assertZIO( + ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument] + )(isNone) } }, test("fail with throwable if decoding fails") { checkOnce(genDocumentId, genEmployee) { (documentId, employee) => val result = for { _ <- ElasticExecutor.execute(ElasticRequest.upsert[EmployeeDocument](index, documentId, employee)) - res <- ElasticExecutor.execute(ElasticRequest.getById[CustomerDocument](index, documentId)) + res <- ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument] } yield res assertZIO(result.exit)( @@ -170,7 +176,8 @@ object HttpExecutorSpec extends IntegrationSpec { .refreshTrue ) query = range("balance").gte(100) - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) + res <- + ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument] } yield assert(res)(isNonEmpty) } } @@ around( @@ -193,7 +200,9 @@ object HttpExecutorSpec extends IntegrationSpec { .refreshTrue ) query = range("age").gte(0) - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](secondSearchIndex, query)) + res <- ElasticExecutor + .execute(ElasticRequest.search(secondSearchIndex, query)) + .documentAs[CustomerDocument] } yield res assertZIO(result.exit)( @@ -224,7 +233,8 @@ object HttpExecutorSpec extends IntegrationSpec { .refreshTrue ) query = ElasticQuery.contains("name.keyword", firstCustomer.name.take(3)) - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) + res <- + ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument] } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( @@ -247,7 +257,8 @@ object HttpExecutorSpec extends IntegrationSpec { .refreshTrue ) query = ElasticQuery.startsWith("name.keyword", firstCustomer.name.take(3)) - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) + res <- + ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument] } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( @@ -271,7 +282,8 @@ object HttpExecutorSpec extends IntegrationSpec { ) query = wildcard("name.keyword", s"${firstCustomer.name.take(2)}*${firstCustomer.name.takeRight(2)}") - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](firstSearchIndex, query)) + res <- + ElasticExecutor.execute(ElasticRequest.search(firstSearchIndex, query)).documentAs[CustomerDocument] } yield assert(res)(Assertion.contains(firstCustomer)) } } @@ around( @@ -279,6 +291,92 @@ object HttpExecutorSpec extends IntegrationSpec { ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ) ) @@ shrinks(0), + suite("searching documents and returning them as a stream")( + test("search for documents using range query") { + checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer) { + (firstDocumentId, firstCustomer, secondDocumentId, secondCustomer) => + val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item] + + for { + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) + _ <- ElasticExecutor.execute( + ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer) + ) + _ <- ElasticExecutor.execute( + ElasticRequest + .upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer) + .refreshTrue + ) + query = range("balance").gte(100) + res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink) + } yield assert(res)(isNonEmpty) + } + } @@ around( + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie + ), + test("search for documents using range query with multiple pages") { + checkOnce(genCustomer) { customer => + def sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item] + + for { + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll)) + reqs = (0 to 203).map { _ => + ElasticRequest.create[CustomerDocument]( + secondSearchIndex, + customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150) + ) + } + _ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue) + query = range("balance").gte(100) + res <- ElasticExecutor + .stream( + ElasticRequest.search(secondSearchIndex, query) + ) + .run(sink) + } yield assert(res)(hasSize(equalTo(204))) + } + } @@ around( + ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie + ), + test("search for documents using range query with multiple pages and return type") { + checkOnce(genCustomer) { customer => + def sink: Sink[Throwable, CustomerDocument, Nothing, Chunk[CustomerDocument]] = + ZSink.collectAll[CustomerDocument] + + for { + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll)) + reqs = (0 to 200).map { _ => + ElasticRequest.create[CustomerDocument]( + secondSearchIndex, + customer.copy(id = Random.alphanumeric.take(5).mkString, balance = 150) + ) + } + _ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue) + query = range("balance").gte(100) + res <- ElasticExecutor + .streamAs[CustomerDocument](ElasticRequest.search(secondSearchIndex, query)) + .run(sink) + } yield assert(res)(hasSize(equalTo(201))) + } + } @@ around( + ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie + ), + test("search for documents using range query - empty stream") { + val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item] + + for { + _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) + query = range("balance").gte(100) + res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink) + } yield assert(res)(hasSize(equalTo(0))) + } @@ around( + ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)), + ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie + ) + ) @@ shrinks(0), suite("deleting by query")( test("successfully delete all matched documents") { checkOnce(genDocumentId, genCustomer, genDocumentId, genCustomer, genDocumentId, genCustomer) { @@ -314,7 +412,9 @@ object HttpExecutorSpec extends IntegrationSpec { deleteQuery = range("balance").gte(300) _ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(deleteByQueryIndex, deleteQuery).refreshTrue) - res <- ElasticExecutor.execute(ElasticRequest.search[CustomerDocument](deleteByQueryIndex, matchAll)) + res <- ElasticExecutor + .execute(ElasticRequest.search(deleteByQueryIndex, matchAll)) + .documentAs[CustomerDocument] } yield assert(res)(hasSameElements(List(firstCustomer.copy(balance = 150)))) } } @@ around( diff --git a/modules/library/src/main/scala/zio/elasticsearch/Document.scala b/modules/library/src/main/scala/zio/elasticsearch/Document.scala index bd24d8660..c6c8f5de0 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Document.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Document.scala @@ -16,19 +16,13 @@ package zio.elasticsearch -import zio.json.ast.Json import zio.schema.Schema -import zio.schema.codec.JsonCodec.JsonDecoder -import zio.schema.codec.{DecodeError, JsonCodec} +import zio.schema.codec.JsonCodec -private[elasticsearch] final case class Document(json: String) { - def decode[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, json) -} +private[elasticsearch] final case class Document(json: String) private[elasticsearch] object Document { def from[A](doc: A)(implicit schema: Schema[A]): Document = Document( JsonCodec.jsonEncoder(schema).encodeJson(doc, indent = None).toString ) - - def from(json: Json): Document = new Document(json.toString) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala index 067e4f47f..30e209eab 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticExecutor.scala @@ -17,10 +17,17 @@ package zio.elasticsearch import sttp.client3.SttpBackend +import zio.elasticsearch.ElasticRequest.Search +import zio.schema.Schema +import zio.stream.{Stream, ZStream} import zio.{RIO, Task, URLayer, ZIO, ZLayer} private[elasticsearch] trait ElasticExecutor { - def execute[A](request: ElasticRequest[A, _]): Task[A] + def execute[A](request: ElasticRequest[A]): Task[A] + + def stream(request: Search): Stream[Throwable, Item] + + def streamAs[A: Schema](request: Search): Stream[Throwable, A] } object ElasticExecutor { @@ -30,6 +37,12 @@ object ElasticExecutor { lazy val local: URLayer[SttpBackend[Task, Any], ElasticExecutor] = ZLayer.succeed(ElasticConfig.Default) >>> live - private[elasticsearch] def execute[A](request: ElasticRequest[A, _]): RIO[ElasticExecutor, A] = + private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] = ZIO.serviceWithZIO[ElasticExecutor](_.execute(request)) + + private[elasticsearch] def stream(request: Search): ZStream[ElasticExecutor, Throwable, Item] = + ZStream.serviceWithStream[ElasticExecutor](_.stream(request)) + + private[elasticsearch] def streamAs[A: Schema](request: Search): ZStream[ElasticExecutor, Throwable, A] = + ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request)) } diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala index cc3870b17..6fb0f422e 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticQueryResponse.scala @@ -20,6 +20,8 @@ import zio.json.ast.Json import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField} private[elasticsearch] final case class ElasticQueryResponse( + @jsonField("_scroll_id") + scrollId: Option[String], took: Int, @jsonField("timed_out") timedOut: Boolean, @@ -50,7 +52,7 @@ private[elasticsearch] final case class Hits( total: Total, @jsonField("max_score") maxScore: Option[Double] = None, - hits: List[Item] + hits: List[Hit] ) private[elasticsearch] object Hits { @@ -63,7 +65,7 @@ private[elasticsearch] object Total { implicit val decoder: JsonDecoder[Total] = DeriveJsonDecoder.gen[Total] } -private[elasticsearch] final case class Item( +private[elasticsearch] final case class Hit( @jsonField("_index") index: String, @jsonField("_type") @@ -76,6 +78,6 @@ private[elasticsearch] final case class Item( source: Json ) -private[elasticsearch] object Item { - implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item] +private[elasticsearch] object Hit { + implicit val decoder: JsonDecoder[Hit] = DeriveJsonDecoder.gen[Hit] } diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index 4ece49d9f..cb5a0df1f 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -16,204 +16,238 @@ package zio.elasticsearch -import zio.elasticsearch.Refresh.WithRefresh -import zio.elasticsearch.Routing.{Routing, WithRouting} -import zio.prelude._ +import zio.elasticsearch.Routing.Routing import zio.schema.Schema -import zio.schema.codec.JsonCodec.JsonDecoder -import scala.annotation.unused -import scala.language.implicitConversions +trait HasRefresh[A] { + def refresh(value: Boolean): ElasticRequest[A] -sealed trait ElasticRequest[+A, ERT <: ElasticRequestType] { self => + def refreshFalse: ElasticRequest[A] - final def map[B](f: A => Either[DecodingException, B]): ElasticRequest[B, ERT] = ElasticRequest.Map(self, f) - - final def refresh(value: Boolean)(implicit wr: WithRefresh[ERT]): ElasticRequest[A, ERT] = - wr.withRefresh(request = self, value = value) + def refreshTrue: ElasticRequest[A] +} - final def refreshFalse(implicit wr: WithRefresh[ERT]): ElasticRequest[A, ERT] = - wr.withRefresh(request = self, value = false) +trait HasRouting[A] { + def routing(value: Routing): ElasticRequest[A] +} - final def refreshTrue(implicit wr: WithRefresh[ERT]): ElasticRequest[A, ERT] = - wr.withRefresh(request = self, value = true) +sealed trait BulkableRequest[A] extends ElasticRequest[A] - final def routing(value: Routing)(implicit wr: WithRouting[ERT]): ElasticRequest[A, ERT] = - wr.withRouting(request = self, routing = value) -} +sealed trait ElasticRequest[A] object ElasticRequest { - import ElasticRequestType._ - - def bulk(requests: BulkableRequest*): ElasticRequest[Unit, Bulk] = - BulkRequest.of(requests: _*) - - def create[A: Schema](index: IndexName, doc: A): ElasticRequest[DocumentId, Create] = - CreateRequest(index = index, document = Document.from(doc), refresh = false, routing = None) - - def create[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[CreationOutcome, CreateWithId] = - CreateWithIdRequest(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) - - def createIndex(name: IndexName): ElasticRequest[CreationOutcome, CreateIndex] = - CreateIndexRequest(name = name, definition = None) - - def createIndex(name: IndexName, definition: String): ElasticRequest[CreationOutcome, CreateIndex] = - CreateIndexRequest(name = name, definition = Some(definition)) - - def deleteById(index: IndexName, id: DocumentId): ElasticRequest[DeletionOutcome, DeleteById] = - DeleteByIdRequest(index = index, id = id, refresh = false, routing = None) - - def deleteByQuery(index: IndexName, query: ElasticQuery[_]): ElasticRequest[DeletionOutcome, DeleteByQuery] = - DeleteByQueryRequest(index = index, query = query, refresh = false, routing = None) - - def deleteIndex(name: IndexName): ElasticRequest[DeletionOutcome, DeleteIndex] = - DeleteIndexRequest(name) - - def exists(index: IndexName, id: DocumentId): ElasticRequest[Boolean, Exists] = - ExistsRequest(index = index, id = id, routing = None) - - def getById[A: Schema](index: IndexName, id: DocumentId): ElasticRequest[Option[A], GetById] = - GetByIdRequest(index = index, id = id, routing = None).map { - case Some(document) => - document.decode match { - case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}")) - case Right(doc) => Right(Some(doc)) - } - case None => - Right(None) - } - - def search[A](index: IndexName, query: ElasticQuery[_])(implicit - schema: Schema[A] - ): ElasticRequest[List[A], GetByQuery] = - GetByQueryRequest(index = index, query = query, routing = None).map { response => - Validation - .validateAll(response.results.map { json => - ZValidation.fromEither(JsonDecoder.decode(schema, json.toString)) - }) - .toEitherWith { errors => - DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})") - } - } - - def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): ElasticRequest[Unit, Upsert] = - CreateOrUpdateRequest(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) - - private[elasticsearch] final case class BulkableRequest private (request: ElasticRequest[_, _]) - - object BulkableRequest { - implicit def toBulkable[ERT <: ElasticRequestType](request: ElasticRequest[_, ERT])(implicit - @unused ev: ERT <:< BulkableRequestType - ): BulkableRequest = - BulkableRequest(request) - - implicit def toBulkableList[ERT <: ElasticRequestType](requests: List[ElasticRequest[_, ERT]])(implicit - @unused ev: ERT <:< BulkableRequestType - ): List[BulkableRequest] = - requests.map(BulkableRequest(_)) - } + def bulk(requests: BulkableRequest[_]*): Bulk = + Bulk.of(requests = requests: _*) + + def create[A: Schema](index: IndexName, doc: A): Create = + Create(index = index, document = Document.from(doc), refresh = false, routing = None) + + def create[A: Schema](index: IndexName, id: DocumentId, doc: A): CreateWithId = + CreateWithId(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) + + def createIndex(name: IndexName): CreateIndex = + CreateIndex(name = name, definition = None) + + def createIndex(name: IndexName, definition: String): CreateIndex = + CreateIndex(name = name, definition = Some(definition)) + + def deleteById(index: IndexName, id: DocumentId): DeleteById = + DeleteById(index = index, id = id, refresh = false, routing = None) + + def deleteByQuery(index: IndexName, query: ElasticQuery[_]): DeleteByQuery = + DeleteByQuery(index = index, query = query, refresh = false, routing = None) + + def deleteIndex(name: IndexName): DeleteIndex = + DeleteIndex(name = name) + + def exists(index: IndexName, id: DocumentId): Exists = + Exists(index = index, id = id, routing = None) + + def getById(index: IndexName, id: DocumentId): GetById = + GetById(index = index, id = id, routing = None) + + def search(index: IndexName, query: ElasticQuery[_]): Search = + Search(index = index, query = query, routing = None) - private[elasticsearch] final case class BulkRequest( - requests: List[BulkableRequest], + def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): CreateOrUpdate = + CreateOrUpdate(index = index, id = id, document = Document.from(doc), refresh = false, routing = None) + + sealed trait BulkRequest extends ElasticRequest[Unit] with HasRefresh[Unit] with HasRouting[Unit] + + private[elasticsearch] final case class Bulk( + requests: List[BulkableRequest[_]], index: Option[IndexName], refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[Unit, Bulk] { + ) extends BulkRequest { self => + def refresh(value: Boolean): Bulk = self.copy(refresh = value) + + def refreshFalse: Bulk = refresh(false) + + def refreshTrue: Bulk = refresh(true) + + def routing(value: Routing): Bulk = self.copy(routing = Some(value)) + lazy val body: String = requests.flatMap { r => - // We use @unchecked to ignore 'pattern match not exhaustive' error since we guarantee that it will not happen - // because these are only Bulkable Requests and other matches will not occur. - (r.request: @unchecked) match { - case CreateRequest(index, document, _, maybeRouting) => + r match { + case Create(index, document, _, maybeRouting) => List(getActionAndMeta("create", List(("_index", Some(index)), ("routing", maybeRouting))), document.json) - case CreateWithIdRequest(index, id, document, _, maybeRouting) => + case CreateWithId(index, id, document, _, maybeRouting) => List( getActionAndMeta("create", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), document.json ) - case CreateOrUpdateRequest(index, id, document, _, maybeRouting) => + case CreateOrUpdate(index, id, document, _, maybeRouting) => List( getActionAndMeta("index", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), document.json ) - case DeleteByIdRequest(index, id, _, maybeRouting) => + case DeleteById(index, id, _, maybeRouting) => List(getActionAndMeta("delete", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting)))) } }.mkString(start = "", sep = "\n", end = "\n") } - object BulkRequest { - def of(requests: BulkableRequest*): BulkRequest = - BulkRequest(requests = requests.toList, index = None, refresh = false, routing = None) + object Bulk { + def of(requests: BulkableRequest[_]*): Bulk = + Bulk(requests = requests.toList, index = None, refresh = false, routing = None) } - private[elasticsearch] final case class CreateRequest( + sealed trait CreateRequest extends BulkableRequest[DocumentId] with HasRefresh[DocumentId] with HasRouting[DocumentId] + + private[elasticsearch] final case class Create( index: IndexName, document: Document, refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[DocumentId, Create] + ) extends CreateRequest { self => + def refresh(value: Boolean): Create = self.copy(refresh = value) - private[elasticsearch] final case class CreateWithIdRequest( + def refreshFalse: Create = refresh(false) + + def refreshTrue: Create = refresh(true) + + def routing(value: Routing): Create = self.copy(routing = Some(value)) + } + + sealed trait CreateWithIdRequest + extends BulkableRequest[CreationOutcome] + with HasRefresh[CreationOutcome] + with HasRouting[CreationOutcome] + + private[elasticsearch] final case class CreateWithId( index: IndexName, id: DocumentId, document: Document, refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[CreationOutcome, CreateWithId] + ) extends CreateWithIdRequest { self => + def refresh(value: Boolean): CreateWithId = self.copy(refresh = value) + + def refreshFalse: CreateWithId = refresh(false) + + def refreshTrue: CreateWithId = refresh(true) - private[elasticsearch] final case class CreateIndexRequest( + def routing(value: Routing): CreateWithId = self.copy(routing = Some(value)) + } + + sealed trait CreateIndexRequest extends ElasticRequest[CreationOutcome] + + private[elasticsearch] final case class CreateIndex( name: IndexName, definition: Option[String] - ) extends ElasticRequest[CreationOutcome, CreateIndex] + ) extends CreateIndexRequest + + sealed trait CreateOrUpdateRequest extends BulkableRequest[Unit] with HasRefresh[Unit] with HasRouting[Unit] - private[elasticsearch] final case class CreateOrUpdateRequest( + private[elasticsearch] final case class CreateOrUpdate( index: IndexName, id: DocumentId, document: Document, refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[Unit, Upsert] + ) extends CreateOrUpdateRequest { self => + def refresh(value: Boolean): CreateOrUpdate = self.copy(refresh = value) + + def refreshFalse: CreateOrUpdate = refresh(false) + + def refreshTrue: CreateOrUpdate = refresh(true) + + def routing(value: Routing): CreateOrUpdate = self.copy(routing = Some(value)) + } - private[elasticsearch] final case class DeleteByIdRequest( + sealed trait DeleteByIdRequest + extends BulkableRequest[DeletionOutcome] + with HasRefresh[DeletionOutcome] + with HasRouting[DeletionOutcome] + + private[elasticsearch] final case class DeleteById( index: IndexName, id: DocumentId, refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[DeletionOutcome, DeleteById] + ) extends DeleteByIdRequest { self => + def refresh(value: Boolean): DeleteById = self.copy(refresh = value) + + def refreshFalse: DeleteById = refresh(false) - private[elasticsearch] final case class DeleteByQueryRequest( + def refreshTrue: DeleteById = refresh(true) + + def routing(value: Routing): DeleteById = self.copy(routing = Some(value)) + } + + sealed trait DeleteByQueryRequest + extends ElasticRequest[DeletionOutcome] + with HasRefresh[DeletionOutcome] + with HasRouting[DeletionOutcome] + + private[elasticsearch] final case class DeleteByQuery( index: IndexName, query: ElasticQuery[_], refresh: Boolean, routing: Option[Routing] - ) extends ElasticRequest[DeletionOutcome, DeleteByQuery] + ) extends DeleteByQueryRequest { self => + def refresh(value: Boolean): DeleteByQuery = self.copy(refresh = value) + + def refreshFalse: DeleteByQuery = refresh(false) - private[elasticsearch] final case class DeleteIndexRequest(name: IndexName) - extends ElasticRequest[DeletionOutcome, DeleteIndex] + def refreshTrue: DeleteByQuery = refresh(true) - private[elasticsearch] final case class ExistsRequest( + def routing(value: Routing): DeleteByQuery = self.copy(routing = Some(value)) + } + + sealed trait DeleteIndexRequest extends ElasticRequest[DeletionOutcome] + + final case class DeleteIndex(name: IndexName) extends DeleteIndexRequest + + sealed trait ExistRequest extends ElasticRequest[Boolean] with HasRouting[Boolean] + + private[elasticsearch] final case class Exists( index: IndexName, id: DocumentId, routing: Option[Routing] - ) extends ElasticRequest[Boolean, Exists] + ) extends ExistRequest { self => + def routing(value: Routing): Exists = self.copy(routing = Some(value)) + } + + sealed trait GetByIdRequest extends ElasticRequest[GetResult] with HasRouting[GetResult] - private[elasticsearch] final case class GetByIdRequest( + private[elasticsearch] final case class GetById( index: IndexName, id: DocumentId, routing: Option[Routing] - ) extends ElasticRequest[Option[Document], GetById] + ) extends GetByIdRequest { self => + def routing(value: Routing): GetById = self.copy(routing = Some(value)) + } + + sealed trait GetByQueryRequest extends ElasticRequest[SearchResult] - private[elasticsearch] final case class GetByQueryRequest( + private[elasticsearch] final case class Search( index: IndexName, query: ElasticQuery[_], routing: Option[Routing] - ) extends ElasticRequest[ElasticQueryResponse, GetByQuery] - - private[elasticsearch] final case class Map[A, B, ERT <: ElasticRequestType]( - request: ElasticRequest[A, ERT], - mapper: A => Either[DecodingException, B] - ) extends ElasticRequest[B, ERT] + ) extends GetByQueryRequest private def getActionAndMeta(requestType: String, parameters: List[(String, Any)]): String = parameters.collect { case (name, Some(value)) => s""""$name" : "${value.toString}"""" } @@ -221,28 +255,10 @@ object ElasticRequest { } -sealed trait ElasticRequestType - -sealed trait BulkableRequestType extends ElasticRequestType - -object ElasticRequestType { - sealed trait Bulk extends ElasticRequestType - sealed trait CreateIndex extends ElasticRequestType - sealed trait Create extends BulkableRequestType - sealed trait CreateWithId extends BulkableRequestType - sealed trait DeleteById extends BulkableRequestType - sealed trait DeleteByQuery extends ElasticRequestType - sealed trait DeleteIndex extends ElasticRequestType - sealed trait Exists extends ElasticRequestType - sealed trait GetById extends ElasticRequestType - sealed trait GetByQuery extends ElasticRequestType - sealed trait Upsert extends BulkableRequestType -} - sealed abstract class CreationOutcome -case object Created extends CreationOutcome case object AlreadyExists extends CreationOutcome +case object Created extends CreationOutcome sealed abstract class DeletionOutcome diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala new file mode 100644 index 000000000..68fbc12d1 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticResult.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch + +import zio.prelude.ZValidation +import zio.schema.Schema +import zio.{IO, Task, ZIO} + +sealed trait ElasticResult[F[_]] { + def documentAs[A: Schema]: Task[F[A]] +} + +final class GetResult private[elasticsearch] (private val doc: Option[Item]) extends ElasticResult[Option] { + def documentAs[A: Schema]: IO[DecodingException, Option[A]] = + ZIO + .fromEither(doc match { + case Some(item) => + item.documentAs match { + case Left(e) => Left(DecodingException(s"Could not parse the document: ${e.message}")) + case Right(doc) => Right(Some(doc)) + } + case None => + Right(None) + }) + .mapError(e => DecodingException(s"Could not parse the document: ${e.message}")) +} + +final class SearchResult private[elasticsearch] (private val hits: List[Item]) extends ElasticResult[List] { + def documentAs[A: Schema]: IO[DecodingException, List[A]] = + ZIO.fromEither { + ZValidation.validateAll(hits.map(item => ZValidation.fromEither(item.documentAs))).toEitherWith { errors => + DecodingException(s"Could not parse all documents successfully: ${errors.map(_.message).mkString(",")})") + } + } +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala index a06ab56d9..8ad6a1338 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Elasticsearch.scala @@ -19,17 +19,17 @@ package zio.elasticsearch import zio.{RIO, Task, URLayer, ZIO, ZLayer} trait Elasticsearch { - def execute[A](request: ElasticRequest[A, _]): Task[A] + def execute[A](request: ElasticRequest[A]): Task[A] } object Elasticsearch { - def execute[A](request: ElasticRequest[A, _]): RIO[Elasticsearch, A] = + def execute[A](request: ElasticRequest[A]): RIO[Elasticsearch, A] = ZIO.serviceWithZIO[Elasticsearch](_.execute(request)) lazy val layer: URLayer[ElasticExecutor, Elasticsearch] = ZLayer.fromFunction { executor: ElasticExecutor => new Elasticsearch { - def execute[A](request: ElasticRequest[A, _]): Task[A] = executor.execute(request) + def execute[A](request: ElasticRequest[A]): Task[A] = executor.execute(request) } } } diff --git a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala index e8504ca87..4bd79a4e3 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/HttpElasticExecutor.scala @@ -28,7 +28,10 @@ import sttp.model.StatusCode.{ } import zio.ZIO.logDebug import zio.elasticsearch.ElasticRequest._ -import zio.{Task, ZIO} +import zio.json.ast.Json.{Obj, Str} +import zio.schema.Schema +import zio.stream.{Stream, ZStream} +import zio.{Chunk, Task, ZIO} import scala.collection.immutable.{Map => ScalaMap} @@ -37,23 +40,35 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC import HttpElasticExecutor._ - def execute[A](request: ElasticRequest[A, _]): Task[A] = + def execute[A](request: ElasticRequest[A]): Task[A] = request match { - case r: BulkRequest => executeBulk(r) - case r: CreateRequest => executeCreate(r) - case r: CreateWithIdRequest => executeCreateWithId(r) - case r: CreateIndexRequest => executeCreateIndex(r) - case r: CreateOrUpdateRequest => executeCreateOrUpdate(r) - case r: DeleteByIdRequest => executeDeleteById(r) - case r: DeleteByQueryRequest => executeDeleteByQuery(r) - case r: DeleteIndexRequest => executeDeleteIndex(r) - case r: ExistsRequest => executeExists(r) - case r: GetByIdRequest => executeGetById(r) - case r: GetByQueryRequest => executeGetByQuery(r) - case map @ Map(_, _) => execute(map.request).flatMap(a => ZIO.fromEither(map.mapper(a))) + case r: Bulk => executeBulk(r) + case r: Create => executeCreate(r) + case r: CreateWithId => executeCreateWithId(r) + case r: CreateIndex => executeCreateIndex(r) + case r: CreateOrUpdate => executeCreateOrUpdate(r) + case r: DeleteById => executeDeleteById(r) + case r: DeleteByQuery => executeDeleteByQuery(r) + case r: DeleteIndex => executeDeleteIndex(r) + case r: Exists => executeExists(r) + case r: GetById => executeGetById(r) + case r: Search => executeSearch(r) } - private def executeBulk(r: BulkRequest): Task[Unit] = { + def stream(r: Search): Stream[Throwable, Item] = + ZStream.paginateChunkZIO("") { s => + if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s) + } + + def streamAs[A: Schema](r: Search): Stream[Throwable, A] = + ZStream + .paginateChunkZIO("") { s => + if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s) + } + .map(_.documentAs[A]) + .collectWhileRight + + private def executeBulk(r: Bulk): Task[Unit] = { val uri = (r.index match { case Some(index) => uri"${config.uri}/$index/$Bulk" case None => uri"${config.uri}/$Bulk" @@ -69,7 +84,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeCreate(r: CreateRequest): Task[DocumentId] = { + private def executeCreate(r: Create): Task[DocumentId] = { val uri = uri"${config.uri}/${r.index}/$Doc" .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) @@ -95,7 +110,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } - private def executeCreateWithId(r: CreateWithIdRequest): Task[CreationOutcome] = { + private def executeCreateWithId(r: CreateWithId): Task[CreationOutcome] = { val uri = uri"${config.uri}/${r.index}/$Create/${r.id}" .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) @@ -113,7 +128,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeCreateIndex(createIndex: CreateIndexRequest): Task[CreationOutcome] = + private def executeCreateIndex(createIndex: CreateIndex): Task[CreationOutcome] = sendRequest( request .put(uri"${config.uri}/${createIndex.name}") @@ -127,7 +142,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeCreateOrUpdate(r: CreateOrUpdateRequest): Task[Unit] = { + private def executeCreateOrUpdate(r: CreateOrUpdate): Task[Unit] = { val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}" .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) @@ -139,7 +154,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeDeleteById(r: DeleteByIdRequest): Task[DeletionOutcome] = { + private def executeDeleteById(r: DeleteById): Task[DeletionOutcome] = { val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}" .withParams(getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing)))) @@ -152,7 +167,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeDeleteByQuery(r: DeleteByQueryRequest): Task[DeletionOutcome] = { + private def executeDeleteByQuery(r: DeleteByQuery): Task[DeletionOutcome] = { val uri = uri"${config.uri}/${r.index}/$DeleteByQuery".withParams( getQueryParams(List(("refresh", Some(r.refresh)), ("routing", r.routing))) @@ -172,7 +187,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeDeleteIndex(r: DeleteIndexRequest): Task[DeletionOutcome] = + private def executeDeleteIndex(r: DeleteIndex): Task[DeletionOutcome] = sendRequest(request.delete(uri"${config.uri}/${r.name}")).flatMap { response => response.code match { case HttpOk => ZIO.succeed(Deleted) @@ -181,7 +196,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeExists(r: ExistsRequest): Task[Boolean] = { + private def executeExists(r: Exists): Task[Boolean] = { val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}".withParams(getQueryParams(List(("routing", r.routing)))) sendRequest(request.head(uri)).flatMap { response => @@ -193,7 +208,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC } } - private def executeGetById(r: GetByIdRequest): Task[Option[Document]] = { + private def executeGetById(r: GetById): Task[GetResult] = { val uri = uri"${config.uri}/${r.index}/$Doc/${r.id}".withParams(getQueryParams(List(("routing", r.routing)))) sendRequestWithCustomResponse[ElasticGetResponse]( @@ -202,14 +217,14 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC .response(asJson[ElasticGetResponse]) ).flatMap { response => response.code match { - case HttpOk => ZIO.attempt(response.body.toOption.map(d => Document.from(d.source))) - case HttpNotFound => ZIO.succeed(None) + case HttpOk => ZIO.attempt(new GetResult(response.body.toOption.map(r => Item(r.source)))) + case HttpNotFound => ZIO.succeed(new GetResult(None)) case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response)) } } } - private def executeGetByQuery(r: GetByQueryRequest): Task[ElasticQueryResponse] = + private def executeSearch(r: Search): Task[SearchResult] = sendRequestWithCustomResponse( request .post(uri"${config.uri}/${r.index}/$Search") @@ -221,7 +236,49 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC case HttpOk => response.body.fold( e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), - value => ZIO.succeed(value) + value => ZIO.succeed(new SearchResult(value.results.map(Item))) + ) + case _ => + ZIO.fail(createElasticExceptionFromCustomResponse(response)) + } + } + + private def executeGetByQueryWithScroll(r: Search): Task[(Chunk[Item], Option[String])] = + sendRequestWithCustomResponse( + request + .post( + uri"${config.uri}/${r.index}/$Search".withParams((Scroll, ScrollDefaultDuration)) + ) + .response(asJson[ElasticQueryResponse]) + .contentType(ApplicationJson) + .body(r.query.toJson) + ).flatMap { response => + response.code match { + case HttpOk => + response.body.fold( + e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), + value => ZIO.succeed((Chunk.fromIterable(value.results).map(Item), value.scrollId)) + ) + case _ => + ZIO.fail(createElasticExceptionFromCustomResponse(response)) + } + } + + private def executeGetByScroll(scrollId: String): Task[(Chunk[Item], Option[String])] = + sendRequestWithCustomResponse( + request + .post(uri"${config.uri}/$Search/$Scroll".withParams((Scroll, ScrollDefaultDuration))) + .response(asJson[ElasticQueryResponse]) + .contentType(ApplicationJson) + .body(Obj(ScrollId -> Str(scrollId))) + ).flatMap { response => + response.code match { + case HttpOk => + response.body.fold( + e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), + value => + if (value.results.isEmpty) ZIO.succeed((Chunk.empty, None)) + else ZIO.succeed((Chunk.fromIterable(value.results).map(Item), value.scrollId.orElse(Some(scrollId)))) ) case _ => ZIO.fail(createElasticExceptionFromCustomResponse(response)) @@ -265,11 +322,14 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC private[elasticsearch] object HttpElasticExecutor { - private final val Bulk = "_bulk" - private final val Create = "_create" - private final val DeleteByQuery = "_delete_by_query" - private final val Doc = "_doc" - private final val Search = "_search" + private final val Bulk = "_bulk" + private final val Create = "_create" + private final val DeleteByQuery = "_delete_by_query" + private final val Doc = "_doc" + private final val Search = "_search" + private final val Scroll = "scroll" + private final val ScrollDefaultDuration = "1m" + private final val ScrollId = "scroll_id" def apply(config: ElasticConfig, client: SttpBackend[Task, Any]) = new HttpElasticExecutor(config, client) diff --git a/modules/library/src/main/scala/zio/elasticsearch/Item.scala b/modules/library/src/main/scala/zio/elasticsearch/Item.scala new file mode 100644 index 000000000..760e37c79 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/Item.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch + +import zio.json.ast.Json +import zio.schema.Schema +import zio.schema.codec.DecodeError +import zio.schema.codec.JsonCodec.JsonDecoder + +final case class Item(raw: Json) { + def documentAs[A](implicit schema: Schema[A]): Either[DecodeError, A] = JsonDecoder.decode(schema, raw.toString) +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala b/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala deleted file mode 100644 index 095947511..000000000 --- a/modules/library/src/main/scala/zio/elasticsearch/Refresh.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2022 LambdaWorks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package zio.elasticsearch - -import zio.elasticsearch.ElasticRequest._ -import zio.elasticsearch.ElasticRequestType._ - -object Refresh { - - trait WithRefresh[ERT <: ElasticRequestType] { - def withRefresh[A](request: ElasticRequest[A, ERT], value: Boolean): ElasticRequest[A, ERT] - } - - object WithRefresh { - implicit val bulkWithRefresh: WithRefresh[Bulk] = new WithRefresh[Bulk] { - def withRefresh[A](request: ElasticRequest[A, Bulk], value: Boolean): ElasticRequest[A, Bulk] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: BulkRequest => r.copy(refresh = value) - } - } - - implicit val createWithRefresh: WithRefresh[Create] = new WithRefresh[Create] { - def withRefresh[A](request: ElasticRequest[A, Create], value: Boolean): ElasticRequest[A, Create] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateRequest => r.copy(refresh = value) - } - } - - implicit val createWithIdWithRefresh: WithRefresh[CreateWithId] = new WithRefresh[CreateWithId] { - def withRefresh[A](request: ElasticRequest[A, CreateWithId], value: Boolean): ElasticRequest[A, CreateWithId] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateWithIdRequest => r.copy(refresh = value) - } - } - - implicit val deleteByIdWithRefresh: WithRefresh[DeleteById] = new WithRefresh[DeleteById] { - def withRefresh[A](request: ElasticRequest[A, DeleteById], value: Boolean): ElasticRequest[A, DeleteById] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: DeleteByIdRequest => r.copy(refresh = value) - } - } - - implicit val deleteByQueryWithRefresh: WithRefresh[DeleteByQuery] = new WithRefresh[DeleteByQuery] { - def withRefresh[A](request: ElasticRequest[A, DeleteByQuery], value: Boolean): ElasticRequest[A, DeleteByQuery] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: DeleteByQueryRequest => r.copy(refresh = value) - } - } - - implicit val upsertWithRefresh: WithRefresh[Upsert] = new WithRefresh[Upsert] { - def withRefresh[A](request: ElasticRequest[A, Upsert], value: Boolean): ElasticRequest[A, Upsert] = - request match { - case Map(r, mapper) => Map(withRefresh(r, value), mapper) - case r: CreateOrUpdateRequest => r.copy(refresh = value) - } - } - } -} diff --git a/modules/library/src/main/scala/zio/elasticsearch/Routing.scala b/modules/library/src/main/scala/zio/elasticsearch/Routing.scala index 28cc7cd9b..6f82fc2d8 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/Routing.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/Routing.scala @@ -16,17 +16,6 @@ package zio.elasticsearch -import zio.elasticsearch.ElasticRequest._ -import zio.elasticsearch.ElasticRequestType.{ - Bulk, - Create, - CreateWithId, - DeleteById, - DeleteByQuery, - Exists, - GetById, - Upsert -} import zio.prelude.Assertion.isEmptyString import zio.prelude.Newtype @@ -34,77 +23,4 @@ object Routing extends Newtype[String] { override def assertion = assert(!isEmptyString) // scalafix:ok type Routing = Routing.Type - - trait WithRouting[ERT <: ElasticRequestType] { - def withRouting[A](request: ElasticRequest[A, ERT], routing: Routing): ElasticRequest[A, ERT] - } - - object WithRouting { - implicit val bulkWithRouting: WithRouting[Bulk] = new WithRouting[Bulk] { - def withRouting[A](request: ElasticRequest[A, Bulk], routing: Routing): ElasticRequest[A, Bulk] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: BulkRequest => r.copy(routing = Some(routing)) - } - } - - implicit val createWithRouting: WithRouting[Create] = new WithRouting[Create] { - def withRouting[A](request: ElasticRequest[A, Create], routing: Routing): ElasticRequest[A, Create] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateRequest => r.copy(routing = Some(routing)) - } - } - - implicit val createWithIdWithRouting: WithRouting[CreateWithId] = new WithRouting[CreateWithId] { - def withRouting[A](request: ElasticRequest[A, CreateWithId], routing: Routing): ElasticRequest[A, CreateWithId] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateWithIdRequest => r.copy(routing = Some(routing)) - } - } - - implicit val deleteByIdWithRouting: WithRouting[DeleteById] = new WithRouting[DeleteById] { - def withRouting[A](request: ElasticRequest[A, DeleteById], routing: Routing): ElasticRequest[A, DeleteById] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: DeleteByIdRequest => r.copy(routing = Some(routing)) - } - } - - implicit val deleteByQueryWithRouting: WithRouting[DeleteByQuery] = new WithRouting[DeleteByQuery] { - def withRouting[A]( - request: ElasticRequest[A, DeleteByQuery], - routing: Routing - ): ElasticRequest[A, DeleteByQuery] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: DeleteByQueryRequest => r.copy(routing = Some(routing)) - } - } - - implicit val existsWithRouting: WithRouting[Exists] = new WithRouting[Exists] { - def withRouting[A](request: ElasticRequest[A, Exists], routing: Routing): ElasticRequest[A, Exists] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: ExistsRequest => r.copy(routing = Some(routing)) - } - } - - implicit val getByIdWithRouting: WithRouting[GetById] = new WithRouting[GetById] { - def withRouting[A](request: ElasticRequest[A, GetById], routing: Routing): ElasticRequest[A, GetById] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: GetByIdRequest => r.copy(routing = Some(routing)) - } - } - - implicit val upsertWithRouting: WithRouting[Upsert] = new WithRouting[Upsert] { - def withRouting[A](request: ElasticRequest[A, Upsert], routing: Routing): ElasticRequest[A, Upsert] = - request match { - case Map(r, mapper) => Map(withRouting(r, routing), mapper) - case r: CreateOrUpdateRequest => r.copy(routing = Some(routing)) - } - } - } } diff --git a/modules/library/src/main/scala/zio/elasticsearch/package.scala b/modules/library/src/main/scala/zio/elasticsearch/package.scala index 353380e4e..78c8d0466 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/package.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/package.scala @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils._ import zio.prelude.AssertionError.failure import zio.prelude.Newtype +import zio.schema.Schema package object elasticsearch { private[elasticsearch] class ElasticException(message: String) extends RuntimeException(message) @@ -61,4 +62,7 @@ package object elasticsearch { def containsAny(name: String, params: List[String]): Boolean = params.exists(StringUtils.contains(name, _)) + final implicit class ZIOResultOps[R, F[_]](zio: RIO[R, ElasticResult[F]]) { + def documentAs[A: Schema]: RIO[R, F[A]] = zio.flatMap(_.documentAs[A]) + } } diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 71699cc86..1ebb33b70 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -306,11 +306,13 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticExecutor.execute( - ElasticRequest - .getById[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) - .routing(Routing("routing")) - ) + addStubMapping *> ElasticExecutor + .execute( + ElasticRequest + .getById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) + .routing(Routing("routing")) + ) + .documentAs[GitHubRepo] )(isSome(equalTo(repo))) }, test("getting by query request") { @@ -361,7 +363,9 @@ object HttpElasticExecutorSpec extends WireMockSpec { ) assertZIO( - addStubMapping *> ElasticExecutor.execute(ElasticRequest.search[GitHubRepo](index = index, query = matchAll)) + addStubMapping *> ElasticExecutor + .execute(ElasticRequest.search(index = index, query = matchAll)) + .documentAs[GitHubRepo] )( equalTo(List(repo)) ) diff --git a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala index 7f177dc1d..0ea88bc54 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala @@ -18,7 +18,7 @@ package zio.elasticsearch import zio.Scope import zio.elasticsearch.ElasticQuery._ -import zio.elasticsearch.ElasticRequest.BulkRequest +import zio.elasticsearch.ElasticRequest.Bulk import zio.elasticsearch.utils._ import zio.prelude.Newtype.unsafeWrap import zio.prelude.Validation @@ -1073,8 +1073,8 @@ object QueryDSLSpec extends ZIOSpecDefault { val req4 = ElasticRequest.deleteById(index, DocumentId("1VNzFt2XUFZfXZheDc")).routing(unsafeWrap(Routing)(user.id)) ElasticRequest.bulk(req1, req2, req3, req4) match { - case r: BulkRequest => Some(r.body) - case _ => None + case r: Bulk => Some(r.body) + case _ => None } }