diff --git a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala index 29bc3a673..94c10e87e 100644 --- a/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/library/src/it/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -1436,11 +1436,21 @@ object HttpExecutorSpec extends IntegrationSpec { firstDocumentId, Script("ctx._source.intField = params['factor']").withParams("factor" -> 100) ) - res <- Executor.execute(ElasticRequest.bulk(req1, req2, req3, req4, req5, req6).refreshTrue) + req7 = + ElasticRequest + .update[TestDocument](index, DocumentId("invalid-document-id"), document.copy(intField = 100)) + res <- + Executor.execute(ElasticRequest.bulk(req1, req2, req3, req4, req5, req6, req7).refreshTrue) doc1 <- Executor.execute(ElasticRequest.getById(index, firstDocumentId)).documentAs[TestDocument] doc2 <- Executor.execute(ElasticRequest.getById(index, secondDocumentId)).documentAs[TestDocument] doc3 <- Executor.execute(ElasticRequest.getById(index, thirdDocumentId)).documentAs[TestDocument] - } yield assert(res)(isUnit) && assert(doc3)(isSome(equalTo(document.copy(intField = 100)))) && + } yield assert(res.items.size)(equalTo(7)) && + assert(res.items.map(_.error.isDefined))( + equalTo(List(false, false, false, false, false, false, true)) + ) && + assert(res.items(6).status)(equalTo(Some(404))) && + assert(res.items(6).error.map(_.`type`))(equalTo(Some("document_missing_exception"))) && + assert(doc3)(isSome(equalTo(document.copy(intField = 100)))) && assert(doc2)(isNone) && assert(doc1)( isSome(equalTo(document.copy(doubleField = 3000, intField = 100))) ) diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index 4b1083eba..1f8832a8d 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -19,6 +19,7 @@ package zio.elasticsearch import zio.Chunk import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps import zio.elasticsearch.aggregation.ElasticAggregation +import zio.elasticsearch.executor.response.BulkResponse import zio.elasticsearch.highlights.Highlights import zio.elasticsearch.query.ElasticQuery import zio.elasticsearch.query.sort.Sort @@ -361,7 +362,10 @@ object ElasticRequest { private[elasticsearch] final case class Aggregate(index: IndexName, aggregation: ElasticAggregation) extends AggregateRequest - sealed trait BulkRequest extends ElasticRequest[Unit] with HasRefresh[BulkRequest] with HasRouting[BulkRequest] + sealed trait BulkRequest + extends ElasticRequest[BulkResponse] + with HasRefresh[BulkRequest] + with HasRouting[BulkRequest] private[elasticsearch] final case class Bulk( requests: List[BulkableRequest[_]], diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/HttpExecutor.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/HttpExecutor.scala index bf52b548b..184f47e64 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/executor/HttpExecutor.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/HttpExecutor.scala @@ -33,6 +33,7 @@ import zio.ZIO.logDebug import zio.elasticsearch.ElasticRequest._ import zio.elasticsearch._ import zio.elasticsearch.executor.response.{ + BulkResponse, CountResponse, CreateResponse, DocumentWithHighlightsAndSort, @@ -126,18 +127,26 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig } } - private def executeBulk(r: Bulk): Task[Unit] = { + private def executeBulk(r: Bulk): Task[BulkResponse] = { val uri = (r.index match { case Some(index) => uri"${esConfig.uri}/$index/$Bulk" case None => uri"${esConfig.uri}/$Bulk" }).withParams(getQueryParams(List(("refresh", r.refresh), ("routing", r.routing)))) - sendRequest( - baseRequest.post(uri).contentType(ApplicationJson).body(r.body) + sendRequestWithCustomResponse( + baseRequest + .post(uri) + .contentType(ApplicationJson) + .body(r.body) + .response(asJson[BulkResponse]) ).flatMap { response => response.code match { - case HttpOk => ZIO.unit - case _ => ZIO.fail(handleFailures(response)) + case HttpOk => + response.body.fold( + e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")), + value => ZIO.succeed(value) + ) + case _ => ZIO.fail(handleFailuresFromCustomResponse(response)) } } } diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/BulkResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/BulkResponse.scala new file mode 100644 index 000000000..1dcf397dc --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/BulkResponse.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch.executor.response + +import zio.json.{DeriveJsonDecoder, JsonDecoder} + +final case class BulkResponse private[elasticsearch] ( + took: Int, + errors: Boolean, + items: List[Item] +) + +private[elasticsearch] object BulkResponse { + implicit val decoder: JsonDecoder[BulkResponse] = DeriveJsonDecoder.gen[BulkResponse] +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/Error.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Error.scala new file mode 100644 index 000000000..f531d7881 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Error.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch.executor.response + +import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField} + +final case class Error private[elasticsearch] ( + `type`: String, + reason: String, + @jsonField("index_uuid") + indexUuid: String, + shard: String, + index: String +) + +private[elasticsearch] object Error { + implicit val decoder: JsonDecoder[Error] = DeriveJsonDecoder.gen[Error] +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/Item.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Item.scala new file mode 100644 index 000000000..5f39ea1e1 --- /dev/null +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Item.scala @@ -0,0 +1,109 @@ +/* + * Copyright 2022 LambdaWorks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.elasticsearch.executor.response + +import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField, jsonHint} + +sealed trait Item { + def index: String + def id: String + def version: Option[Int] + def result: Option[String] + def shards: Option[Shards] + def status: Option[Int] + def error: Option[Error] +} + +@jsonHint("create") +final case class Create private[elasticsearch] ( + @jsonField("_index") + index: String, + @jsonField("_id") + id: String, + @jsonField("_version") + version: Option[Int], + result: Option[String], + @jsonField("_shards") + shards: Option[Shards], + status: Option[Int], + error: Option[Error] +) extends Item + +private[elasticsearch] object Create { + implicit val decoder: JsonDecoder[Create] = DeriveJsonDecoder.gen[Create] +} + +@jsonHint("delete") +final case class Delete private[elasticsearch] ( + @jsonField("_index") + index: String, + @jsonField("_id") + id: String, + @jsonField("_version") + version: Option[Int], + result: Option[String], + @jsonField("_shards") + shards: Option[Shards], + status: Option[Int], + error: Option[Error] +) extends Item + +private[elasticsearch] object Delete { + implicit val decoder: JsonDecoder[Delete] = DeriveJsonDecoder.gen[Delete] +} + +@jsonHint("index") +final case class Index private[elasticsearch] ( + @jsonField("_index") + index: String, + @jsonField("_id") + id: String, + @jsonField("_version") + version: Option[Int], + result: Option[String], + @jsonField("_shards") + shards: Option[Shards], + status: Option[Int], + error: Option[Error] +) extends Item + +private[elasticsearch] object Index { + implicit val decoder: JsonDecoder[Index] = DeriveJsonDecoder.gen[Index] +} + +@jsonHint("update") +final case class Update private[elasticsearch] ( + @jsonField("_index") + index: String, + @jsonField("_id") + id: String, + @jsonField("_version") + version: Option[Int], + result: Option[String], + @jsonField("_shards") + shards: Option[Shards], + status: Option[Int], + error: Option[Error] +) extends Item + +private[elasticsearch] object Update { + implicit val decoder: JsonDecoder[Update] = DeriveJsonDecoder.gen[Update] +} + +private[elasticsearch] object Item { + implicit val decoder: JsonDecoder[Item] = DeriveJsonDecoder.gen[Item] +} diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/Shards.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Shards.scala index 04ba9bab6..dbce62b84 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/executor/response/Shards.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/Shards.scala @@ -21,7 +21,7 @@ import zio.json.{DeriveJsonDecoder, JsonDecoder} private[elasticsearch] final case class Shards( total: Int, successful: Int, - skipped: Int, + skipped: Int = 0, failed: Int ) diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 8c6e4073e..e8239bf4c 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -20,7 +20,13 @@ import zio.elasticsearch.ElasticAggregation.termsAggregation import zio.elasticsearch.ElasticQuery.{matchAll, term} import zio.elasticsearch.domain.TestDocument import zio.elasticsearch.executor.Executor -import zio.elasticsearch.executor.response.{TermsAggregationBucket, TermsAggregationResponse} +import zio.elasticsearch.executor.response.{ + BulkResponse, + Create, + Shards, + TermsAggregationBucket, + TermsAggregationResponse +} import zio.elasticsearch.request.CreationOutcome.Created import zio.elasticsearch.request.DeletionOutcome.Deleted import zio.elasticsearch.request.UpdateConflicts.Proceed @@ -48,7 +54,30 @@ object HttpElasticExecutorSpec extends SttpBackendStubSpec { assertZIO( Executor.execute(ElasticRequest.bulk(ElasticRequest.create(index, doc)).refreshTrue) )( - isUnit + equalTo( + BulkResponse( + took = 3, + errors = false, + items = List( + Create( + index = "repositories", + id = "123", + version = Some(1), + result = Some("created"), + shards = Some( + Shards( + total = 1, + successful = 1, + failed = 0, + skipped = 0 + ) + ), + status = Some(201), + error = None + ) + ) + ) + ) ) }, test("count request") { diff --git a/modules/library/src/test/scala/zio/elasticsearch/SttpBackendStubSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/SttpBackendStubSpec.scala index 185963169..052523de5 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/SttpBackendStubSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/SttpBackendStubSpec.scala @@ -61,9 +61,9 @@ trait SttpBackendStubSpec extends ZIOSpecDefault { response = Response( """ |{ - | "took" = 3, - | "errors" = false, - | "items" = [ + | "took" : 3, + | "errors" : false, + | "items" : [ | { | "create": { | "_index": "repositories",