Skip to content

Commit

Permalink
Fix code remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulaja98 committed Apr 12, 2023
1 parent fbc4fd3 commit 8d445b9
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import zio.elasticsearch.ElasticQuery._
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.domain.{TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
import zio.elasticsearch.executor.response.UpdateByQueryResponse
import zio.elasticsearch.query.sort.SortMode.Max
import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
import zio.elasticsearch.result.Item
import zio.elasticsearch.result.{Item, UpdateByQueryResult}
import zio.elasticsearch.script.Script
import zio.json.ast.Json.{Arr, Str}
import zio.stream.{Sink, ZSink}
Expand Down Expand Up @@ -1314,7 +1313,7 @@ object HttpExecutorSpec extends IntegrationSpec {
)
updateRes <- Executor.execute(
ElasticRequest
.updateByQuery(
.updateAllByQuery(
updateByQueryIndex,
Script("ctx._source['stringField'] = params['str']").withParams("str" -> stringField)
)
Expand All @@ -1323,13 +1322,7 @@ object HttpExecutorSpec extends IntegrationSpec {
doc <- Executor.execute(ElasticRequest.getById(updateByQueryIndex, documentId)).documentAs[TestDocument]
} yield assert(updateRes)(
equalTo(
UpdateByQueryResponse(
took = updateRes.took,
total = 1,
updated = 1,
deleted = 0,
versionConflicts = 0
)
UpdateByQueryResult(took = updateRes.took, total = 1, updated = 1, deleted = 0, versionConflicts = 0)
)
) && assert(doc)(isSome(equalTo(document.copy(stringField = stringField))))
}
Expand All @@ -1345,22 +1338,18 @@ object HttpExecutorSpec extends IntegrationSpec {
updateRes <-
Executor.execute(
ElasticRequest
.updateByQuery(updateByQueryIndex, Script("ctx._source['intField']++"))
.query(
term(field = TestDocument.stringField, multiField = Some("keyword"), value = "StringField")
.updateByQuery(
index = updateByQueryIndex,
query =
term(field = TestDocument.stringField, multiField = Some("keyword"), value = "StringField"),
script = Script("ctx._source['intField']++")
)
.refreshTrue
)
doc <- Executor.execute(ElasticRequest.getById(updateByQueryIndex, documentId)).documentAs[TestDocument]
} yield assert(updateRes)(
equalTo(
UpdateByQueryResponse(
took = updateRes.took,
total = 1,
updated = 1,
deleted = 0,
versionConflicts = 0
)
UpdateByQueryResult(took = updateRes.took, total = 1, updated = 1, deleted = 0, versionConflicts = 0)
)
) && assert(doc)(isSome(equalTo(newDocument.copy(intField = newDocument.intField + 1))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ package zio.elasticsearch

import zio.elasticsearch.ElasticPrimitive.ElasticPrimitiveOps
import zio.elasticsearch.aggregation.ElasticAggregation
import zio.elasticsearch.executor.response.UpdateByQueryResponse
import zio.elasticsearch.highlights.Highlights
import zio.elasticsearch.query.ElasticQuery
import zio.elasticsearch.query.sort.Sort
import zio.elasticsearch.request._
import zio.elasticsearch.result.{AggregationResult, GetResult, SearchAndAggregateResult, SearchResult}
import zio.elasticsearch.result.{
AggregationResult,
GetResult,
SearchAndAggregateResult,
SearchResult,
UpdateByQueryResult
}
import zio.elasticsearch.script.Script
import zio.json.ast.Json
import zio.json.ast.Json.{Arr, Obj}
Expand Down Expand Up @@ -110,9 +115,12 @@ object ElasticRequest {
upsert = None
)

def updateByQuery(index: IndexName, script: Script): UpdateByQueryRequest =
def updateAllByQuery(index: IndexName, script: Script): UpdateByQueryRequest =
UpdateByQuery(index = index, script = script, conflicts = None, query = None, refresh = None, routing = None)

def updateByQuery(index: IndexName, query: ElasticQuery[_], script: Script): UpdateByQueryRequest =
UpdateByQuery(index = index, script = script, conflicts = None, query = Some(query), refresh = None, routing = None)

def updateByScript(index: IndexName, id: DocumentId, script: Script): UpdateRequest =
Update(index = index, id = id, doc = None, refresh = None, routing = None, script = Some(script), upsert = None)

Expand Down Expand Up @@ -486,15 +494,13 @@ object ElasticRequest {
}

sealed trait UpdateByQueryRequest
extends ElasticRequest[UpdateByQueryResponse]
extends ElasticRequest[UpdateByQueryResult]
with HasRefresh[UpdateByQueryRequest]
with HasRouting[UpdateByQueryRequest] {
def conflicts(value: UpdateConflicts): UpdateByQueryRequest

def query(value: ElasticQuery[_]): UpdateByQueryRequest
}

private[elasticsearch] case class UpdateByQuery(
private[elasticsearch] final case class UpdateByQuery(
index: IndexName,
script: Script,
conflicts: Option[UpdateConflicts],
Expand All @@ -505,9 +511,6 @@ object ElasticRequest {
def conflicts(value: UpdateConflicts): UpdateByQueryRequest =
self.copy(conflicts = Some(value))

def query(value: ElasticQuery[_]): UpdateByQueryRequest =
self.copy(query = Some(value))

def refresh(value: Boolean): UpdateByQueryRequest =
self.copy(refresh = Some(value))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
}
}

private def executeUpdateByQuery(r: UpdateByQuery): Task[UpdateByQueryResponse] =
private def executeUpdateByQuery(r: UpdateByQuery): Task[UpdateByQueryResult] =
sendRequestWithCustomResponse(
baseRequest
.post(
Expand All @@ -543,7 +543,7 @@ private[elasticsearch] final class HttpExecutor private (esConfig: ElasticConfig
case HttpOk =>
response.body.fold(
e => ZIO.fail(new ElasticException(s"Exception occurred: ${e.getMessage}")),
value => ZIO.succeed(value)
value => ZIO.succeed(UpdateByQueryResult(value))
)
case HttpConflict => ZIO.fail(VersionConflictException)
case _ => ZIO.fail(handleFailuresFromCustomResponse(response))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package zio.elasticsearch.executor.response

import zio.json.{DeriveJsonDecoder, JsonDecoder, jsonField}

final case class UpdateByQueryResponse(
private[elasticsearch] final case class UpdateByQueryResponse(
took: Int,
total: Int,
updated: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object UpdateConflicts {
case object Abort extends UpdateConflicts {
override def toString: String = "abort"
}

case object Proceed extends UpdateConflicts {
override def toString: String = "proceed"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2022 LambdaWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.elasticsearch.result

import zio.elasticsearch.executor.response.UpdateByQueryResponse

final case class UpdateByQueryResult(
took: Int,
total: Int,
updated: Int,
deleted: Int,
versionConflicts: Int
)

object UpdateByQueryResult {
def apply(updateByQueryResponse: UpdateByQueryResponse): UpdateByQueryResult =
UpdateByQueryResult(
updateByQueryResponse.took,
updateByQueryResponse.total,
updateByQueryResponse.updated,
updateByQueryResponse.deleted,
updateByQueryResponse.versionConflicts
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ package object result {

final case class DecodingException(message: String) extends ElasticException(message)

final case object UnauthorizedException extends ElasticException("Wrong credentials provided.")
case object UnauthorizedException extends ElasticException("Wrong credentials provided.")

final case object VersionConflictException extends ElasticException("There are some conflicts in versions.")
final case class VersionConflictException(succeeded: Int, failed: Int)
extends ElasticException(s"There are $failed conflicts in versions. Only $succeeded documents are updated.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,9 @@ object ElasticRequestsDSLSpec extends ZIOSpecDefault {
test("successfully encode update by query request with query to JSON") {
val jsonRequest = updateByQuery(
index = index,
query = term(TestDocument.stringField, Some("keyword"), "StringField"),
script = Script("ctx._source['intField']++")
).query(term(TestDocument.stringField, Some("keyword"), "StringField")) match {
case r: UpdateByQuery => r.toJson
}
) match { case r: UpdateByQuery => r.toJson }

val expected =
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package zio.elasticsearch

import zio.elasticsearch.ElasticAggregation.termsAggregation
import zio.elasticsearch.ElasticQuery.matchAll
import zio.elasticsearch.ElasticQuery.{matchAll, term}
import zio.elasticsearch.domain.TestDocument
import zio.elasticsearch.executor.Executor
import zio.elasticsearch.executor.response.{TermsAggregationBucket, TermsAggregationResponse, UpdateByQueryResponse}
import zio.elasticsearch.executor.response.{TermsAggregationBucket, TermsAggregationResponse}
import zio.elasticsearch.request.CreationOutcome.Created
import zio.elasticsearch.request.DeletionOutcome.Deleted
import zio.elasticsearch.request.UpdateConflicts.Proceed
import zio.elasticsearch.request.UpdateOutcome
import zio.elasticsearch.result.UpdateByQueryResult
import zio.elasticsearch.script.Script
import zio.test.Assertion._
import zio.test.{Spec, TestEnvironment, TestResultZIOOps, assertZIO}
Expand Down Expand Up @@ -207,16 +208,31 @@ object HttpElasticExecutorSpec extends SttpBackendStubSpec {
)
)(equalTo(UpdateOutcome.Updated))
},
test("update all by query request") {
assertZIO(
Executor.execute(
ElasticRequest
.updateAllByQuery(index = index, script = Script("ctx._source['intField']++"))
.conflicts(Proceed)
.routing(Routing("routing"))
.refreshTrue
)
)(equalTo(UpdateByQueryResult(took = 1, total = 10, updated = 8, deleted = 0, versionConflicts = 2)))
},
test("update by query request") {
assertZIO(
Executor.execute(
ElasticRequest
.updateByQuery(index = index, Script("ctx._source['intField']++"))
.updateByQuery(
index = index,
query = term(field = TestDocument.stringField, multiField = Some("keyword"), value = "StringField"),
script = Script("ctx._source['intField']++")
)
.conflicts(Proceed)
.routing(Routing("routing"))
.refreshTrue
)
)(equalTo(UpdateByQueryResponse(took = 1, total = 10, updated = 8, deleted = 0, versionConflicts = 2)))
)(equalTo(UpdateByQueryResult(took = 1, total = 10, updated = 8, deleted = 0, versionConflicts = 2)))
}
).provideShared(elasticsearchSttpLayer)
}

0 comments on commit 8d445b9

Please sign in to comment.