diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala index 7dc4149c4..596a7d302 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticRequest.scala @@ -109,18 +109,20 @@ object ElasticRequest { // We use @unchecked to ignore 'pattern match not exhaustive' error since we guarantee that it will not happen // because these are only Bulkable Requests and other matches will not occur. (r.request: @unchecked) match { - case CreateRequest(index, document, _, _) => - val actionAndMeta = s"""{ "create" : { "_index" : "$index" } }""" - List(actionAndMeta, document.json) - case CreateWithIdRequest(index, id, document, _, _) => - val actionAndMeta = s"""{ "create" : { "_index" : "$index", "_id" : "$id" } }""" - List(actionAndMeta, document.json) - case CreateOrUpdateRequest(index, id, document, _, _) => - val actionAndMeta = s"""{ "index" : { "_index" : "$index", "_id" : "$id" } }""" - List(actionAndMeta, document.json) - case DeleteByIdRequest(index, id, _, _) => - val actionAndMeta = s"""{ "delete" : { "_index" : "$index", "_id" : "$id" } }""" - List(actionAndMeta) + case CreateRequest(index, document, _, maybeRouting) => + List(getActionAndMeta("create", List(("_index", Some(index)), ("routing", maybeRouting))), document.json) + case CreateWithIdRequest(index, id, document, _, maybeRouting) => + List( + getActionAndMeta("create", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), + document.json + ) + case CreateOrUpdateRequest(index, id, document, _, maybeRouting) => + List( + getActionAndMeta("index", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting))), + document.json + ) + case DeleteByIdRequest(index, id, _, maybeRouting) => + List(getActionAndMeta("delete", List(("_index", Some(index)), ("_id", Some(id)), ("routing", maybeRouting)))) } }.mkString(start = "", sep = "\n", end = "\n") } @@ -197,6 +199,10 @@ object ElasticRequest { mapper: A => Either[DecodingException, B] ) extends ElasticRequest[B, ERT] + private def getActionAndMeta(requestType: String, parameters: List[(String, Any)]): String = + parameters.collect { case (name, Some(value)) => s""""$name" : "${value.toString}"""" } + .mkString(s"""{ "$requestType" : { """, ", ", " } }") + } sealed trait ElasticRequestType diff --git a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala index 5abaf1b70..4d1dce68d 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/HttpElasticExecutorSpec.scala @@ -12,27 +12,65 @@ object HttpElasticExecutorSpec extends WiremockSpec { override def spec: Spec[TestEnvironment, Any] = suite("HttpElasticExecutor")( + suite("bulk request") { + test("successfully execute bulk request") { + server.addStubMapping( + post(urlEqualTo("/_bulk?refresh=true")) + .willReturn( + aResponse + .withBody( + """ + |{ + | "took" = 3, + | "errors" = false, + | "items" = [ + | { + | "create": { + | "_index": "repositories", + | "_type": "_doc", + | "_id": "123", + | "_version": 1, + | "result": "created", + | "_shards": { + | "total": 1, + | "successful": 1, + | "failed": 0 + | }, + | "_seq_no": 0, + | "_primary_term": 1, + | "status": 201 + | } + | } + | ] + |}""".stripMargin + ) + .withStatus(StatusCode.Ok.code) + ) + .build + ) + + assertZIO(ElasticRequest.bulk(ElasticRequest.create(index, repo)).refreshTrue.execute)(isUnit) + } + }, suite("creating document request") { test("return document ID") { server.addStubMapping( post(urlEqualTo("/repositories/_doc?refresh=true&routing=routing")) .willReturn( aResponse - .withBody(""" - |{ - | "_id": "V4x8q4UB3agN0z75fv5r" - |}""".stripMargin) + .withBody( + """ + |{ + | "_id": "V4x8q4UB3agN0z75fv5r" + |}""".stripMargin + ) .withStatus(StatusCode.Created.code) ) .build ) assertZIO( - ElasticRequest - .create[GitHubRepo](index = index, doc = repo) - .routing(Routing("routing")) - .refresh(value = true) - .execute + ElasticRequest.create[GitHubRepo](index = index, doc = repo).routing(Routing("routing")).refreshTrue.execute )(equalTo(DocumentId("V4x8q4UB3agN0z75fv5r"))) } }, @@ -48,7 +86,7 @@ object HttpElasticExecutorSpec extends WiremockSpec { ElasticRequest .create[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) .routing(Routing("routing")) - .refresh(value = true) + .refreshTrue .execute )(equalTo(CreationOutcome.Created)) } @@ -56,9 +94,7 @@ object HttpElasticExecutorSpec extends WiremockSpec { suite("creating index request") { test("return Created outcome") { server.addStubMapping( - put(urlEqualTo("/repositories")) - .willReturn(aResponse.withStatus(StatusCode.Ok.code)) - .build + put(urlEqualTo("/repositories")).willReturn(aResponse.withStatus(StatusCode.Ok.code)).build ) assertZIO(ElasticRequest.createIndex(name = index, definition = None).execute)( @@ -78,7 +114,7 @@ object HttpElasticExecutorSpec extends WiremockSpec { ElasticRequest .upsert[GitHubRepo](index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r"), doc = repo) .routing(Routing("routing")) - .refresh(value = true) + .refreshTrue .execute )(isUnit) } @@ -95,7 +131,7 @@ object HttpElasticExecutorSpec extends WiremockSpec { ElasticRequest .deleteById(index = index, id = DocumentId("V4x8q4UB3agN0z75fv5r")) .routing(Routing("routing")) - .refresh(value = true) + .refreshTrue .execute )(equalTo(DeletionOutcome.Deleted)) } @@ -108,9 +144,9 @@ object HttpElasticExecutorSpec extends WiremockSpec { .build ) - assertZIO( - ElasticRequest.deleteByQuery(index = index, query = matchAll()).refresh(value = true).execute - )(equalTo(DeletionOutcome.Deleted)) + assertZIO(ElasticRequest.deleteByQuery(index = index, query = matchAll()).refreshTrue.execute)( + equalTo(DeletionOutcome.Deleted) + ) } }, suite("deleting index request") { diff --git a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala index d59189e0d..8b192dbdd 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/QueryDSLSpec.scala @@ -4,6 +4,7 @@ import zio.Scope import zio.elasticsearch.ElasticQuery._ import zio.elasticsearch.ElasticRequest.BulkRequest import zio.elasticsearch.utils._ +import zio.prelude.Newtype.unsafeWrap import zio.schema.{DeriveSchema, Schema} import zio.test.Assertion.{equalTo, isSome} import zio.test._ @@ -496,24 +497,29 @@ object QueryDSLSpec extends ZIOSpecDefault { val user = UserDocument(id = "WeeMwR5d5", name = "Name", address = "Address", balance = 1000, age = 24) val req1 = - ElasticRequest.create[UserDocument](index, DocumentId("ETux1srpww2ObCx"), user.copy(age = 39)) - val req2 = ElasticRequest.create[UserDocument](index, user) + ElasticRequest + .create[UserDocument](index, DocumentId("ETux1srpww2ObCx"), user.copy(age = 39)) + .routing(unsafeWrap(Routing)(user.id)) + val req2 = ElasticRequest.create[UserDocument](index, user).routing(unsafeWrap(Routing)(user.id)) val req3 = - ElasticRequest.upsert[UserDocument](index, DocumentId("yMyEG8iFL5qx"), user.copy(balance = 3000)) - val req4 = ElasticRequest.deleteById(index, DocumentId("1VNzFt2XUFZfXZheDc")) + ElasticRequest + .upsert[UserDocument](index, DocumentId("yMyEG8iFL5qx"), user.copy(balance = 3000)) + .routing(unsafeWrap(Routing)(user.id)) + val req4 = + ElasticRequest.deleteById(index, DocumentId("1VNzFt2XUFZfXZheDc")).routing(unsafeWrap(Routing)(user.id)) val bulkQuery = ElasticRequest.bulk(req1, req2, req3, req4) match { case r: BulkRequest => Some(r.body) case _ => None } val expectedBody = - """|{ "create" : { "_index" : "users", "_id" : "ETux1srpww2ObCx" } } + """|{ "create" : { "_index" : "users", "_id" : "ETux1srpww2ObCx", "routing" : "WeeMwR5d5" } } |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":39} - |{ "create" : { "_index" : "users" } } + |{ "create" : { "_index" : "users", "routing" : "WeeMwR5d5" } } |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":1000.0,"age":24} - |{ "index" : { "_index" : "users", "_id" : "yMyEG8iFL5qx" } } + |{ "index" : { "_index" : "users", "_id" : "yMyEG8iFL5qx", "routing" : "WeeMwR5d5" } } |{"id":"WeeMwR5d5","name":"Name","address":"Address","balance":3000.0,"age":24} - |{ "delete" : { "_index" : "users", "_id" : "1VNzFt2XUFZfXZheDc" } } + |{ "delete" : { "_index" : "users", "_id" : "1VNzFt2XUFZfXZheDc", "routing" : "WeeMwR5d5" } } |""".stripMargin assert(bulkQuery)(isSome(equalTo(expectedBody)))