Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
dbulaja98 committed Mar 6, 2023
1 parent 0c13270 commit 1fba35f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ object HttpExecutorSpec extends IntegrationSpec {
checkOnce(genDocumentId) { documentId =>
assertZIO(
ElasticExecutor.execute(ElasticRequest.getById(index, documentId)).documentAs[CustomerDocument]
)(
isNone
)
)(isNone)
}
},
test("fail with throwable if decoding fails") {
Expand Down Expand Up @@ -202,10 +200,9 @@ object HttpExecutorSpec extends IntegrationSpec {
.refreshTrue
)
query = range("age").gte(0)
res <-
ElasticExecutor
.execute(ElasticRequest.search(secondSearchIndex, query))
.documentAs[CustomerDocument]
res <- ElasticExecutor
.execute(ElasticRequest.search(secondSearchIndex, query))
.documentAs[CustomerDocument]
} yield res

assertZIO(result.exit)(
Expand Down Expand Up @@ -302,19 +299,16 @@ object HttpExecutorSpec extends IntegrationSpec {

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <-
ElasticExecutor.execute(
ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer)
)
_ <-
ElasticExecutor.execute(
ElasticRequest
.upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer)
.refreshTrue
)
_ <- ElasticExecutor.execute(
ElasticRequest.upsert[CustomerDocument](firstSearchIndex, firstDocumentId, firstCustomer)
)
_ <- ElasticExecutor.execute(
ElasticRequest
.upsert[CustomerDocument](firstSearchIndex, secondDocumentId, secondCustomer)
.refreshTrue
)
query = range("balance").gte(100)
res <-
ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
} yield assert(res)(isNonEmpty)
}
} @@ around(
Expand All @@ -323,8 +317,7 @@ object HttpExecutorSpec extends IntegrationSpec {
),
test("search for documents using range query with multiple pages") {
checkOnce(genCustomer) { customer =>
def sink: Sink[Throwable, Item, Nothing, Chunk[Item]] =
ZSink.collectAll[Item]
def sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(secondSearchIndex, matchAll))
Expand Down Expand Up @@ -363,15 +356,25 @@ object HttpExecutorSpec extends IntegrationSpec {
_ <- ElasticExecutor.execute(ElasticRequest.bulk(reqs: _*).refreshTrue)
query = range("balance").gte(100)
res <- ElasticExecutor
.streamAs[CustomerDocument](
ElasticRequest.search(secondSearchIndex, query)
)
.streamAs[CustomerDocument](ElasticRequest.search(secondSearchIndex, query))
.run(sink)
} yield assert(res)(hasSize(equalTo(201)))
}
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(secondSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(secondSearchIndex)).orDie
),
test("search for documents using range query - empty stream") {
val sink: Sink[Throwable, Item, Nothing, Chunk[Item]] = ZSink.collectAll[Item]

for {
_ <- ElasticExecutor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
query = range("balance").gte(100)
res <- ElasticExecutor.stream(ElasticRequest.search(firstSearchIndex, query)).run(sink)
} yield assert(res)(hasSize(equalTo(0)))
} @@ around(
ElasticExecutor.execute(ElasticRequest.createIndex(firstSearchIndex)),
ElasticExecutor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
)
) @@ shrinks(0),
suite("deleting by query")(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package zio.elasticsearch

import sttp.client3.SttpBackend
import zio.elasticsearch.ElasticRequest.GetByQuery
import zio.elasticsearch.ElasticRequest.Search
import zio.schema.Schema
import zio.stream.{Stream, ZStream}
import zio.{RIO, Task, URLayer, ZIO, ZLayer}

private[elasticsearch] trait ElasticExecutor {
def execute[A](request: ElasticRequest[A]): Task[A]

def stream(request: GetByQuery): Stream[Throwable, Item]
def stream(request: Search): Stream[Throwable, Item]

def streamAs[A: Schema](request: GetByQuery): Stream[Throwable, A]
def streamAs[A: Schema](request: Search): Stream[Throwable, A]
}

object ElasticExecutor {
Expand All @@ -40,9 +40,9 @@ object ElasticExecutor {
private[elasticsearch] def execute[A](request: ElasticRequest[A]): RIO[ElasticExecutor, A] =
ZIO.serviceWithZIO[ElasticExecutor](_.execute(request))

private[elasticsearch] def stream(request: GetByQuery): ZStream[ElasticExecutor, Throwable, Item] =
private[elasticsearch] def stream(request: Search): ZStream[ElasticExecutor, Throwable, Item] =
ZStream.serviceWithStream[ElasticExecutor](_.stream(request))

private[elasticsearch] def streamAs[A: Schema](request: GetByQuery): ZStream[ElasticExecutor, Throwable, A] =
private[elasticsearch] def streamAs[A: Schema](request: Search): ZStream[ElasticExecutor, Throwable, A] =
ZStream.serviceWithStream[ElasticExecutor](_.streamAs[A](request))
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ trait HasRouting[A] {
def routing(value: Routing): ElasticRequest[A]
}

sealed trait ElasticRequest[A]

sealed trait BulkableRequest[A] extends ElasticRequest[A]

sealed trait ElasticRequest[A]

object ElasticRequest {

def bulk(requests: BulkableRequest[_]*): Bulk =
Expand Down Expand Up @@ -67,8 +67,8 @@ object ElasticRequest {
def getById(index: IndexName, id: DocumentId): GetById =
GetById(index = index, id = id, routing = None)

def search(index: IndexName, query: ElasticQuery[_]): GetByQuery =
GetByQuery(index = index, query = query, routing = None)
def search(index: IndexName, query: ElasticQuery[_]): Search =
Search(index = index, query = query, routing = None)

def upsert[A: Schema](index: IndexName, id: DocumentId, doc: A): CreateOrUpdate =
CreateOrUpdate(index = index, id = id, document = Document.from(doc), refresh = false, routing = None)
Expand Down Expand Up @@ -157,7 +157,7 @@ object ElasticRequest {
private[elasticsearch] final case class CreateIndex(
name: IndexName,
definition: Option[String]
) extends ElasticRequest[CreationOutcome]
) extends CreateIndexRequest

sealed trait CreateOrUpdateRequest extends BulkableRequest[Unit] with HasRefresh[Unit] with HasRouting[Unit]

Expand Down Expand Up @@ -243,7 +243,7 @@ object ElasticRequest {

sealed trait GetByQueryRequest extends ElasticRequest[SearchResult]

private[elasticsearch] final case class GetByQuery(
private[elasticsearch] final case class Search(
index: IndexName,
query: ElasticQuery[_],
routing: Option[Routing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
case r: DeleteIndex => executeDeleteIndex(r)
case r: Exists => executeExists(r)
case r: GetById => executeGetById(r)
case r: GetByQuery => executeGetByQuery(r)
case r: Search => executeSearch(r)
}

def stream(r: GetByQuery): Stream[Throwable, Item] =
def stream(r: Search): Stream[Throwable, Item] =
ZStream.paginateChunkZIO("") { s =>
if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s)
}

def streamAs[A: Schema](r: GetByQuery): Stream[Throwable, A] =
def streamAs[A: Schema](r: Search): Stream[Throwable, A] =
ZStream
.paginateChunkZIO("") { s =>
if (s.isEmpty) executeGetByQueryWithScroll(r) else executeGetByScroll(s)
Expand Down Expand Up @@ -224,7 +224,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
}
}

private def executeGetByQuery(r: GetByQuery): Task[SearchResult] =
private def executeSearch(r: Search): Task[SearchResult] =
sendRequestWithCustomResponse(
request
.post(uri"${config.uri}/${r.index}/$Search")
Expand All @@ -243,7 +243,7 @@ private[elasticsearch] final class HttpElasticExecutor private (config: ElasticC
}
}

private def executeGetByQueryWithScroll(r: GetByQuery): Task[(Chunk[Item], Option[String])] =
private def executeGetByQueryWithScroll(r: Search): Task[(Chunk[Item], Option[String])] =
sendRequestWithCustomResponse(
request
.post(
Expand Down

0 comments on commit 1fba35f

Please sign in to comment.